summaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
authorJules Laplace <julescarbon@gmail.com>2018-06-04 22:19:11 +0200
committerJules Laplace <julescarbon@gmail.com>2018-06-04 22:19:11 +0200
commitcfe98e6eef5ca24b5c2656fcda8e3fac71d55a1d (patch)
treefd8724292ce4309b96811160261fbcb4fa30fe06 /app
parentd38fd8419223560bc82f6153de80f889bbd75b01 (diff)
going over all this task stuff
Diffstat (limited to 'app')
-rw-r--r--app/client/modules/samplernn/samplernn.actions.js15
-rw-r--r--app/client/queue/queue.reducer.js2
-rw-r--r--app/client/socket/socket.task.js16
-rw-r--r--app/relay/remote.js6
-rw-r--r--app/relay/runner.js30
5 files changed, 42 insertions, 27 deletions
diff --git a/app/client/modules/samplernn/samplernn.actions.js b/app/client/modules/samplernn/samplernn.actions.js
index c196cc5..ef3e417 100644
--- a/app/client/modules/samplernn/samplernn.actions.js
+++ b/app/client/modules/samplernn/samplernn.actions.js
@@ -282,18 +282,3 @@ export const fetch_url = (url) => (dispatch) => {
opt: { url }
}, { preempt: true, watch: true })
}
-
-export const train_task_now = (dataset, epochs=1) => (dispatch) => {
- const task = {
- module: 'samplernn',
- activity: 'train',
- dataset: dataset,
- epochs: epochs,
- opt: {
- sample_length: 44100 * 5,
- n_samples: 6,
- keep_old_checkpoints: false,
- }
- }
- return actions.queue.add_task(task)
-} \ No newline at end of file
diff --git a/app/client/queue/queue.reducer.js b/app/client/queue/queue.reducer.js
index 02d6943..05cd015 100644
--- a/app/client/queue/queue.reducer.js
+++ b/app/client/queue/queue.reducer.js
@@ -25,7 +25,7 @@ const queueReducer = (state = queueInitialState, action) => {
queue: state.queue.concat([action.data]),
}
case types.task.index:
- console.log(action.data)
+ console.log(action.data)
return {
...state,
tasks: action.data.reduce((a,b) => (a[b.id] = b, a), {}),
diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js
index 595066e..2f3c65a 100644
--- a/app/client/socket/socket.task.js
+++ b/app/client/socket/socket.task.js
@@ -47,6 +47,22 @@ socket.on('task_res', (data) => {
}
})
+export function add_task(task, opt={}) {
+ socket.emit('task', {
+ type: 'add',
+ task,
+ ...opt,
+ })
+}
+
+export function remove_task(task, opt={}) {
+ socket.emit('task', {
+ type: 'remove',
+ task,
+ ...opt,
+ })
+}
+
export function start_task(task, opt={}) {
socket.emit('task', {
type: 'start',
diff --git a/app/relay/remote.js b/app/relay/remote.js
index 252258f..13a613c 100644
--- a/app/relay/remote.js
+++ b/app/relay/remote.js
@@ -48,12 +48,18 @@ remote.on('task', (data) => {
response = runner.stop_task(data.task)
break
case 'add':
+ queue.add_task(data.task)
+ queue.activate()
+ runner.start_queue()
break
case 'remove':
+ queue.remove_task(data.task)
break
case 'start_queue':
+ queue.activate()
break
case 'stop_queue':
+ queue.deactivate('user')
break
case 'list':
break
diff --git a/app/relay/runner.js b/app/relay/runner.js
index 1a72025..94b7779 100644
--- a/app/relay/runner.js
+++ b/app/relay/runner.js
@@ -8,6 +8,8 @@ import uuidv1 from 'uuid/v1'
import * as fs from 'fs'
import * as path from 'path'
+import * as queue from 'queue'
+
const idle_state = { status: 'IDLE', task: {} }
export const state = {
@@ -182,14 +184,15 @@ export function run_script(task, cb) {
}, cb)
}
-export function run_task(task, preempt, watch){
+export function run_task(task, preempt=false, watch=false){
+ if (! task) return null
const module = modules[task.module]
if (! module) return { type: 'error', error: "No such module: " + task.module }
const activity = module.activities[task.activity]
- run_task_with_activity(task, module, activity, preempt, watch)
+ return run_task_with_activity(task, module, activity, preempt, watch)
}
-export function run_task_with_activity(task, module, activity, preempt, watch) {
+export function run_task_with_activity(task, module, activity, preempt=false, watch=false) {
const { interpreter, params } = build_params(module, activity, task)
if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter }
@@ -199,7 +202,7 @@ export function run_task_with_activity(task, module, activity, preempt, watch) {
terminate(state.current_gpu_task)
} else {
console.log('already running GPU task :(', state.current_gpu_task.pid)
- return { type: 'error', error: 'task already running on gpu' }
+ return { type: 'error', preempt: false, error: 'task already running on gpu' }
}
} else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') {
if (preempt) {
@@ -207,7 +210,7 @@ export function run_task_with_activity(task, module, activity, preempt, watch) {
terminate(state.current_cpu_task)
} else {
console.log('already running CPU task :(')
- return { type: 'error', error: 'task already running on cpu' }
+ return { type: 'error', preempt: false, error: 'task already running on cpu' }
}
}
@@ -215,7 +218,9 @@ export function run_task_with_activity(task, module, activity, preempt, watch) {
console.log(module.cwd)
console.log(interpreter.cmd, params)
+ task.uuid = task.uuid || uuidv1()
task.started = new Date().toString()
+ task.processing = true
const subprocess = spawn(interpreter.cmd, params, {
cwd: module.cwd,
})
@@ -230,7 +235,6 @@ export function run_task_with_activity(task, module, activity, preempt, watch) {
}
}
- task.uuid = task.uuid || uuidv1()
const processor = task.processor = interpreter.gpu ? 'gpu' : 'cpu'
remote.emit('task_res', { type: 'task_begin', task })
@@ -248,6 +252,7 @@ export function run_task_with_activity(task, module, activity, preempt, watch) {
subprocess.on('error', (err) => {
if (finished) return
finished = true
+ task.processing = false
console.log('task error', subprocess.exitCode, err)
finish({
type: 'task_error',
@@ -260,6 +265,8 @@ export function run_task_with_activity(task, module, activity, preempt, watch) {
finished = true
console.log('task ended', subprocess.exitCode || '')
set_connected(false)
+ task.processing = false
+ task.completed = true
finish({
type: 'task_finish',
task,
@@ -277,14 +284,15 @@ export function run_task_with_activity(task, module, activity, preempt, watch) {
}
return run_next_task()
}
+
+ return task
}
export function run_next_task(){
- // get next task from the queue...
- /*
- const task = queue.get_next_task()
- return run_task(task)
- */
+ if (queue.is_active()) {
+ const task = queue.get_next_task()
+ return run_task(task)
+ }
}
export function stop_task(task){