summaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
authorJules Laplace <julescarbon@gmail.com>2018-06-04 23:57:23 +0200
committerJules Laplace <julescarbon@gmail.com>2018-06-04 23:57:23 +0200
commita960d57ad80a65d5cf028f2595b38ca07bb46b83 (patch)
tree58bc92a8c30511a1e4caa79f7ce5a3528d7ac35b /app
parent1ccc4c798d95133cde1d4558318f0640a772526a (diff)
updating task status on serverrrrrr
Diffstat (limited to 'app')
-rw-r--r--app/client/queue/queue.actions.js13
-rw-r--r--app/client/socket/socket.task.js39
-rw-r--r--app/client/system/system.actions.js2
-rw-r--r--app/client/system/system.component.js6
-rw-r--r--app/client/types.js6
-rw-r--r--app/relay/modules/test.js6
-rw-r--r--app/relay/queue.js7
-rw-r--r--app/relay/remote.js12
-rw-r--r--app/relay/runner.js8
-rw-r--r--app/server/bridge.js14
10 files changed, 73 insertions, 40 deletions
diff --git a/app/client/queue/queue.actions.js b/app/client/queue/queue.actions.js
index 1885e2c..6049e1b 100644
--- a/app/client/queue/queue.actions.js
+++ b/app/client/queue/queue.actions.js
@@ -18,4 +18,15 @@ export const add_task = (new_task) => (dispatch) => {
.then((task) => {
socket.task.add_task(task)
})
-} \ No newline at end of file
+}
+
+export const start_queue = (task, opt={}) => {
+ socket.task.start_queue(task, opt)
+ return { type: types.task.starting_queue, task, ...opt }
+}
+
+export const stop_queue = (task, opt={}) => {
+ socket.task.stop_queue(task, opt)
+ return { type: types.task.stopping_queue, task, ...opt }
+}
+
diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js
index 44e6b27..7c581b8 100644
--- a/app/client/socket/socket.task.js
+++ b/app/client/socket/socket.task.js
@@ -6,7 +6,7 @@ import { socket } from './socket.connection'
let finishTimeout;
socket.on('task_res', (data) => {
- console.log('system response', data)
+ console.log('task response', data)
switch (data.type) {
case 'start':
// return dispatch({ type: types.system.rpc_connected, runner: data.runner })
@@ -55,34 +55,13 @@ socket.on('task_res', (data) => {
}
})
-export function add_task(task, opt={}) {
- socket.emit('task', {
- type: 'add',
- task,
- ...opt,
- })
+export function emit(type, task={}, opt={}) {
+ socket.emit('task', { type, task, ...opt, })
}
-export function remove_task(task, opt={}) {
- socket.emit('task', {
- type: 'remove',
- task,
- ...opt,
- })
-}
-
-export function start_task(task, opt={}) {
- socket.emit('task', {
- type: 'start',
- task,
- ...opt,
- })
-}
-
-export function stop_task(task, opt={}) {
- socket.emit('task', {
- type: 'stop',
- task,
- ...opt,
- })
-}
+export const add_task = (task, opt={}) => emit('add', task, opt)
+export const remove_task = (task, opt={}) => emit('remove', task, opt)
+export const start_task = (task, opt={}) => emit('start', task, opt)
+export const stop_task = (task, opt={}) => emit('stop', task, opt)
+export const start_queue = (opt={}) => emit('start_queue', {}, opt)
+export const stop_queue = (opt={}) => emit('stop_queue', {}, opt)
diff --git a/app/client/system/system.actions.js b/app/client/system/system.actions.js
index c661769..7039ff2 100644
--- a/app/client/system/system.actions.js
+++ b/app/client/system/system.actions.js
@@ -32,7 +32,7 @@ export const changeTool = (tool) => {
export const enqueue_test_task = (dataset) => dispatch => {
const task = {
module: 'test',
- activity: 'test',
+ activity: 'cpu',
dataset: dataset,
}
return actions.queue.add_task(task)
diff --git a/app/client/system/system.component.js b/app/client/system/system.component.js
index b4a0ed7..bf77237 100644
--- a/app/client/system/system.component.js
+++ b/app/client/system/system.component.js
@@ -94,7 +94,11 @@ class System extends Component {
<button onClick={() => actions.live.set_param('fruit', choice(fruits))}>Set</button>
</Param>
<Param title='Queue'>
- <button onClick={() => actions.system.enqueue_test_task(choice(fruits))}>+ Add</button>
+ <button onClick={() => actions.queue.start_queue()}>Start</button>
+ <button onClick={() => actions.queue.stop_queue()}>Stop</button>
+ </Param>
+ <Param title=''>
+ <button onClick={() => actions.system.enqueue_test_task(choice(fruits))}>+Add</button>
</Param>
</Group>
</div>
diff --git a/app/client/types.js b/app/client/types.js
index 0c7c785..2d4b359 100644
--- a/app/client/types.js
+++ b/app/client/types.js
@@ -26,9 +26,13 @@ export default {
]),
task: crud_type('task', [
'starting_task',
- 'task_begin',
'stopping_task',
+ 'task_begin',
'task_finish',
+ 'start_queue',
+ 'stop_queue',
+ 'starting_queue',
+ 'stopping_queue',
'progress',
'epoch',
]),
diff --git a/app/relay/modules/test.js b/app/relay/modules/test.js
index 57a324a..8876f7c 100644
--- a/app/relay/modules/test.js
+++ b/app/relay/modules/test.js
@@ -7,6 +7,12 @@ const cpu = {
type: 'perl',
script: 'test.pl',
params: '--train',
+ listen: (task, line, i) => {
+ if ( (parseInt(line) % 10) === 0) {
+ return { type: 'epoch', task, epoch: (i/10)|0 }
+ }
+ return null
+ }
}
const gpu = {
type: 'python',
diff --git a/app/relay/queue.js b/app/relay/queue.js
index 9f9e821..6c65e06 100644
--- a/app/relay/queue.js
+++ b/app/relay/queue.js
@@ -1,8 +1,10 @@
+// get the processor that uses the task and give it its own queue!
+
let queue = []
-let active = true
+let active = false
let status = 'waiting'
-export const is_active = () => active
+export const is_active = () => active && queue.length
export const get_status = () => status
export const activate = () => {
active = true
@@ -15,3 +17,4 @@ export const deactivate = reason => {
export const add_task = task => queue.push(task)
export const remove_task = task => queue = queue.filter(t => t.id !== task.id)
export const get_next_task = () => queue.shift()
+export const list_tasks = () => queue
diff --git a/app/relay/remote.js b/app/relay/remote.js
index 6fee366..935298a 100644
--- a/app/relay/remote.js
+++ b/app/relay/remote.js
@@ -51,19 +51,25 @@ remote.on('task', (data) => {
break
case 'add':
queue.add_task(data.task)
- queue.activate()
- runner.start_queue()
+ if (! queue.is_active()) {
+ queue.activate()
+ runner.start_queue()
+ }
break
case 'remove':
queue.remove_task(data.task)
break
case 'start_queue':
- queue.activate()
+ if (! queue.is_active()) {
+ queue.activate()
+ runner.start_queue()
+ }
break
case 'stop_queue':
queue.deactivate('user')
break
case 'list':
+ response = { type: 'list', tasks: queue.list_tasks() }
break
case 'set_priority':
break
diff --git a/app/relay/runner.js b/app/relay/runner.js
index 4b5522d..734e3ea 100644
--- a/app/relay/runner.js
+++ b/app/relay/runner.js
@@ -189,11 +189,14 @@ export function run_task(task, preempt=false, watch=false){
const module = modules[task.module]
if (! module) return { type: 'error', error: "No such module: " + task.module }
const activity = module.activities[task.activity]
+console.log(task)
+ if (! activity) return { type: 'error', error: 'No such activity in module: ' + task.module + ' ' + task.activity }
return run_task_with_activity(task, module, activity, preempt, watch)
}
export function run_task_with_activity(task, module, activity, preempt=false, watch=false) {
const { interpreter, params } = build_params(module, activity, task)
+console.log(activity)
if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter }
if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') {
@@ -298,8 +301,13 @@ export function run_task_with_activity(task, module, activity, preempt=false, wa
return task
}
+export function start_queue(){
+ run_next_task()
+}
+
export function run_next_task(){
if (queue.is_active()) {
+ console.log(queue.list_tasks())
const task = queue.get_next_task()
return run_task(task)
}
diff --git a/app/server/bridge.js b/app/server/bridge.js
index ef586ac..8d0222c 100644
--- a/app/server/bridge.js
+++ b/app/server/bridge.js
@@ -50,7 +50,19 @@ function bind_relay(socket) {
// messages related to queuing and tasks
socket.on('task_res', data => {
- client.emit('task_res', data)
+ console.log(data)
+ if (data.task && data.task.id) {
+ delete data.task.created_at
+ delete data.task.updated_at
+ db.models.task.update(data.task.id, data.task)
+ .then(task => {
+ data.task = task
+ client.emit('task_res', data)
+ })
+ }
+ else {
+ client.emit('task_res', data)
+ }
})
// // data responses from the server, telling us that files, folders, etc were created