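import { execFile, spawn } from 'child_process'
import interpreters from './interpreters'
import modules from './modules'
import kill from 'tree-kill'
import { remote } from './remote'
import { set_connected } from './rpc'
import uuidv1 from 'uuid/v1'

// Shared placeholder for a processor slot that has nothing running.
const idle_state = { status: 'IDLE', task: {} }

// One slot per processor type; each holds either idle_state or
// { subprocess, task, status: 'RUNNING' }.
export const state = {
  current_cpu_task: idle_state,
  current_gpu_task: idle_state,
}

export function get_current_cpu_task(){
  return state.current_cpu_task
}
export function get_current_gpu_task(){
  return state.current_gpu_task
}
export function get_current_task(processor) {
  if (processor === 'cpu') {
    return state.current_cpu_task
  } else {
    return state.current_gpu_task
  }
}

// Reduce a slot to a plain object that is safe to send over the wire.
function serialize_task(t){
  if (!t || t.status === 'IDLE') {
    return {
      status: 'IDLE',
    }
  }
  return {
    status: 'RUNNING',
    task: t.task,
    uuid: t.task.uuid,
    pid: t.subprocess.pid,
  }
}

// Reset a slot to idle, but only if it still holds the given task.
// A preempted task that exits late must not clear its replacement.
function clear_task(is_gpu, task){
  if (is_gpu) {
    if (state.current_gpu_task.task && state.current_gpu_task.task.uuid === task.uuid) {
      state.current_gpu_task = idle_state
    }
  } else {
    if (state.current_cpu_task.task && state.current_cpu_task.task.uuid === task.uuid) {
      state.current_cpu_task = idle_state
    }
  }
}

export function status () {
  return {
    cpu: serialize_task(state.current_cpu_task),
    gpu: serialize_task(state.current_gpu_task),
  }
}

// Resolve the activity and interpreter for a task and assemble the
// argument list passed to the interpreter command.
export function build_params(module, task) {
  const activity = module.activities[task.activity]
  const interpreter = activity && interpreters[activity.type]
  // Let the caller report an unknown activity or interpreter instead of
  // throwing on a property access here.
  if (!activity || !interpreter) {
    return { activity, interpreter, params: [] }
  }
  let opt_params
  let static_params = []
  if (typeof activity.params === 'function') {
    // A params function builds the task-specific arguments itself; it must
    // not also be appended to the argument list as a value.
    opt_params = activity.params(task)
  } else {
    static_params = activity.params || []
    const opt = task.opt || {}
    // Map { 'some-key': value } to ['--some_key', value]; the string
    // 'true' yields a bare flag.
    opt_params = Object.keys(opt).map(key => {
      const flag = '--' + key.replace(/-/g, '_')
      const value = opt[key]
      if (value === 'true') {
        return [flag]
      }
      return [flag, value]
    }).reduce((acc, cur) => acc.concat(cur), [])
  }
  const params = (interpreter.params || [])
    .concat([ activity.script ])
    .concat(static_params)
    .concat(opt_params)
  return { activity, interpreter, params }
}

// Run one of a fixed set of diagnostic commands; anything else is rejected.
export function run_system_command(cmd, cb) {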
  console.log('running system command:', cmd)
  switch(cmd) {
    case 'nvidia-smi':
    case 'ps':
    case 'uptime':
    case 'w':
      execFile(cmd, cb)
      break
    case 'df':
      execFile('df', ['-h'], cb)
      break
    default:
      cb({ error: 'no such command' })
      break
  }
}

// Spawn the subprocess for a task on the CPU or GPU slot. If the slot is
// busy, either preempt the running task or return an error.
export function run_task(task, preempt, watch){
  const module = modules[task.module]
  if (! module) return { type: 'error', error: "No such module: " + task.module }
  const { activity, interpreter, params } = build_params(module, task)
  if (! activity) return { type: 'error', error: "No such activity: " + task.activity }
  // Interpreters are looked up by activity.type, so report that key.
  if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.type }

  if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') {
    if (preempt) {
      console.log('preempting currently running GPU task')
      terminate(state.current_gpu_task.subprocess)
    } else {
      // The pid lives on the subprocess, not on the slot object.
      console.log('already running GPU task :(', state.current_gpu_task.subprocess.pid)
      return { type: 'error', error: 'task already running on gpu' }
    }
  } else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') {
    if (preempt) {
      console.log('preempting currently running CPU task')
      terminate(state.current_cpu_task.subprocess)
    } else {
      console.log('already running CPU task :(')
      return { type: 'error', error: 'task already running on cpu' }
    }
  }

  console.log('running task', task.activity)
  console.log(module.cwd)
  console.log(interpreter.cmd, params)
  const subprocess = spawn(interpreter.cmd, params, {
    cwd: module.cwd,
  })
  if (interpreter.gpu) {
    state.current_gpu_task = { subprocess, task, status: 'RUNNING' }
  } else {
    state.current_cpu_task = { subprocess, task, status: 'RUNNING' }
  }
  task.uuid = task.uuid || uuidv1()
  const processor = task.processor = interpreter.gpu ?
    'gpu' : 'cpu'
  remote.emit('task_res', { type: 'task_begin', task })

  // Output is forwarded to the remote only when the caller asked to watch.
  watch && console.log("watching stdout..")
  subprocess.stdout.on('data', data => {
    watch && remote.emit('task_res', { type: 'stdout', processor, data: data.toString('utf8') })
  })
  subprocess.stderr.on('data', data => {
    watch && remote.emit('task_res', { type: 'stderr', processor, data: data.toString('utf8') })
  })

  // 'error' and 'close' can both fire for the same subprocess; the flag
  // ensures the task is cleaned up and reported only once.
  let finished = false
  subprocess.on('error', (err) => {
    if (finished) return
    finished = true
    console.log('task error', subprocess.exitCode, err)
    clear_task(interpreter.gpu, task)
    remote.emit('task_res', { type: 'task_error', task, err })
    set_connected(false)
  })
  subprocess.on('close', () => {
    if (finished) return
    finished = true
    console.log('task ended', subprocess.exitCode || '')
    clear_task(interpreter.gpu, task)
    remote.emit('task_res', { type: 'task_finish', task })
    set_connected(false)
  })
}

// Stop the running task whose uuid matches the given task.
export function stop_task(task){
  const { current_cpu_task, current_gpu_task } = state
  // Check the status first: an idle slot has no uuid, so a task without a
  // uuid would otherwise "match" it and report success.
  if (current_cpu_task.status !== 'IDLE' && current_cpu_task.task.uuid === task.uuid) {
    terminate(current_cpu_task.subprocess)
    return { status: 'ok' }
  } else if (current_gpu_task.status !== 'IDLE' && current_gpu_task.task.uuid === task.uuid) {
    terminate(current_gpu_task.subprocess)
    return { status: 'ok' }
  }
  return { error: 'no such task' }
}

// Kill a subprocess together with its child processes.
export function terminate(subprocess){
  // pid is undefined when the process failed to spawn; nothing to kill then.
  if (!subprocess || !subprocess.pid) {
    return
  }
  console.log('kill pid', subprocess.pid)
  kill(subprocess.pid)
}