diff options
| author | Jules Laplace <julescarbon@gmail.com> | 2018-09-22 19:06:12 +0200 |
|---|---|---|
| committer | Jules Laplace <julescarbon@gmail.com> | 2018-09-22 19:06:12 +0200 |
| commit | bab7a5431b2255851034b19fb1e5cba641882131 (patch) | |
| tree | 663437f85cd30bcec42bb83b3bda7b78c4fa143d /app | |
| parent | 0abd557635d4fb3bde6cd4ab7ead659a9a85b3c8 (diff) | |
split out system from runner
Diffstat (limited to 'app')
| -rw-r--r-- | app/relay/remote.js | 15 | ||||
| -rw-r--r-- | app/relay/runner.js | 254 | ||||
| -rw-r--r-- | app/relay/system.js | 258 |
3 files changed, 267 insertions, 260 deletions
diff --git a/app/relay/remote.js b/app/relay/remote.js index a0639a5..7cb7455 100644 --- a/app/relay/remote.js +++ b/app/relay/remote.js @@ -4,6 +4,7 @@ import * as q from './queue' const io = require('socket.io-client') import * as runner from './runner' +import * as system from './system' export const remote = io.connect(process.env.SOCKETIO_REMOTE) export function emit (key, payload) { remote.emit(key, payload) } @@ -97,7 +98,7 @@ remote.on('system', (data) => { // console.log(data) switch(data.cmd) { case 'run_system_command': - runner.run_system_command(data.payload, (error, stdout, stderr) => { + system.run_system_command(data.payload, (error, stdout, stderr) => { remote.emit('system_res', { type: 'run_system_command', cmd: data.payload, @@ -107,7 +108,7 @@ remote.on('system', (data) => { }) break case 'list_directory': - runner.list_directory(data.payload, files => { + system.list_directory(data.payload, files => { remote.emit('system_res', { type: 'list_directory', dir: data.payload, @@ -117,7 +118,7 @@ remote.on('system', (data) => { }) break case 'count_directory': - runner.count_directory(data.payload, count => { + system.count_directory(data.payload, count => { remote.emit('system_res', { type: 'count_directory', dir: data.payload, @@ -127,7 +128,7 @@ remote.on('system', (data) => { }) break case 'list_sequences': - runner.list_sequences(data.payload, sequences => { + system.list_sequences(data.payload, sequences => { remote.emit('system_res', { type: 'list_sequences', dir: data.payload, @@ -137,7 +138,7 @@ remote.on('system', (data) => { }) break case 'upload_file': - runner.upload_file(data.payload, (error, stdout, stderr) => { + system.upload_file(data.payload, (error, stdout, stderr) => { remote.emit('system_res', { type: 'upload_file', query: data.payload, @@ -147,7 +148,7 @@ remote.on('system', (data) => { }) break case 'read_file': - runner.read_file(data.payload, (file) => { + system.read_file(data.payload, (file) => { remote.emit('system_res', { type: 'read_file', query: data.payload, @@ -165,7 +166,7 @@ remote.on('system', (data) => { }) break case 'run_script': - runner.run_script(data.payload, (error, stdout, stderr) => { + system.run_script(data.payload, (error, stdout, stderr) => { remote.emit('system_res', { type: 'run_script', cmd: data.payload, diff --git a/app/relay/runner.js b/app/relay/runner.js index 1e90fca..70988b0 100644 --- a/app/relay/runner.js +++ b/app/relay/runner.js @@ -1,22 +1,13 @@ -import { execFile, spawn } from 'child_process' +import { spawn } from 'child_process' import interpreters from './interpreters' import modules from './modules' import kill from 'tree-kill' import { remote } from './remote' import { set_connected } from './rpc' import uuidv1 from 'uuid/v1' -import * as fs from 'fs' -import * as path from 'path' -import readdir from 'fs-readdir-promise' -import sharp from 'sharp' import * as q from './queue' -const MEGABYTES = 1024 * 1024 -const MAX_TRANSFER_SIZE = 2.5 * MEGABYTES -const MAX_LOAD_SIZE = 108 * MEGABYTES -const THUMBNAIL_SIZE = 200 -const THUMBNAIL_QUALITY = 80 const idle_state = { status: 'IDLE', task: {} } export const state = { @@ -68,37 +59,6 @@ function clear_task(is_gpu, task){ } } -function sanitize_path(f){ - return f.replace(/^\//,'').replace(/\.\./, '') -} - -export function upload_file(task, cb) { - const module = modules[task.module] - const filepath = path.join(module.cwd, sanitize_path(task.path), sanitize_path(task.filename)) - const params = [ - '-F', 'module=' + task.module, - '-F', 'activity=' + task.activity, - '-F', 'generated=' + (String(task.generated) === 'true'), - '-F', 'processed=' + (String(task.processed) === 'true'), - '-F', "file=@" + filepath, - process.env.API_REMOTE + '/api/folder/' + task.folder_id + '/upload/', - ] - console.log(params) - execFile('curl', params, cb) - // curl \ - // -F "module=samplernn" \ - // -F "activity=train" \ - // -F "file=@woods1.jpg" \ - // localhost:7013/api/folder/1/upload/ -} - -export function status () { - return { - cpu: serialize_task(state.current_cpu_task), - gpu: serialize_task(state.current_gpu_task), - } -} - export function build_params(module, activity, task) { const interpreter = interpreters[activity.type] let opt_params, activity_params; @@ -132,218 +92,6 @@ export function build_params(module, activity, task) { } } -export function run_system_command(opt, cb) { - console.log('running system command:', opt.cmd) - switch(opt.cmd) { - case 'nvidia-smi': - case 'uptime': - case 'w': - execFile(opt.cmd, [], cb) - break - case 'ps': - execFile('ps', ['au'], cb) - break - case 'df': - execFile('df', ['-h'], cb) - break - case 'ls': - list_directory(opt, cb) - break - case 'du': - disk_usage(opt, cb) - break - case 'list_sequences': - list_sequences(opt, cb) - break - case 'dir_to_video': - dir_to_video(opt, cb) - break - default: - cb({ error: 'no such command' }) - break - } -} - -export function module_dir(opt, dir){ - if (!opt.module || ! modules[opt.module]) { - return null - } - const module = modules[opt.module] - if (!module) { - return null - } - return path.join(module.cwd, dir.replace(/\.\.?\//g, '')) -} - -export function read_file(opt, cb) { - const fn = module_dir(opt, opt.fn) - if (!fn) return cb([]) - stat_promise(fn).then(stat => { - if (stat.size > MAX_TRANSFER_SIZE) { - return cb({ error: 'file too large'}) - } - fs.readFile(fn, (err, buf) => cb({ - error: err, - name: opt.fn, - path: fn, - date: stat.ctime, - size: stat.size, - buf - })) - }).catch(() => cb({ error: 'error reading file' })) -} - -export function thumbnail(opt, cb) { - const fn = module_dir(opt, opt.fn) - if (!fn) return cb([]) - stat_promise(fn).then(stat => { - if (stat.size > MAX_LOAD_SIZE) { - return cb({ error: 'file too large'}) - } - sharp(fn) - .resize(opt.size || THUMBNAIL_SIZE) - .jpeg({ quality: opt.quality || THUMBNAIL_QUALITY }) - .toBuffer() - .then(buf => cb({ - error: err, - name: opt.fn, - path: fn, - date: stat.ctime, - size: stat.size, - buf - })) - .catch(err => cb({ - error: err, - name: opt.fn, - path: fn, - date: stat.ctime, - size: stat.size, - buf: null, - })) - }) -} - -export function list_directory(opt, cb) { - const dir = module_dir(opt, opt.dir) - if (!dir) return cb([]) - fs.readdir(dir, (err, files) => { - const statPromises = (files || []).filter(f => f[0] !== '.').map(f => { - return new Promise((resolve, reject) => { - const full_path = path.join(dir, f) - fs.stat(full_path, (err, stat={}) => { - resolve({ - name: f, - date: stat.ctime, - size: stat.size, - dir: stat.isDirectory ? stat.isDirectory() : false, - }) - }) - }) - }) - Promise.all(statPromises).then(stats => { - cb(stats, dir) - }).catch(error => { - cb(error) - }) - }) -} - -export function count_directory(opt, cb) { - const dir = module_dir(opt, opt.dir) - if (!dir) return cb([]) - fs.readdir(dir, (err, files) => { - err ? cb(err) : cb(files.length) - }) -} - -// list the contents of a directory of sequences -export function list_sequences(opt, cb) { - list_directory(opt, (files, root_dir) => { - // console.log(files, root_dir) - const sequencePromises = files.filter(d => !!d.dir).map(f => { - return list_sequence(opt, f, root_dir) - }) - Promise.all(sequencePromises).then(cb).catch(error => { - console.error(error) - cb([]) - }) - }) -} -export function list_sequence(opt, f, root_dir) { - return new Promise( (resolve, reject) => { - let sequence = { - name: f.name, - date: f.date, - size: f.size, - frame: null, - count: 0, - } - readdir(path.join(root_dir, f.name)).then(files => { - if (! files.length) { - return resolve(sequence) - } - const middle_file = files[Math.floor(files.length/2)] - if (! middle_file) return resolve(sequence) - sequence.frame = { - prefix: middle_file.split('_')[0], - } - sequence.count = files.length - // console.log(sequence.count) - return stat_promise(path.join(root_dir, f.name, middle_file)) - }).then(stat => { - sequence.frame.date = stat.date - sequence.frame.size = stat.size - resolve(sequence) - }).catch(err => reject(err)) - }) -} - -export function stat_promise(fn) { - return new Promise((resolve, reject) => { - fs.stat(fn, (err, stat={}) => { - if (err) reject(err) - resolve(stat) - }) - }) -} -export function dir_to_video(opt, db) { - const dir = module_dir(opt, opt.dir) - if (!dir) return cb([]) - // input: the path (in results/) you want as a video - // output: the path (in renders/) that contains the video - // run the dir to video script with CWD as the directory and first input as ../renders plus the directory name - // list the file in renders... - execFile('./bin/dir_to_video.pl', params, { - cwd: module.cwd, - }, cb) -} - - -export function disk_usage(opt, cb) { - const dir = module_dir(opt, opt.dir) - if (!dir) return cb([]) - execFile('du', ['-d', 1, dir], cb) -} - -export function run_script(task, cb) { - if (!task.module || ! modules[task.module]) { - cb("") - } - const module = modules[task.module] - const activity = module.activities[task.activity] - const { interpreter, params, cancelled } = build_params(module, activity, task) - if (cancelled) return { type: 'error', error: "Task builder cancelled process" } - if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } - if (! activity.isScript) return { type: 'error', error: "Not a script: " + task.module } - - console.log('running task', task.activity) - console.log(module.cwd) - console.log(interpreter.cmd, params) - execFile(interpreter.cmd, params, { - cwd: module.cwd, - }, cb) -} - export function get_processor(task){ if (! task) return null const module = modules[task.module] diff --git a/app/relay/system.js b/app/relay/system.js new file mode 100644 index 0000000..90dbf66 --- /dev/null +++ b/app/relay/system.js @@ -0,0 +1,258 @@ +import { execFile } from 'child_process' +import * as path from 'path' +import * as fs from 'fs' +import readdir from 'fs-readdir-promise' +import sharp from 'sharp' + +import modules from './modules' +import * as runner from './runner' + +const MEGABYTES = 1024 * 1024 +const MAX_TRANSFER_SIZE = 2.5 * MEGABYTES +const MAX_LOAD_SIZE = 108 * MEGABYTES +const THUMBNAIL_SIZE = 200 +const THUMBNAIL_QUALITY = 80 + +function sanitize_path(f){ + return f.replace(/^\//,'').replace(/\.\./, '') +} + +export function upload_file(task, cb) { + const module = modules[task.module] + const filepath = path.join(module.cwd, sanitize_path(task.path), sanitize_path(task.filename)) + const params = [ + '-F', 'module=' + task.module, + '-F', 'activity=' + task.activity, + '-F', 'generated=' + (String(task.generated) === 'true'), + '-F', 'processed=' + (String(task.processed) === 'true'), + '-F', "file=@" + filepath, + process.env.API_REMOTE + '/api/folder/' + task.folder_id + '/upload/', + ] + console.log(params) + execFile('curl', params, cb) + // curl \ + // -F "module=samplernn" \ + // -F "activity=train" \ + // -F "file=@woods1.jpg" \ + // localhost:7013/api/folder/1/upload/ +} + +export function status () { + return { + cpu: serialize_task(state.current_cpu_task), + gpu: serialize_task(state.current_gpu_task), + } +} + +export function run_system_command(opt, cb) { + console.log('running system command:', opt.cmd) + switch(opt.cmd) { + case 'nvidia-smi': + case 'uptime': + case 'w': + execFile(opt.cmd, [], cb) + break + case 'ps': + execFile('ps', ['au'], cb) + break + case 'df': + execFile('df', ['-h'], cb) + break + case 'ls': + list_directory(opt, cb) + break + case 'du': + disk_usage(opt, cb) + break + case 'list_sequences': + list_sequences(opt, cb) + break + case 'dir_to_video': + dir_to_video(opt, cb) + break + default: + cb({ error: 'no such command' }) + break + } +} + +export function module_dir(opt, dir){ + if (!opt.module || ! modules[opt.module]) { + return null + } + const module = modules[opt.module] + if (!module) { + return null + } + return path.join(module.cwd, dir.replace(/\.\.?\//g, '')) +} + +export function read_file(opt, cb) { + const fn = module_dir(opt, opt.fn) + if (!fn) return cb([]) + stat_promise(fn).then(stat => { + if (stat.size > MAX_TRANSFER_SIZE) { + return cb({ error: 'file too large'}) + } + fs.readFile(fn, (err, buf) => cb({ + error: err, + name: opt.fn, + path: fn, + date: stat.ctime, + size: stat.size, + buf + })) + }).catch(() => cb({ error: 'error reading file' })) +} + +export function thumbnail(opt, cb) { + const fn = module_dir(opt, opt.fn) + if (!fn) return cb([]) + stat_promise(fn).then(stat => { + if (stat.size > MAX_LOAD_SIZE) { + return cb({ error: 'file too large'}) + } + sharp(fn) + .resize(opt.size || THUMBNAIL_SIZE) + .jpeg({ quality: opt.quality || THUMBNAIL_QUALITY }) + .toBuffer() + .then(buf => cb({ + error: err, + name: opt.fn, + path: fn, + date: stat.ctime, + size: stat.size, + buf + })) + .catch(err => cb({ + error: err, + name: opt.fn, + path: fn, + date: stat.ctime, + size: stat.size, + buf: null, + })) + }) +} + +export function list_directory(opt, cb) { + const dir = module_dir(opt, opt.dir) + if (!dir) return cb([]) + fs.readdir(dir, (err, files) => { + const statPromises = (files || []).filter(f => f[0] !== '.').map(f => { + return new Promise((resolve, reject) => { + const full_path = path.join(dir, f) + fs.stat(full_path, (err, stat={}) => { + resolve({ + name: f, + date: stat.ctime, + size: stat.size, + dir: stat.isDirectory ? stat.isDirectory() : false, + }) + }) + }) + }) + Promise.all(statPromises).then(stats => { + cb(stats, dir) + }).catch(error => { + cb(error) + }) + }) +} + +export function count_directory(opt, cb) { + const dir = module_dir(opt, opt.dir) + if (!dir) return cb([]) + fs.readdir(dir, (err, files) => { + err ? cb(err) : cb(files.length) + }) +} + +// list the contents of a directory of sequences +export function list_sequences(opt, cb) { + list_directory(opt, (files, root_dir) => { + // console.log(files, root_dir) + const sequencePromises = files.filter(d => !!d.dir).map(f => { + return list_sequence(opt, f, root_dir) + }) + Promise.all(sequencePromises).then(cb).catch(error => { + console.error(error) + cb([]) + }) + }) +} +export function list_sequence(opt, f, root_dir) { + return new Promise( (resolve, reject) => { + let sequence = { + name: f.name, + date: f.date, + size: f.size, + frame: null, + count: 0, + } + readdir(path.join(root_dir, f.name)).then(files => { + if (! files.length) { + return resolve(sequence) + } + const middle_file = files[Math.floor(files.length/2)] + if (! middle_file) return resolve(sequence) + sequence.frame = { + prefix: middle_file.split('_')[0], + } + sequence.count = files.length + // console.log(sequence.count) + return stat_promise(path.join(root_dir, f.name, middle_file)) + }).then(stat => { + sequence.frame.date = stat.date + sequence.frame.size = stat.size + resolve(sequence) + }).catch(err => reject(err)) + }) +} + +export function stat_promise(fn) { + return new Promise((resolve, reject) => { + fs.stat(fn, (err, stat={}) => { + if (err) reject(err) + resolve(stat) + }) + }) +} +export function dir_to_video(opt, db) { + const dir = module_dir(opt, opt.dir) + if (!dir) return cb([]) + // input: the path (in results/) you want as a video + // output: the path (in renders/) that contains the video + // run the dir to video script with CWD as the directory and first input as ../renders plus the directory name + // list the file in renders... + execFile('./bin/dir_to_video.pl', params, { + cwd: module.cwd, + }, cb) +} + + +export function disk_usage(opt, cb) { + const dir = module_dir(opt, opt.dir) + if (!dir) return cb([]) + execFile('du', ['-d', 1, dir], cb) +} + +export function run_script(task, cb) { + if (!task.module || ! modules[task.module]) { + cb("") + } + const module = modules[task.module] + const activity = module.activities[task.activity] + const { interpreter, params, cancelled } = runner.build_params(module, activity, task) + if (cancelled) return { type: 'error', error: "Task builder cancelled process" } + if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } + if (! activity.isScript) return { type: 'error', error: "Not a script: " + task.module } + + console.log('running task', task.activity) + console.log(module.cwd) + console.log(interpreter.cmd, params) + execFile(interpreter.cmd, params, { + cwd: module.cwd, + }, cb) +} + |
