diff options
| author | Jules Laplace <julescarbon@gmail.com> | 2018-05-26 23:51:54 +0200 |
|---|---|---|
| committer | Jules Laplace <julescarbon@gmail.com> | 2018-05-26 23:51:54 +0200 |
| commit | 1d4fca365ae76f193c05da6eb1d58b41b171e359 (patch) | |
| tree | c3cfd57d6bd0b9771efaccd957f63bb3f6f60ae6 /app | |
| parent | 2fa89d40071e4afffa2aeb1805eecf2f1c148cf0 (diff) | |
running basic tasks, monitoring stdout!
Diffstat (limited to 'app')
| -rw-r--r-- | app/client/socket/socket.task.js | 38 | ||||
| -rw-r--r-- | app/client/system/system.component.js | 6 | ||||
| -rw-r--r-- | app/client/task/task.actions.js | 4 | ||||
| -rw-r--r-- | app/relay/modules/test.js | 8 | ||||
| -rw-r--r-- | app/relay/runner.js | 62 |
5 files changed, 80 insertions, 38 deletions
diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js index d65ed13..938c533 100644 --- a/app/client/socket/socket.task.js +++ b/app/client/socket/socket.task.js @@ -6,18 +6,47 @@ import { socket } from './socket.connection' socket.on('task_res', (data) => { console.log('system response', data) switch (data.type) { - // case 'rpc_connected': - // return dispatch({ type: types.system.rpc_connected, runner: data.runner }) + case 'start': + // return dispatch({ type: types.system.rpc_connected, runner: data.runner }) + break + case 'task_begin': + break + case 'stop': + break + case 'task_finish': + break + case 'kill': + break + case 'stdout': + console.log(data.data) + break + case 'stderr': + console.log(data.data) + break + case 'add': + break + case 'remove': + break + case 'start_queue': + break + case 'stop_queue': + break + case 'list': + break + case 'set_priority': + break + case 'error': + return console.log('task error', data) default: return console.log('no such task command', data.type) } }) -export function start_task(task, preempt) { +export function start_task(task, opt={}) { socket.emit('task', { type: 'start', task, - preempt, + ...opt, }) } @@ -25,5 +54,6 @@ export function stop_task(task) { socket.emit('task', { type: 'stop', task, + ...opt, }) } diff --git a/app/client/system/system.component.js b/app/client/system/system.component.js index 5d75964..f8cf139 100644 --- a/app/client/system/system.component.js +++ b/app/client/system/system.component.js @@ -10,21 +10,21 @@ import * as taskActions from '../task/task.actions' const cpu_test_task = { activity: 'cpu', - library: 'test', + module: 'test', dataset: 'test', epochs: 1, opt: {} } const gpu_test_task = { activity: 'gpu', - library: 'test', + module: 'test', dataset: 'test', epochs: 1, opt: {} } const live_test_task = { activity: 'live', - library: 'test', + module: 'test', dataset: 'test', epochs: 1, opt: {} diff --git a/app/client/task/task.actions.js b/app/client/task/task.actions.js index ea3dfff..6e39e71 100644 --- a/app/client/task/task.actions.js +++ b/app/client/task/task.actions.js @@ -2,11 +2,11 @@ import socket from '../socket' import types from '../types' export const start_task = (task, opt={}) => { - socket.task.start_task(task) + socket.task.start_task(task, opt) return { type: types.task.starting_task, task, ...opt } } export const stop_task = (task, opt={}) => { - socket.task.stop_task(task) + socket.task.stop_task(task, opt) return { type: types.task.stopping_task, task, ...opt } } diff --git a/app/relay/modules/test.js b/app/relay/modules/test.js index 6c94a0c..5619159 100644 --- a/app/relay/modules/test.js +++ b/app/relay/modules/test.js @@ -1,19 +1,17 @@ import path from 'path' const name = 'test' -const cwd = process.env.TEST_CWD || './test/module/' +const cwd = process.env.TEST_CWD || process.cwd() + '/test/module/' const cpu = { type: 'perl', script: 'test.pl', - params: (task) => { - } + params: '--train', } const gpu = { type: 'python', script: 'test.pl', - params: (task) => { - } + params: '--test', } const live = { type: 'pytorch', diff --git a/app/relay/runner.js b/app/relay/runner.js index e7aca72..f15e39b 100644 --- a/app/relay/runner.js +++ b/app/relay/runner.js @@ -35,12 +35,13 @@ export function status () { export function build_params(module, task) { const activity = module.activities[task.activity] const interpreter = interpreters[activity.type] - if (typeof activity.params === 'function') { - params = activity.params(task) + let opt_params; + if (activity.build_params) { + opt_params = activity.build_params(task) } else { - const opt = JSON.parse(task.opt) - const opt_params = Object.keys(opt).map(key => { + const opt = task.opt + opt_params = Object.keys(opt).map(key => { const flag = '--' + key.replace(/-/g, '_') const value = opt[key] if (value === 'true') { @@ -48,10 +49,11 @@ export function build_params(module, task) { } return [flag, value] }).reduce((acc, cur) => acc.concat(cur), []) - params = [ activity.script ].concat(activity.params || []).concat(opt_params) } + const params = [ activity.script ].concat(activity.params || []).concat(opt_params) return { activity, + interpreter, params } } @@ -76,50 +78,62 @@ export function run_system_command(cmd, cb) { export function run_task(task, preempt, watch){ const module = modules[task.module] - if (! module) throw new Error("No such module") + if (! module) return { type: 'error', error: "No such module: " + task.module } const { activity, interpreter, params } = build_params(module, task) + if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } if (activity.cpu && state.current_cpu_task) { if (preempt) { - console.log('preempting currently running GPU task') + kill_task(state.current_cpu_task) + console.log('preempting currently running CPU task') } else { + console.log('already running CPU task') return { type: 'error', error: 'task already running on cpu' } } - } else { + } else if (state.current_gpu_task) { if (preempt) { - console.log('preempting currently running CPU task') + console.log('preempting currently running GPU task') + kill_task(state.current_gpu_task) } else { - return { type: 'error', error: 'task already running on cpu' } + console.log('already running GPU task', state.current_gpu_task.pid) + return { type: 'error', error: 'task already running on gpu' } } } + console.log(activity, interpreter) - console.log('running task', activity.name) - console.log(activity.interpreter, activity.script, params) - const subprocess = spawn(activity.interpreter, params) - if (activity.gpu) { + console.log('running task', task.activity) + console.log(module.cwd) + console.log(interpreter.cmd, params) + const subprocess = spawn(interpreter.cmd, params, { + cwd: module.cwd, + }) + if (interpreter.gpu) { state.current_gpu_task = subprocess } else { state.current_cpu_task = subprocess } + remote.emit('task_res', { type: 'task_begin', task }) + if (watch) { + console.log("watching stdout..") + subprocess.stdout.on('data', data => { + remote.emit('task_res', { type: 'stdout', data: data.toString('utf8') }) + }) + subprocess.stderr.on('data', data => { + remote.emit('task_res', { type: 'stderr', data: data.toString('utf8') }) + }) + } subprocess.on('error', (err) => { - console.log('process error', subprocess.pid, err) + console.log('task error', subprocess.pid, err) remote.emit('task_res', { type: 'task_error', task, err }) }) subprocess.on('close', () => { - console.log('process ended', subprocess.pid) + console.log('task ended', subprocess.pid) remote.emit('task_res', { type: 'task_finish', task }) }) - if (watch) { - response.stdout.on('data', data => { - remote.emit('task_res', { type: 'stdout', data }) - }) - response.stderr.on('data', data => { - remote.emit('task_res', { type: 'stderr', data }) - }) - } } export function kill_task(subprocess){ + console.log('kill pid', subprocess.pid) kill(subprocess.pid) }
\ No newline at end of file |
