import { execFile, spawn } from 'child_process'
import interpreters from './interpreters'
import modules from './modules'
import kill from 'tree-kill'
import { remote } from './remote'
import { set_connected } from './rpc'
import uuidv1 from 'uuid/v1'
import * as fs from 'fs'
import * as path from 'path'
import * as queue from './queue'

// Sentinel used when no task is running on a processor.
const idle_state = {
  status: 'IDLE',
  task: {},
}

// One slot per processor: at most one CPU task and one GPU task at a time.
export const state = {
  current_cpu_task: idle_state,
  current_gpu_task: idle_state,
}

export function get_current_cpu_task(){ return state.current_cpu_task }
export function get_current_gpu_task(){ return state.current_gpu_task }

export function get_current_task(processor) {
  if (processor === 'cpu') {
    return state.current_cpu_task
  } else {
    return state.current_gpu_task
  }
}

// Reduce a task slot to a plain object that is safe to send over the wire.
function serialize_task(t){
  if (!t || t.status === 'IDLE') {
    return {
      status: 'IDLE',
    }
  }
  return {
    status: 'RUNNING',
    task: t.task,
    uuid: t.task.uuid,
    pid: t.subprocess.pid,
  }
}

// Reset a processor slot to idle, but only if it still holds the given task.
function clear_task(is_gpu, task){
  if (is_gpu) {
    if (state.current_gpu_task.task && state.current_gpu_task.task.uuid === task.uuid) {
      state.current_gpu_task = idle_state
    }
  } else {
    if (state.current_cpu_task.task && state.current_cpu_task.task.uuid === task.uuid) {
      state.current_cpu_task = idle_state
    }
  }
}

// Strip a leading slash and any '..' segments to keep paths inside the module dir.
function sanitize_path(f){
  return f.replace(/^\//, '').replace(/\.\./g, '')
}

export function upload_file(task, cb) {
  const module = modules[task.module]
  const filepath = path.join(module.cwd, sanitize_path(task.path), sanitize_path(task.filename))
  const params = [
    '-F', 'module=' + task.module,
    '-F', 'activity=' + task.activity,
    '-F', 'generated=' + (String(task.generated) === 'true'),
    '-F', 'processed=' + (String(task.processed) === 'true'),
    '-F', "file=@" + filepath,
    process.env.API_REMOTE + '/api/folder/' + task.folder_id + '/upload/',
  ]
  console.log(params)
  execFile('curl', params, cb)
  // Equivalent to:
  // curl \
  //   -F "module=samplernn" \
  //   -F "activity=train" \
  //   -F "file=@woods1.jpg" \
  //   localhost:7013/api/folder/1/upload/
}

export function status () {
  return {
    cpu: serialize_task(state.current_cpu_task),
    gpu: serialize_task(state.current_gpu_task),
  }
}

// Build the argv for an activity: interpreter flags, the script, the activity's
// fixed params, then per-task options converted to --flags.
export function build_params(module, activity, task) {
  const interpreter = interpreters[activity.type]
  let opt_params, activity_params

  if (typeof activity.params === 'function') {
    // The activity computes its own argument list from the task.
    opt_params = activity.params(task)
    activity_params = []
  } else {
    // Convert task.opt key/value pairs into command-line flags;
    // a value of 'true' becomes a bare boolean flag.
    const opt = task.opt || {}
    opt_params = Object.keys(opt).map(key => {
      const flag = '--' + key.replace(/-/g, '_')
      const value = opt[key]
      if (value === 'true') {
        return [flag]
      }
      return [flag, value]
    }).reduce((acc, cur) => acc.concat(cur), [])
    activity_params = activity.params
  }

  // Tolerate a missing interpreter here so callers can report the error themselves.
  const params = ((interpreter && interpreter.params) || [])
    .concat([ activity.script ])
    .concat(activity_params)
    .concat(opt_params)

  return { activity, interpreter, params }
}

// Whitelisted system commands exposed to the remote.
export function run_system_command(cmd, cb) {
  console.log('running system command:', cmd.cmd)
  switch (cmd.cmd) {
    case 'nvidia-smi':
    case 'uptime':
    case 'w':
      execFile(cmd.cmd, [], cb)
      break
    case 'ps':
      execFile(cmd.cmd, ['au'], cb)
      break
    case 'df':
      execFile('df', ['-h'], cb)
      break
    case 'ls':
      list_directory(cmd, cb)
      break
    case 'du':
      disk_usage(cmd, cb)
      break
    default:
      cb({ error: 'no such command' })
      break
  }
}

export function list_directory(opt, cb) {
  // Bail out early if the module is unknown.
  if (!opt.module || !modules[opt.module]) {
    cb([])
    return
  }
  const module = modules[opt.module]
  const dir = path.join(module.cwd, opt.dir.replace(/\.\.?\//g, ''))
  fs.readdir(dir, (err, files) => {
    // Stat every non-hidden entry so the listing includes size, date, and type.
    const statPromises = (files || []).filter(f => f[0] !== '.').map(f => {
      return new Promise((resolve, reject) => {
        const full_path = path.join(dir, f)
        fs.stat(full_path, (err, stat = {}) => {
          resolve({
            name: f,
            path: full_path,
            date: stat.ctime,
            size: stat.size,
            dir: stat.isDirectory(),
          })
        })
      })
    })
    Promise.all(statPromises).then(stats => {
      cb(stats)
    }).catch(error => {
      cb(error)
    })
  })
}

export function disk_usage(opt, cb) {
  if (!opt.module || !modules[opt.module]) {
    cb([])
    return
  }
  const module = modules[opt.module]
  const dir = path.join(module.cwd, opt.dir.replace(/\.\.?\//g, ''))
  execFile('du', ['-d', '1', dir], cb)
}

// Run a one-off helper script via execFile; it does not occupy a processor slot.
export function run_script(task, cb) {
  if (!task.module || !modules[task.module]) {
    cb("")
    return
  }
  const module = modules[task.module]
  const activity = module.activities[task.activity]
  const { interpreter, params } = build_params(module, activity, task)
  if (!interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter }
  if (!activity.isScript) return { type: 'error', error: "Not a script: " + task.module }
  console.log('running task', task.activity)
  console.log(module.cwd)
  console.log(interpreter.cmd, params)
  execFile(interpreter.cmd, params, {
    cwd: module.cwd,
  }, cb)
}

// Which processor a task would occupy, based on its activity's interpreter.
export function get_processor(task){
  if (!task) return null
  const module = modules[task.module]
  if (!module) return null
  const activity = module.activities[task.activity]
  if (!activity) return null
  const interpreter = interpreters[activity.type]
  return interpreter.gpu ? 'gpu' : 'cpu'
}

export function run_task(task, preempt=false, watch=false){
  if (!task) return null
  const module = modules[task.module]
  if (!module) return { type: 'error', error: "No such module: " + task.module }
  const activity = module.activities[task.activity]
  if (!activity) return { type: 'error', error: 'No such activity in module: ' + task.module + ' ' + task.activity }
  return run_task_with_activity(task, module, activity, preempt, watch)
}

// Spawn a long-running task, claim its processor slot, and stream its output to
// the remote. With preempt, a task already running on that slot is killed first.
export function run_task_with_activity(task, module, activity, preempt=false, watch=false) {
  const { interpreter, params } = build_params(module, activity, task)
  if (!interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter }

  if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') {
    if (preempt) {
      console.log('preempting currently running GPU task')
      terminate(state.current_gpu_task)
    } else {
      console.log('already running GPU task :(', state.current_gpu_task.subprocess.pid)
      return { type: 'error', preempt: false, error: 'task already running on gpu' }
    }
  } else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') {
    if (preempt) {
      console.log('preempting currently running CPU task')
      terminate(state.current_cpu_task)
    } else {
      console.log('already running CPU task :(')
      return { type: 'error', preempt: false, error: 'task already running on cpu' }
    }
  }

  console.log('running task', task.activity)
  console.log(module.cwd)
  console.log(interpreter.cmd, params)

  task.uuid = task.uuid || uuidv1()
  task.started = new Date().toString()
  task.processing = true

  const subprocess = spawn(interpreter.cmd, params, {
    cwd: module.cwd,
  })

  if (interpreter.gpu) {
    state.current_gpu_task = { subprocess, task, status: 'RUNNING' }
  } else {
    state.current_cpu_task = { subprocess, task, status: 'RUNNING' }
  }

  // Record which processor the task landed on for downstream messages.
  const processor = task.processor = interpreter.gpu ? 'gpu' : 'cpu'

  remote.emit('task_res', { type: 'task_begin', task })

  watch && console.log("watching stdout..")

  let count = 0
  subprocess.stdout.on('data', buf => {
    const data = buf.toString('utf8')
    watch && remote.emit('task_res', { type: 'stdout', processor, data })
    // Give the activity a chance to parse its own progress output.
    if (activity.listen) {
      const res = activity.listen(task, data, count++)
      if (res) remote.emit('task_res', res)
    }
  })

  subprocess.stderr.on('data', buf => {
    const data = buf.toString('utf8')
    watch && remote.emit('task_res', { type: 'stderr', processor, data })
  })

  // 'error' and 'close' can both fire; make sure we only finish once.
  let finished = false

  subprocess.on('error', (err) => {
    if (finished) return
    finished = true
    task.processing = false
    task.completed = true
    task.success = false
    console.log('task error', subprocess.exitCode, err)
    finish({
      type: 'task_error',
      task,
      err,
    })
  })

  subprocess.on('close', () => {
    if (finished) return
    finished = true
    console.log('task ended', subprocess.exitCode || '')
    set_connected(false)
    task.processing = false
    task.completed = true
    task.success = true
    finish({
      type: 'task_finish',
      task,
    })
  })

  // Notify the remote, free the processor slot, then either chain into the
  // activity's follow-up or pull the next task off the queue.
  function finish(task_res){
    remote.emit('task_res', task_res)
    clear_task(interpreter.gpu, task_res.task)
    set_connected(false)
    // remove task from queue
    // queue.remove_task(task)
    if (task.success && activity.after) {
      return run_task_with_activity(task, module, activity.after, preempt, watch)
    }
    return run_next_task()
  }

  return task
}

export function start_queue(){
  run_next_task()
}

export function run_next_task(){
  if (queue.is_active()) {
    console.log(queue.list_tasks())
    const task = queue.get_next_task()
    return run_task(task, false, true)
  }
}

// Stop a task by uuid, or pass the string 'cpu' or 'gpu' to stop whatever
// is running on that processor.
export function stop_task(task){
  if (!task) return
  if (task === 'cpu' || (task.uuid && state.current_cpu_task.task.uuid === task.uuid)) {
    terminate(state.current_cpu_task)
    return { status: 'ok' }
  } else if (task === 'gpu' || (task.uuid && state.current_gpu_task.task.uuid === task.uuid)) {
    terminate(state.current_gpu_task)
    return { status: 'ok' }
  }
  return { error: 'no such task' }
}

// Kill a running task's whole process tree and mark it as cancelled.
export function terminate(processor){
  if (!processor || !processor.subprocess) {
    return
  }
  console.log('kill pid', processor.subprocess.pid)
  processor.task.cancelled = true
  kill(processor.subprocess.pid)
}
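
// Usage sketch (commented out, not part of the module): one plausible way an RPC
// handler might drive this runner. The handler name, the message shape, and the
// './tasks' import path are assumptions for illustration; run_task, status, and
// stop_task are the real exports used here.
//
// import { run_task, status, stop_task } from './tasks'
//
// function handle_message(msg) {
//   switch (msg.type) {
//     case 'run':
//       // preempt=false: refuse if the slot is busy; watch=true: stream stdout/stderr
//       return run_task(msg.task, false, true)
//     case 'status':
//       return status()
//     case 'stop':
//       // accepts a task object carrying a uuid, or the string 'cpu' / 'gpu'
//       return stop_task(msg.task)
//   }
// }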