import { execFile, spawn } from 'child_process' import interpreters from './interpreters' import modules from './modules' import kill from 'tree-kill' import { remote } from './remote' import { set_connected } from './rpc' import uuidv1 from 'uuid/v1' import * as fs from 'fs' import * as path from 'path' import readdir from 'fs-readdir-promise' import * as q from './queue' const MAX_TRANSFER_SIZE = 1024 * 1024 * 2.5 const idle_state = { status: 'IDLE', task: {} } export const state = { current_cpu_task: idle_state, current_gpu_task: idle_state, } export function get_current_cpu_task(){ return state.current_cpu_task } export function get_current_gpu_task(){ return state.current_gpu_task } export function get_current_task(processor) { if (processor === 'cpu') { return state.current_cpu_task } else { return state.current_gpu_task } } function serialize_task(t){ if (!t || t.status === 'IDLE') { return { status: 'IDLE', } } return { status: 'RUNNING', task: t.task, uuid: t.task.uuid, pid: t.subprocess.pid, } } function clear_task(is_gpu, task){ if (is_gpu) { if (state.current_gpu_task.task && state.current_gpu_task.task.uuid === task.uuid) { state.current_gpu_task.processor = null state.current_gpu_task = idle_state } } else { if (state.current_cpu_task.task && state.current_cpu_task.task.uuid === task.uuid) { state.current_gpu_task.processor = null state.current_cpu_task = idle_state } } } function sanitize_path(f){ return f.replace(/^\//,'').replace(/\.\./, '') } export function upload_file(task, cb) { const module = modules[task.module] const filepath = path.join(module.cwd, sanitize_path(task.path), sanitize_path(task.filename)) const params = [ '-F', 'module=' + task.module, '-F', 'activity=' + task.activity, '-F', 'generated=' + (String(task.generated) === 'true'), '-F', 'processed=' + (String(task.processed) === 'true'), '-F', "file=@" + filepath, process.env.API_REMOTE + '/api/folder/' + task.folder_id + '/upload/', ] console.log(params) execFile('curl', params, cb) // curl \ // -F "module=samplernn" \ // -F "activity=train" \ // -F "file=@woods1.jpg" \ // localhost:7013/api/folder/1/upload/ } export function status () { return { cpu: serialize_task(state.current_cpu_task), gpu: serialize_task(state.current_gpu_task), } } export function build_params(module, activity, task) { const interpreter = interpreters[activity.type] let opt_params, activity_params; if (typeof activity.params === 'function') { opt_params = activity.params(task) if (opt_params === 'CANCEL') { return { cancelled: true }; } activity_params = [] } else { const opt = task.opt || {} opt_params = Object.keys(opt).map(key => { const flag = '--' + key.replace(/-/g, '_') const value = opt[key] if (value === 'true') { return [flag] } return [flag, value] }).reduce((acc, cur) => acc.concat(cur), []) activity_params = activity.params } const params = (interpreter.params || []) .concat([ activity.script ]) .concat(activity_params) .concat(opt_params) return { activity, interpreter, params } } export function run_system_command(opt, cb) { console.log('running system command:', opt.cmd) switch(opt.cmd) { case 'nvidia-smi': case 'uptime': case 'w': execFile(opt.cmd, [], cb) break case 'ps': execFile('ps', ['au'], cb) break case 'df': execFile('df', ['-h'], cb) break case 'ls': list_directory(opt, cb) break case 'du': disk_usage(opt, cb) break case 'list_sequences': list_sequences(opt, cb) break case 'dir_to_video': dir_to_video(opt, cb) break default: cb({ error: 'no such command' }) break } } export function module_dir(opt, dir){ if (!opt.module || ! modules[opt.module]) { return null } const module = modules[opt.module] if (!module) { return null } return path.join(module.cwd, dir.replace(/\.\.?\//g, '')) } export function read_file(opt, cb) { const fn = module_dir(opt, opt.fn) if (!fn) return cb([]) stat_promise(fn).then(stat => { if (stat.size > MAX_TRANSFER_SIZE) { return cb({ error: 'file too large'}) } fs.readFile(fn, (err, buf) => cb({ error: err, name: opt.fn, path: fn, date: stat.ctime, size: stat.size, buf })) }).catch(() => cb({ error: 'error reading file' })) } export function list_directory(opt, cb) { const dir = module_dir(opt, opt.dir) if (!dir) return cb([]) fs.readdir(dir, (err, files) => { const statPromises = (files || []).filter(f => f[0] !== '.').map(f => { return new Promise((resolve, reject) => { const full_path = path.join(dir, f) fs.stat(full_path, (err, stat={}) => { resolve({ name: f, date: stat.ctime, size: stat.size, dir: stat.isDirectory ? stat.isDirectory() : false, }) }) }) }) Promise.all(statPromises).then(stats => { cb(stats, dir) }).catch(error => { cb(error) }) }) } export function count_directory(opt, cb) { const dir = module_dir(opt, opt.dir) if (!dir) return cb([]) fs.readdir(dir, (err, files) => { err ? cb(err) : cb(files.length) }) } // list the contents of a directory of sequences export function list_sequences(opt, cb) { list_directory(opt, (files, root_dir) => { // console.log(files, root_dir) const sequencePromises = files.filter(d => !!d.dir).map(f => { return list_sequence(opt, f, root_dir) }) Promise.all(sequencePromises).then(cb).catch(error => { console.error(error) cb([]) }) }) } export function list_sequence(opt, f, root_dir) { return new Promise( (resolve, reject) => { let sequence = { name: f.name, date: f.date, size: f.size, frame: null, count: 0, } readdir(path.join(root_dir, f.name)).then(files => { if (! files.length) { return resolve(sequence) } const middle_file = files[Math.floor(files.length/2)] if (! middle_file) return resolve(sequence) sequence.frame = { prefix: middle_file.split('_')[0], } sequence.count = files.length // console.log(sequence.count) return stat_promise(path.join(root_dir, f.name, middle_file)) }).then(stat => { sequence.frame.date = stat.date sequence.frame.size = stat.size resolve(sequence) }).catch(err => reject(err)) }) } export function stat_promise(fn) { return new Promise((resolve, reject) => { fs.stat(fn, (err, stat={}) => { if (err) reject(err) resolve(stat) }) }) } export function dir_to_video(opt, db) { const dir = module_dir(opt, opt.dir) if (!dir) return cb([]) // input: the path (in results/) you want as a video // output: the path (in renders/) that contains the video // run the dir to video script with CWD as the directory and first input as ../renders plus the directory name // list the file in renders... execFile('./bin/dir_to_video.pl', params, { cwd: module.cwd, }, cb) } export function disk_usage(opt, cb) { const dir = module_dir(opt, opt.dir) if (!dir) return cb([]) execFile('du', ['-d', 1, dir], cb) } export function run_script(task, cb) { if (!task.module || ! modules[task.module]) { cb("") } const module = modules[task.module] const activity = module.activities[task.activity] const { interpreter, params, cancelled } = build_params(module, activity, task) if (cancelled) return { type: 'error', error: "Task builder cancelled process" } if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } if (! activity.isScript) return { type: 'error', error: "Not a script: " + task.module } console.log('running task', task.activity) console.log(module.cwd) console.log(interpreter.cmd, params) execFile(interpreter.cmd, params, { cwd: module.cwd, }, cb) } export function get_processor(task){ if (! task) return null const module = modules[task.module] if (! module) return null const activity = module.activities[task.activity] if (! activity) return null const interpreter = interpreters[activity.type] return interpreter.gpu ? 'gpu' : 'cpu' } export function get_queue(task){ const processor = get_processor(task) if (!processor) return null return (processor === 'cpu') ? q.cpu : q.gpu } export function run_task(task, preempt=false, watch=false){ if (! task) return null const module = modules[task.module] if (! module) return { type: 'error', error: "No such module: " + task.module } const activity = task.activity_object || module.activities[task.activity] delete task.activity_object if (! activity) return { type: 'error', error: 'No such activity in module: ' + task.module + ' ' + task.activity } return run_task_with_activity(task, module, activity, preempt, watch) } export function run_task_with_activity(task, module, activity, preempt=false, watch=false) { const { interpreter, params, cancelled } = build_params(module, activity, task) if (cancelled) return finish({ type: 'task_error', task, err: "Task generator cancelled task", }) if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') { if (preempt) { console.log('preempting currently running GPU task') terminate(state.current_gpu_task) } else { console.log('already running GPU task :(', state.current_gpu_task.pid) return { type: 'error', busy: true, preempt: false, error: 'task already running on gpu' } } } else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') { if (preempt) { console.log('preempting currently running CPU task') terminate(state.current_cpu_task) } else { console.log('already running CPU task :(') return { type: 'error', busy: true, preempt: false, error: 'task already running on cpu' } } } console.log('running task', task.activity) console.log(module.cwd) console.log(interpreter.cmd, params) task.uuid = task.uuid || uuidv1() task.started = new Date().toString() task.processing = true const subprocess = spawn(interpreter.cmd, params, { cwd: module.cwd, env: module.env || {}, }) if (interpreter.gpu) { state.current_gpu_task = { subprocess, task, status: 'RUNNING' } } else { state.current_cpu_task = { subprocess, task, status: 'RUNNING' } } const processor = task.processor = interpreter.gpu ? 'gpu' : 'cpu' remote.emit('task_res', { type: 'task_begin', task }) watch && console.log("watching stdout..") let count = 0 subprocess.stdout.on('data', buf => { const data = buf.toString('utf8') watch && remote.emit('task_res', { type: 'stdout', processor, data }) if (activity.listen) { const res = activity.listen(task, data, count++) if (res) remote.emit('task_res', res) } }) subprocess.stderr.on('data', buf => { const data = buf.toString('utf8') watch && remote.emit('task_res', { type: 'stderr', processor, data }) }) let finished = false subprocess.on('error', (err) => { if (finished) return console.log('already finished') finished = true task.processing = false task.completed = true task.success = false console.log('>>> task error', subprocess.exitCode, err) finish({ type: 'task_error', task, err, }) }) subprocess.on('close', () => { if (finished) return console.log('already finished') finished = true console.log('>>> task ended', subprocess.exitCode || '') task.processing = false task.completed = true task.success = true finish({ type: 'task_finish', task, }) }) function finish (task_res) { remote.emit('task_res', task_res) interpreter && clear_task(interpreter.gpu, task_res.task) set_connected(false) // remove task from queue..? // queue.remove_task(task) console.log('success?', task.success, activity) if (task.success && activity.after) { if (activity.after in modules[task.module].activities) { const after_activity = modules[task.module].activities[activity.after] const task_result = run_task_with_activity(task, module, after_activity, preempt, watch) if (task_result.busy) { task.activity_object = after_activity const interpreter = interpreters[activity.type] if (interpreter.gpu) { q.gpu.add_next_task(task) } else { q.cpu.add_next_task(task) } } return task_result } else { console.warn('no such after activity:', task.module, activity.after) } } return run_next_task() } return task } export function start_queue () { console.log('starting queue!') const res = run_next_task() console.log(res) } export function run_next_task () { const status = [q.cpu, q.gpu].map(queue => { const status = queue.processor === 'gpu' ? state.current_gpu_task.status : state.current_cpu_task.status console.log(queue.is_active(), status) if (queue.is_active() && status === 'IDLE') { console.log(queue.processor, "is free") const task = queue.get_next_task() return run_task(task, false, true) } else { console.log(queue.processor, "is busy") return { processor: queue.processor, status: 'busy' } } }) return status } export function stop_task (task, sigkill) { if (!task) return { error: 'no such task' } if (task === 'cpu' || (task.uuid && state.current_cpu_task.task.uuid === task.uuid)) { console.log('stop cpu task') terminate(state.current_cpu_task, sigkill) return { status: 'ok' } } else if (task === 'gpu' || (task.uuid && state.current_gpu_task.task.uuid === task.uuid)) { console.log('stop gpu task') terminate(state.current_gpu_task, sigkill) return { status: 'ok' } } console.log('no task to kill') return { error: 'no such task' } } export function terminate (processor, sigkill=false) { if (!processor || !processor.subprocess) { console.log('nothing running on', processor) return } console.log('kill pid', processor.subprocess.pid) processor.task.cancelled = true if (sigkill) { kill(processor.subprocess.pid, 'SIGKILL', err => { if (err) { console.error(err) } else { console.log('process killed via SIGKILL -', processor.subprocess) processor.subprocess = null } }) } else { kill(processor.subprocess.pid, 'SIGTERM', err => { if (err) { console.log(err) } else { console.log('process killed via SIGTERM -', processor.subprocess) processor.subprocess = null } }) } }