// monitors which process is currently running // kills it if need be.... murder import { execFile, spawn } from 'child_process' import interpreters from './interpreters' import modules from './modules' import { kill } from 'tree-kill' import { remote } from './remote' export const state = { current_cpu_task: null, current_gpu_task: null, } export function get_current_cpu_task(){ return state.current_cpu_task } export function get_current_gpu_task(){ return state.current_gpu_task } export function get_current_task(processor) { if (processor === 'cpu') { return state.current_cpu_task } else { return state.current_gpu_task } } export function status () { return state } export function build_params(module, task) { const activity = module.activities[task.activity] const interpreter = interpreters[activity.type] let opt_params; if (activity.build_params) { opt_params = activity.build_params(task) } else { const opt = task.opt opt_params = Object.keys(opt).map(key => { const flag = '--' + key.replace(/-/g, '_') const value = opt[key] if (value === 'true') { return [flag] } return [flag, value] }).reduce((acc, cur) => acc.concat(cur), []) } const params = [ activity.script ].concat(activity.params || []).concat(opt_params) return { activity, interpreter, params } } export function run_system_command(cmd, cb) { console.log('running system command:', cmd) switch(cmd) { case 'nvidia-smi': case 'ps': case 'uptime': case 'w': execFile(cmd, cb) break case 'df': execFile('df', ['-h'], cb) break default: cb({ error: 'no such command' }) break } } export function run_task(task, preempt, watch){ const module = modules[task.module] if (! module) return { type: 'error', error: "No such module: " + task.module } const { activity, interpreter, params } = build_params(module, task) if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } if (activity.cpu && state.current_cpu_task) { if (preempt) { kill_task(state.current_cpu_task) console.log('preempting currently running CPU task') } else { console.log('already running CPU task') return { type: 'error', error: 'task already running on cpu' } } } else if (state.current_gpu_task) { if (preempt) { console.log('preempting currently running GPU task') kill_task(state.current_gpu_task) } else { console.log('already running GPU task', state.current_gpu_task.pid) return { type: 'error', error: 'task already running on gpu' } } } console.log(activity, interpreter) console.log('running task', task.activity) console.log(module.cwd) console.log(interpreter.cmd, params) const subprocess = spawn(interpreter.cmd, params, { cwd: module.cwd, }) if (interpreter.gpu) { state.current_gpu_task = subprocess } else { state.current_cpu_task = subprocess } remote.emit('task_res', { type: 'task_begin', task }) if (watch) { console.log("watching stdout..") subprocess.stdout.on('data', data => { remote.emit('task_res', { type: 'stdout', data: data.toString('utf8') }) }) subprocess.stderr.on('data', data => { remote.emit('task_res', { type: 'stderr', data: data.toString('utf8') }) }) } subprocess.on('error', (err) => { console.log('task error', subprocess.pid, err) remote.emit('task_res', { type: 'task_error', task, err }) }) subprocess.on('close', () => { console.log('task ended', subprocess.pid) remote.emit('task_res', { type: 'task_finish', task }) }) } export function kill_task(subprocess){ console.log('kill pid', subprocess.pid) kill(subprocess.pid) }