diff options
Diffstat (limited to 'app')
| -rw-r--r-- | app/client/socket/socket.system.js | 22 | ||||
| -rw-r--r-- | app/client/socket/socket.task.js | 7 | ||||
| -rw-r--r-- | app/client/system/system.actions.js | 2 | ||||
| -rw-r--r-- | app/client/system/system.component.js | 27 | ||||
| -rw-r--r-- | app/client/system/system.reducer.js | 30 | ||||
| -rw-r--r-- | app/relay/interpreters.js | 7 | ||||
| -rw-r--r-- | app/relay/modules/test.js | 2 | ||||
| -rw-r--r-- | app/relay/remote.js | 2 | ||||
| -rw-r--r-- | app/relay/runner.js | 79 | ||||
| -rw-r--r-- | app/server/index.js | 12 |
10 files changed, 148 insertions, 42 deletions
diff --git a/app/client/socket/socket.system.js b/app/client/socket/socket.system.js index 0cdc625..38140c3 100644 --- a/app/client/socket/socket.system.js +++ b/app/client/socket/socket.system.js @@ -7,15 +7,27 @@ socket.on('system_res', (data) => { console.log('system response', data) switch (data.type) { case 'relay_connected': - return dispatch({ type: types.system.relay_connected }) + return dispatch({ + type: types.system.relay_connected + }) case 'relay_disconnected': - return dispatch({ type: types.system.relay_disconnected }) + return dispatch({ + type: types.system.relay_disconnected + }) case 'rpc_connected': - return dispatch({ type: types.system.rpc_connected, runner: data.runner }) + return dispatch({ + type: types.system.rpc_connected, + runner: data.runner, + }) case 'rpc_disconnected': - return dispatch({ type: types.system.rpc_disconnected }) + return dispatch({ + type: types.system.rpc_disconnected + }) case 'relay_status': - return dispatch({ type: data.rpc_connected ? types.system.rpc_connected : types.system.rpc_disconnected, runner: data.runner }) + return dispatch({ + type: data.rpc_connected ? types.system.rpc_connected : types.system.rpc_disconnected, + runner: data.runner, + }) case 'command_output': return dispatch({ type: types.system.command_output, diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js index 0c8429b..ba074c0 100644 --- a/app/client/socket/socket.task.js +++ b/app/client/socket/socket.task.js @@ -3,6 +3,8 @@ import types from '../types' import { socket } from './socket.connection' +let finishTimeout; + socket.on('task_res', (data) => { console.log('system response', data) switch (data.type) { @@ -10,12 +12,13 @@ socket.on('task_res', (data) => { // return dispatch({ type: types.system.rpc_connected, runner: data.runner }) break case 'task_begin': - return dispatch({ type: types.task.task_begin, data: data.data }) + clearTimeout(finishTimeout) + return dispatch({ type: types.task.task_begin, task: data.task }) break case 'stop': break case 'task_finish': - return dispatch({ type: types.task.task_finish, data: data.data }) + return finishTimeout = setTimeout(() => dispatch({ type: types.task.task_finish, task: data.task }), 100) break case 'kill': break diff --git a/app/client/system/system.actions.js b/app/client/system/system.actions.js index ff32fd6..519e140 100644 --- a/app/client/system/system.actions.js +++ b/app/client/system/system.actions.js @@ -1,4 +1,4 @@ -import * as socket from '../socket' +import socket from '../socket' import types from '../types' export const run = (cmd) => { diff --git a/app/client/system/system.component.js b/app/client/system/system.component.js index 07428e5..680d1c0 100644 --- a/app/client/system/system.component.js +++ b/app/client/system/system.component.js @@ -34,8 +34,14 @@ class System extends Component { constructor(props){ super() } + componentDidUpdate(){ + console.log(this._screen.scrollHeight, this._screen.scrollTop, this._screen.offsetHeight) + if (this._screen.scrollHeight > this._screen.scrollTop - this._screen.offsetHeight + 100) { + this._screen.scrollTop = this._screen.scrollHeight + } + } render(){ - const { site, server, relay, rpc, actions } = this.props + const { site, server, relay, runner, rpc, actions } = this.props return ( <div className='system'> <div className='heading'> @@ -51,9 +57,8 @@ class System extends Component { } <Param title='Relay'>{relay.status}</Param> <Param title='RPC'>{rpc.status}</Param> - <Param title='CPU'>{rpc.cpu_cmd}</Param> - <Param title='GPU'>{rpc.gpu_cmd}</Param> - <Param title='Current Task'>train samplernn</Param> + <Param title='CPU'>{this.renderStatus(runner.cpu)}</Param> + <Param title='GPU'>{this.renderStatus(runner.gpu)}</Param> </Group> <Group title="Diagnostics"> <Param title='Check GPU'> @@ -89,6 +94,16 @@ class System extends Component { </div> ) } + renderStatus(processor){ + if (!processor) { + return 'unknown' + } + if (processor.status === 'IDLE') { + return 'idle' + } + const task = processor.task + return task.activity + ' ' + task.module + } renderCommandOutput(){ const { cmd, stdout, stderr } = this.props let output @@ -108,14 +123,14 @@ class System extends Component { } else { output = stdout - if (cmd.stderr) { + if (stderr.length) { output += '\n\n_________________________________\n\n' output += stderr } } return ( <div> - <div className='screen'>{output}</div> + <div ref={(ref) => this._screen = ref} className='screen'>{output}</div> </div> ) } diff --git a/app/client/system/system.reducer.js b/app/client/system/system.reducer.js index a7ae8d1..c29572e 100644 --- a/app/client/system/system.reducer.js +++ b/app/client/system/system.reducer.js @@ -22,8 +22,6 @@ const systemInitialState = { rpc: { connected: false, status: "unknown", - cpu_cmd: "unknown", - gpu_cmd: "unknown", error: null, }, cmd: { @@ -34,6 +32,10 @@ const systemInitialState = { stdout: "", stderr: "", }, + runner: { + cpu: { status: 'IDLE', task: {} }, + gpu: { status: 'IDLE', task: {} }, + }, stdout: "", stderr: "", } @@ -109,7 +111,8 @@ const systemReducer = (state = systemInitialState, action) => { status: 'connected', connected: true, error: null, - } + }, + runner: action.runner, } case types.system.rpc_connected: return { @@ -147,9 +150,30 @@ const systemReducer = (state = systemInitialState, action) => { case types.task.task_begin: return { ...state, + runner: { + ...state.runner, + [action.task.processor]: { status: 'RUNNING', task: action.task }, + }, + cmd: { + ...state.cmd, + loaded: false, + stdout: "", + stderr: "", + }, stdout: "", stderr: "", } + case types.task.task_finish: + if (state.runner[action.task.processor].task.uuid !== action.task.uuid) { + return state + } + return { + ...state, + runner: { + ...state.runner, + [action.task.processor]: { status: 'IDLE', task: {} }, + }, + } case types.system.stdout: return { ...state, diff --git a/app/relay/interpreters.js b/app/relay/interpreters.js index 63a2c25..90dfcaa 100644 --- a/app/relay/interpreters.js +++ b/app/relay/interpreters.js @@ -1,3 +1,5 @@ +require('dotenv').config() + export default { bash: { cmd: process.env.BASH_BIN || '/bin/bash', @@ -8,15 +10,18 @@ export default { gpu: false, }, python: { - cmd: process.env.PYTHON_BIN || '/usr/bin/python3', + cmd: process.env.PYTHON_BIN || '/usr/bin/python', + params: ['-u'], gpu: false, }, pytorch: { cmd: process.env.PYTORCH_BIN, + params: ['-u'], gpu: true, }, tensorflow: { cmd: process.env.TENSORFLOW_BIN, + params: ['-u'], gpu: true, }, }
\ No newline at end of file diff --git a/app/relay/modules/test.js b/app/relay/modules/test.js index 5619159..1d7042e 100644 --- a/app/relay/modules/test.js +++ b/app/relay/modules/test.js @@ -14,7 +14,7 @@ const gpu = { params: '--test', } const live = { - type: 'pytorch', + type: 'python', script: 'test.py', } diff --git a/app/relay/remote.js b/app/relay/remote.js index b8cfa15..5979c68 100644 --- a/app/relay/remote.js +++ b/app/relay/remote.js @@ -39,7 +39,7 @@ remote.on('cmd', (data) => { remote.on('task', (data) => { let response; - console.log(data) + // console.log(data) console.log('task', data.type) switch(data.type) { case 'start': diff --git a/app/relay/runner.js b/app/relay/runner.js index 039bfe7..699a343 100644 --- a/app/relay/runner.js +++ b/app/relay/runner.js @@ -1,15 +1,15 @@ -// monitors which process is currently running -// kills it if need be.... murder - import { execFile, spawn } from 'child_process' import interpreters from './interpreters' import modules from './modules' import kill from 'tree-kill' import { remote } from './remote' +import uuidv1 from 'uuid/v1' + +const idle_state = { status: 'IDLE', task: {} } export const state = { - current_cpu_task: null, - current_gpu_task: null, + current_cpu_task: idle_state, + current_gpu_task: idle_state, } export function get_current_cpu_task(){ @@ -28,8 +28,34 @@ export function get_current_task(processor) { } } +function serialize_task(t){ + if (!t || t.status === 'IDLE') { + return { + status: 'IDLE', + } + } + return { + status: 'RUNNING', + task: t.task, + pid: t.subprocess.pid, + } +} +function clear_task(is_gpu, task){ + if (is_gpu) { + if (state.current_gpu_task.task && state.current_gpu_task.task.uuid === task.uuid) { + state.current_gpu_task = idle_state + } + } else { + if (state.current_cpu_task.task && state.current_cpu_task.task.uuid === task.uuid) { + state.current_cpu_task = idle_state + } + } +} export function status () { - return state + return { + cpu: serialize_task(state.current_cpu_task), + gpu: serialize_task(state.current_gpu_task), + } } export function build_params(module, task) { @@ -50,7 +76,10 @@ export function build_params(module, task) { return [flag, value] }).reduce((acc, cur) => acc.concat(cur), []) } - const params = [ activity.script ].concat(activity.params || []).concat(opt_params) + const params = (interpreter.params || []) + .concat([ activity.script ]) + .concat(activity.params || []) + .concat(opt_params) return { activity, interpreter, @@ -82,18 +111,18 @@ export function run_task(task, preempt, watch){ const { activity, interpreter, params } = build_params(module, task) if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter } - if (interpreter.gpu && state.current_gpu_task) { + if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') { if (preempt) { console.log('preempting currently running GPU task') - kill_task(state.current_gpu_task) + kill_task(state.current_gpu_task.subprocess) } else { console.log('already running GPU task :(', state.current_gpu_task.pid) return { type: 'error', error: 'task already running on gpu' } } - } else if (!interpreter.gpu && state.current_cpu_task) { + } else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') { if (preempt) { console.log('preempting currently running CPU task') - kill_task(state.current_cpu_task) + kill_task(state.current_cpu_task.subprocess) } else { console.log('already running CPU task :(') return { type: 'error', error: 'task already running on cpu' } @@ -108,12 +137,20 @@ export function run_task(task, preempt, watch){ cwd: module.cwd, }) if (interpreter.gpu) { - state.current_gpu_task = subprocess + state.current_gpu_task = { + subprocess, task, status: 'RUNNING' + } } else { - state.current_cpu_task = subprocess + state.current_cpu_task = { + subprocess, task, status: 'RUNNING' + } } + + task.uuid = task.uuid || uuidv1() + task.processor = interpreter.gpu ? 'gpu' : 'cpu' remote.emit('task_res', { type: 'task_begin', task }) + if (watch) { console.log("watching stdout..") subprocess.stdout.on('data', data => { @@ -123,12 +160,22 @@ export function run_task(task, preempt, watch){ remote.emit('task_res', { type: 'stderr', data: data.toString('utf8') }) }) } - subprocess.on('error', (err) => { - console.log('task error', subprocess.pid, err) + + let finished = false + + subprocess.on('error', (err) => { + if (finished) return + finished = true + console.log('task error', subprocess.exitCode, err) + clear_task(interpreter.gpu, task) remote.emit('task_res', { type: 'task_error', task, err }) }) + subprocess.on('close', () => { - console.log('task ended', subprocess.pid) + if (finished) return + finished = true + console.log('task ended', subprocess.exitCode || '') + clear_task(interpreter.gpu, task) remote.emit('task_res', { type: 'task_finish', task }) }) } diff --git a/app/server/index.js b/app/server/index.js index b81ed57..775454f 100644 --- a/app/server/index.js +++ b/app/server/index.js @@ -45,17 +45,17 @@ function bind_relay(socket) { client.emit('system_res', { type: 'relay_connected' }) socket.on('res', data => { - console.log('Received response', data.cmd) + // console.log('Received response', data.cmd) client.emit('res', data) }) socket.on('status', data => { - console.log('Received status', data.key) + // console.log('Received status', data.key) client.emit('status', data) }) socket.on('system_res', data => { - console.log('System responded', data.type) + // console.log('System responded', data.type) client.emit('system_res', data) }) @@ -84,17 +84,17 @@ function bind_client(socket){ } socket.on('cmd', data => { - console.log('Client sent command', data) + // console.log('Client sent command', data) relay.emit('cmd', data) }) socket.on('system', data => { - console.log('Client sent system command', data) + // console.log('Client sent system command', data) relay.emit('system', data) }) socket.on('task', data => { - console.log('Client sent task command', data) + // console.log('Client sent task command', data) relay.emit('task', data) }) |
