import { spawn } from 'child_process' import interpreters from './interpreters' import modules from './modules' import kill from 'tree-kill' import { remote } from './remote' import { set_connected } from './rpc' import uuidv1 from 'uuid/v1' import * as q from './queue' const idle_state = { status: 'IDLE', task: {} } export const state = { current_cpu_task: idle_state, current_gpu_task: idle_state, } export function status() { return { cpu: serialize_task(state.current_cpu_task), gpu: serialize_task(state.current_gpu_task), } } export function get_current_cpu_task(){ return state.current_cpu_task } export function get_current_gpu_task(){ return state.current_gpu_task } export function get_current_task(processor) { if (processor === 'cpu') { return state.current_cpu_task } else { return state.current_gpu_task } } function serialize_task(t){ if (!t || t.status === 'IDLE') { return { status: 'IDLE', } } if (!t.subprocess) { // TODO: figure out why there's no subprocess here return { status: 'IDLE', } } return { status: 'RUNNING', task: t.task, uuid: t.task.uuid, pid: t.subprocess.pid, } } function clear_task(is_gpu, task){ if (is_gpu) { if (state.current_gpu_task.task && state.current_gpu_task.task.uuid === task.uuid) { state.current_gpu_task.processor = null state.current_gpu_task = idle_state } } else { if (state.current_cpu_task.task && state.current_cpu_task.task.uuid === task.uuid) { state.current_gpu_task.processor = null state.current_cpu_task = idle_state } } } export function build_params(module, activity, task) { const interpreter = interpreters[activity.type] let opt_params, activity_params; if (typeof activity.params === 'function') { opt_params = activity.params(task) if (opt_params === 'CANCEL') { return { cancelled: true }; } activity_params = [] } else { const opt = task.opt || {} opt_params = Object.keys(opt).map(key => { const flag = '--' + key.replace(/-/g, '_') const value = opt[key] if (value === 'true') { return [flag] } return [flag, value] }).reduce((acc, cur) => acc.concat(cur), []) activity_params = activity.params } const params = (interpreter.params || []) .concat([ activity.script ]) .concat(activity_params) .concat(opt_params) return { activity, interpreter, params } } export function get_processor(task){ if (! task) return null const module = modules[task.module] if (! module) return null const activity = module.activities[task.activity] if (! activity) return null const interpreter = interpreters[activity.type] return interpreter.gpu ? 'gpu' : 'cpu' } export function get_queue(task){ const processor = get_processor(task) if (!processor) return null return (processor === 'cpu') ? q.cpu : q.gpu } export function run_task(task, preempt=false, watch=false){ if (! task) return null const module = modules[task.module] if (! module) return { type: 'error', error: "No such module: " + task.module } const activity = task.activity_object || module.activities[task.activity] delete task.activity_object if (! activity) return { type: 'error', error: 'No such activity in module: ' + task.module + ' ' + task.activity } return run_task_with_activity(task, module, activity, preempt, watch) } export function run_task_with_activity(task, module, activity, preempt=false, watch=false) { const { interpreter, params, cancelled } = build_params(module, activity, task) if (cancelled) return finish({ type: 'task_error', task, err: "Task generator cancelled task", }) if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') { if (preempt) { console.log('preempting currently running GPU task') terminate(state.current_gpu_task) } else { console.log('already running GPU task :(', state.current_gpu_task.pid) return { type: 'error', busy: true, preempt: false, error: 'task already running on gpu' } } } else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') { if (preempt) { console.log('preempting currently running CPU task') terminate(state.current_cpu_task) } else { console.log('already running CPU task :(') return { type: 'error', busy: true, preempt: false, error: 'task already running on cpu' } } } console.log('running task', task.activity) console.log(module.cwd) console.log(interpreter.cmd, params) task.uuid = task.uuid || uuidv1() task.started = new Date().toString() task.processing = true const subprocess = spawn(interpreter.cmd, params, { cwd: module.cwd, env: module.env || {}, }) if (interpreter.gpu) { state.current_gpu_task = { subprocess, task, status: 'RUNNING' } } else { state.current_cpu_task = { subprocess, task, status: 'RUNNING' } } const processor = task.processor = interpreter.gpu ? 'gpu' : 'cpu' remote.emit('task_res', { type: 'task_begin', task }) watch && console.log("watching stdout..") let count = 0 subprocess.stdout.on('data', buf => { const data = buf.toString('utf8') watch && remote.emit('task_res', { type: 'stdout', processor, data }) if (activity.listen) { const res = activity.listen(task, data, count++) if (res) remote.emit('task_res', res) } }) subprocess.stderr.on('data', buf => { const data = buf.toString('utf8') watch && remote.emit('task_res', { type: 'stderr', processor, data }) if (activity.listen) { const res = activity.listen(task, data, count++) if (res) remote.emit('task_res', res) } }) let finished = false subprocess.on('error', (err) => { if (finished) return console.log('already finished') finished = true task.processing = false task.completed = true task.success = false console.log('>>> task error', subprocess.exitCode, err) finish({ type: 'task_error', task, err, }) }) subprocess.on('close', () => { if (finished) return console.log('already finished') finished = true console.log('>>> task ended', subprocess.exitCode || '') task.processing = false task.completed = true task.success = true finish({ type: 'task_finish', task, }) }) function finish (task_res) { remote.emit('task_res', task_res) interpreter && clear_task(interpreter.gpu, task_res.task) set_connected(false) // remove task from queue..? // queue.remove_task(task) console.log('success?', task.success, activity) if (task.success && activity.after) { if (activity.after in modules[task.module].activities) { const after_activity = modules[task.module].activities[activity.after] const task_result = run_task_with_activity(task, module, after_activity, preempt, watch) if (task_result.busy) { task.activity_object = after_activity const interpreter = interpreters[activity.type] if (interpreter.gpu) { q.gpu.add_next_task(task) } else { q.cpu.add_next_task(task) } } return task_result } else { console.warn('no such after activity:', task.module, activity.after) } } return run_next_task() } return task } export function start_queue () { console.log('starting queue!') const res = run_next_task() console.log(res) } export function run_next_task () { const status = [q.cpu, q.gpu].map(queue => { const status = queue.processor === 'gpu' ? state.current_gpu_task.status : state.current_cpu_task.status console.log(queue.is_active(), status) if (queue.is_active() && status === 'IDLE') { console.log(queue.processor, "is free") const task = queue.get_next_task() return run_task(task, false, true) } else { console.log(queue.processor, "is busy") return { processor: queue.processor, status: 'busy' } } }) return status } export function stop_task (task, sigkill) { if (!task) return { error: 'no such task' } if (task === 'cpu' || (task.uuid && state.current_cpu_task.task.uuid === task.uuid)) { console.log('stop cpu task') terminate(state.current_cpu_task, sigkill) return { status: 'ok' } } else if (task === 'gpu' || (task.uuid && state.current_gpu_task.task.uuid === task.uuid)) { console.log('stop gpu task') terminate(state.current_gpu_task, sigkill) return { status: 'ok' } } console.log('no task to kill') return { error: 'no such task' } } export function terminate (processor, sigkill=false) { if (!processor || !processor.subprocess) { console.log('nothing running on', processor) return } console.log('kill pid', processor.subprocess.pid) processor.task.cancelled = true if (sigkill) { kill(processor.subprocess.pid, 'SIGKILL', err => { if (err) { console.error(err) } else { console.log('process killed via SIGKILL -', processor.subprocess) processor.subprocess = null } }) } else { kill(processor.subprocess.pid, 'SIGTERM', err => { if (err) { console.log(err) } else { console.log('process killed via SIGTERM -', processor.subprocess) processor.subprocess = null } }) } }