summaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
authorJules Laplace <julescarbon@gmail.com>2018-05-26 23:51:54 +0200
committerJules Laplace <julescarbon@gmail.com>2018-05-26 23:51:54 +0200
commit1d4fca365ae76f193c05da6eb1d58b41b171e359 (patch)
treec3cfd57d6bd0b9771efaccd957f63bb3f6f60ae6 /app
parent2fa89d40071e4afffa2aeb1805eecf2f1c148cf0 (diff)
running basic tasks, monitoring stdout!
Diffstat (limited to 'app')
-rw-r--r--app/client/socket/socket.task.js38
-rw-r--r--app/client/system/system.component.js6
-rw-r--r--app/client/task/task.actions.js4
-rw-r--r--app/relay/modules/test.js8
-rw-r--r--app/relay/runner.js62
5 files changed, 80 insertions, 38 deletions
diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js
index d65ed13..938c533 100644
--- a/app/client/socket/socket.task.js
+++ b/app/client/socket/socket.task.js
@@ -6,18 +6,47 @@ import { socket } from './socket.connection'
socket.on('task_res', (data) => {
console.log('system response', data)
switch (data.type) {
- // case 'rpc_connected':
- // return dispatch({ type: types.system.rpc_connected, runner: data.runner })
+ case 'start':
+ // return dispatch({ type: types.system.rpc_connected, runner: data.runner })
+ break
+ case 'task_begin':
+ break
+ case 'stop':
+ break
+ case 'task_finish':
+ break
+ case 'kill':
+ break
+ case 'stdout':
+ console.log(data.data)
+ break
+ case 'stderr':
+ console.log(data.data)
+ break
+ case 'add':
+ break
+ case 'remove':
+ break
+ case 'start_queue':
+ break
+ case 'stop_queue':
+ break
+ case 'list':
+ break
+ case 'set_priority':
+ break
+ case 'error':
+ return console.log('task error', data)
default:
return console.log('no such task command', data.type)
}
})
-export function start_task(task, preempt) {
+export function start_task(task, opt={}) {
socket.emit('task', {
type: 'start',
task,
- preempt,
+ ...opt,
})
}
@@ -25,5 +54,6 @@ export function stop_task(task) {
socket.emit('task', {
type: 'stop',
task,
+ ...opt,
})
}
diff --git a/app/client/system/system.component.js b/app/client/system/system.component.js
index 5d75964..f8cf139 100644
--- a/app/client/system/system.component.js
+++ b/app/client/system/system.component.js
@@ -10,21 +10,21 @@ import * as taskActions from '../task/task.actions'
const cpu_test_task = {
activity: 'cpu',
- library: 'test',
+ module: 'test',
dataset: 'test',
epochs: 1,
opt: {}
}
const gpu_test_task = {
activity: 'gpu',
- library: 'test',
+ module: 'test',
dataset: 'test',
epochs: 1,
opt: {}
}
const live_test_task = {
activity: 'live',
- library: 'test',
+ module: 'test',
dataset: 'test',
epochs: 1,
opt: {}
diff --git a/app/client/task/task.actions.js b/app/client/task/task.actions.js
index ea3dfff..6e39e71 100644
--- a/app/client/task/task.actions.js
+++ b/app/client/task/task.actions.js
@@ -2,11 +2,11 @@ import socket from '../socket'
import types from '../types'
export const start_task = (task, opt={}) => {
- socket.task.start_task(task)
+ socket.task.start_task(task, opt)
return { type: types.task.starting_task, task, ...opt }
}
export const stop_task = (task, opt={}) => {
- socket.task.stop_task(task)
+ socket.task.stop_task(task, opt)
return { type: types.task.stopping_task, task, ...opt }
}
diff --git a/app/relay/modules/test.js b/app/relay/modules/test.js
index 6c94a0c..5619159 100644
--- a/app/relay/modules/test.js
+++ b/app/relay/modules/test.js
@@ -1,19 +1,17 @@
import path from 'path'
const name = 'test'
-const cwd = process.env.TEST_CWD || './test/module/'
+const cwd = process.env.TEST_CWD || process.cwd() + '/test/module/'
const cpu = {
type: 'perl',
script: 'test.pl',
- params: (task) => {
- }
+ params: '--train',
}
const gpu = {
type: 'python',
script: 'test.pl',
- params: (task) => {
- }
+ params: '--test',
}
const live = {
type: 'pytorch',
diff --git a/app/relay/runner.js b/app/relay/runner.js
index e7aca72..f15e39b 100644
--- a/app/relay/runner.js
+++ b/app/relay/runner.js
@@ -35,12 +35,13 @@ export function status () {
export function build_params(module, task) {
const activity = module.activities[task.activity]
const interpreter = interpreters[activity.type]
- if (typeof activity.params === 'function') {
- params = activity.params(task)
+ let opt_params;
+ if (activity.build_params) {
+ opt_params = activity.build_params(task)
}
else {
- const opt = JSON.parse(task.opt)
- const opt_params = Object.keys(opt).map(key => {
+ const opt = task.opt
+ opt_params = Object.keys(opt).map(key => {
const flag = '--' + key.replace(/-/g, '_')
const value = opt[key]
if (value === 'true') {
@@ -48,10 +49,11 @@ export function build_params(module, task) {
}
return [flag, value]
}).reduce((acc, cur) => acc.concat(cur), [])
- params = [ activity.script ].concat(activity.params || []).concat(opt_params)
}
+ const params = [ activity.script ].concat(activity.params || []).concat(opt_params)
return {
activity,
+ interpreter,
params
}
}
@@ -76,50 +78,62 @@ export function run_system_command(cmd, cb) {
export function run_task(task, preempt, watch){
const module = modules[task.module]
- if (! module) throw new Error("No such module")
+ if (! module) return { type: 'error', error: "No such module: " + task.module }
const { activity, interpreter, params } = build_params(module, task)
+ if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter }
if (activity.cpu && state.current_cpu_task) {
if (preempt) {
- console.log('preempting currently running GPU task')
+ kill_task(state.current_cpu_task)
+ console.log('preempting currently running CPU task')
} else {
+ console.log('already running CPU task')
return { type: 'error', error: 'task already running on cpu' }
}
- } else {
+ } else if (state.current_gpu_task) {
if (preempt) {
- console.log('preempting currently running CPU task')
+ console.log('preempting currently running GPU task')
+ kill_task(state.current_gpu_task)
} else {
- return { type: 'error', error: 'task already running on cpu' }
+ console.log('already running GPU task', state.current_gpu_task.pid)
+ return { type: 'error', error: 'task already running on gpu' }
}
}
+ console.log(activity, interpreter)
- console.log('running task', activity.name)
- console.log(activity.interpreter, activity.script, params)
- const subprocess = spawn(activity.interpreter, params)
- if (activity.gpu) {
+ console.log('running task', task.activity)
+ console.log(module.cwd)
+ console.log(interpreter.cmd, params)
+ const subprocess = spawn(interpreter.cmd, params, {
+ cwd: module.cwd,
+ })
+ if (interpreter.gpu) {
state.current_gpu_task = subprocess
}
else {
state.current_cpu_task = subprocess
}
+ remote.emit('task_res', { type: 'task_begin', task })
+ if (watch) {
+ console.log("watching stdout..")
+ subprocess.stdout.on('data', data => {
+ remote.emit('task_res', { type: 'stdout', data: data.toString('utf8') })
+ })
+ subprocess.stderr.on('data', data => {
+ remote.emit('task_res', { type: 'stderr', data: data.toString('utf8') })
+ })
+ }
subprocess.on('error', (err) => {
- console.log('process error', subprocess.pid, err)
+ console.log('task error', subprocess.pid, err)
remote.emit('task_res', { type: 'task_error', task, err })
})
subprocess.on('close', () => {
- console.log('process ended', subprocess.pid)
+ console.log('task ended', subprocess.pid)
remote.emit('task_res', { type: 'task_finish', task })
})
- if (watch) {
- response.stdout.on('data', data => {
- remote.emit('task_res', { type: 'stdout', data })
- })
- response.stderr.on('data', data => {
- remote.emit('task_res', { type: 'stderr', data })
- })
- }
}
export function kill_task(subprocess){
+ console.log('kill pid', subprocess.pid)
kill(subprocess.pid)
} \ No newline at end of file