summaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/client/socket/socket.system.js22
-rw-r--r--app/client/socket/socket.task.js7
-rw-r--r--app/client/system/system.actions.js2
-rw-r--r--app/client/system/system.component.js27
-rw-r--r--app/client/system/system.reducer.js30
-rw-r--r--app/relay/interpreters.js7
-rw-r--r--app/relay/modules/test.js2
-rw-r--r--app/relay/remote.js2
-rw-r--r--app/relay/runner.js79
-rw-r--r--app/server/index.js12
10 files changed, 148 insertions, 42 deletions
diff --git a/app/client/socket/socket.system.js b/app/client/socket/socket.system.js
index 0cdc625..38140c3 100644
--- a/app/client/socket/socket.system.js
+++ b/app/client/socket/socket.system.js
@@ -7,15 +7,27 @@ socket.on('system_res', (data) => {
console.log('system response', data)
switch (data.type) {
case 'relay_connected':
- return dispatch({ type: types.system.relay_connected })
+ return dispatch({
+ type: types.system.relay_connected
+ })
case 'relay_disconnected':
- return dispatch({ type: types.system.relay_disconnected })
+ return dispatch({
+ type: types.system.relay_disconnected
+ })
case 'rpc_connected':
- return dispatch({ type: types.system.rpc_connected, runner: data.runner })
+ return dispatch({
+ type: types.system.rpc_connected,
+ runner: data.runner,
+ })
case 'rpc_disconnected':
- return dispatch({ type: types.system.rpc_disconnected })
+ return dispatch({
+ type: types.system.rpc_disconnected
+ })
case 'relay_status':
- return dispatch({ type: data.rpc_connected ? types.system.rpc_connected : types.system.rpc_disconnected, runner: data.runner })
+ return dispatch({
+ type: data.rpc_connected ? types.system.rpc_connected : types.system.rpc_disconnected,
+ runner: data.runner,
+ })
case 'command_output':
return dispatch({
type: types.system.command_output,
diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js
index 0c8429b..ba074c0 100644
--- a/app/client/socket/socket.task.js
+++ b/app/client/socket/socket.task.js
@@ -3,6 +3,8 @@ import types from '../types'
import { socket } from './socket.connection'
+let finishTimeout;
+
socket.on('task_res', (data) => {
console.log('system response', data)
switch (data.type) {
@@ -10,12 +12,13 @@ socket.on('task_res', (data) => {
// return dispatch({ type: types.system.rpc_connected, runner: data.runner })
break
case 'task_begin':
- return dispatch({ type: types.task.task_begin, data: data.data })
+ clearTimeout(finishTimeout)
+ return dispatch({ type: types.task.task_begin, task: data.task })
break
case 'stop':
break
case 'task_finish':
- return dispatch({ type: types.task.task_finish, data: data.data })
+ return finishTimeout = setTimeout(() => dispatch({ type: types.task.task_finish, task: data.task }), 100)
break
case 'kill':
break
diff --git a/app/client/system/system.actions.js b/app/client/system/system.actions.js
index ff32fd6..519e140 100644
--- a/app/client/system/system.actions.js
+++ b/app/client/system/system.actions.js
@@ -1,4 +1,4 @@
-import * as socket from '../socket'
+import socket from '../socket'
import types from '../types'
export const run = (cmd) => {
diff --git a/app/client/system/system.component.js b/app/client/system/system.component.js
index 07428e5..680d1c0 100644
--- a/app/client/system/system.component.js
+++ b/app/client/system/system.component.js
@@ -34,8 +34,14 @@ class System extends Component {
constructor(props){
super()
}
+ componentDidUpdate(){
+ console.log(this._screen.scrollHeight, this._screen.scrollTop, this._screen.offsetHeight)
+ if (this._screen.scrollHeight > this._screen.scrollTop - this._screen.offsetHeight + 100) {
+ this._screen.scrollTop = this._screen.scrollHeight
+ }
+ }
render(){
- const { site, server, relay, rpc, actions } = this.props
+ const { site, server, relay, runner, rpc, actions } = this.props
return (
<div className='system'>
<div className='heading'>
@@ -51,9 +57,8 @@ class System extends Component {
}
<Param title='Relay'>{relay.status}</Param>
<Param title='RPC'>{rpc.status}</Param>
- <Param title='CPU'>{rpc.cpu_cmd}</Param>
- <Param title='GPU'>{rpc.gpu_cmd}</Param>
- <Param title='Current Task'>train samplernn</Param>
+ <Param title='CPU'>{this.renderStatus(runner.cpu)}</Param>
+ <Param title='GPU'>{this.renderStatus(runner.gpu)}</Param>
</Group>
<Group title="Diagnostics">
<Param title='Check GPU'>
@@ -89,6 +94,16 @@ class System extends Component {
</div>
)
}
+ renderStatus(processor){
+ if (!processor) {
+ return 'unknown'
+ }
+ if (processor.status === 'IDLE') {
+ return 'idle'
+ }
+ const task = processor.task
+ return task.activity + ' ' + task.module
+ }
renderCommandOutput(){
const { cmd, stdout, stderr } = this.props
let output
@@ -108,14 +123,14 @@ class System extends Component {
}
else {
output = stdout
- if (cmd.stderr) {
+ if (stderr.length) {
output += '\n\n_________________________________\n\n'
output += stderr
}
}
return (
<div>
- <div className='screen'>{output}</div>
+ <div ref={(ref) => this._screen = ref} className='screen'>{output}</div>
</div>
)
}
diff --git a/app/client/system/system.reducer.js b/app/client/system/system.reducer.js
index a7ae8d1..c29572e 100644
--- a/app/client/system/system.reducer.js
+++ b/app/client/system/system.reducer.js
@@ -22,8 +22,6 @@ const systemInitialState = {
rpc: {
connected: false,
status: "unknown",
- cpu_cmd: "unknown",
- gpu_cmd: "unknown",
error: null,
},
cmd: {
@@ -34,6 +32,10 @@ const systemInitialState = {
stdout: "",
stderr: "",
},
+ runner: {
+ cpu: { status: 'IDLE', task: {} },
+ gpu: { status: 'IDLE', task: {} },
+ },
stdout: "",
stderr: "",
}
@@ -109,7 +111,8 @@ const systemReducer = (state = systemInitialState, action) => {
status: 'connected',
connected: true,
error: null,
- }
+ },
+ runner: action.runner,
}
case types.system.rpc_connected:
return {
@@ -147,9 +150,30 @@ const systemReducer = (state = systemInitialState, action) => {
case types.task.task_begin:
return {
...state,
+ runner: {
+ ...state.runner,
+ [action.task.processor]: { status: 'RUNNING', task: action.task },
+ },
+ cmd: {
+ ...state.cmd,
+ loaded: false,
+ stdout: "",
+ stderr: "",
+ },
stdout: "",
stderr: "",
}
+ case types.task.task_finish:
+ if (state.runner[action.task.processor].task.uuid !== action.task.uuid) {
+ return state
+ }
+ return {
+ ...state,
+ runner: {
+ ...state.runner,
+ [action.task.processor]: { status: 'IDLE', task: {} },
+ },
+ }
case types.system.stdout:
return {
...state,
diff --git a/app/relay/interpreters.js b/app/relay/interpreters.js
index 63a2c25..90dfcaa 100644
--- a/app/relay/interpreters.js
+++ b/app/relay/interpreters.js
@@ -1,3 +1,5 @@
+require('dotenv').config()
+
export default {
bash: {
cmd: process.env.BASH_BIN || '/bin/bash',
@@ -8,15 +10,18 @@ export default {
gpu: false,
},
python: {
- cmd: process.env.PYTHON_BIN || '/usr/bin/python3',
+ cmd: process.env.PYTHON_BIN || '/usr/bin/python',
+ params: ['-u'],
gpu: false,
},
pytorch: {
cmd: process.env.PYTORCH_BIN,
+ params: ['-u'],
gpu: true,
},
tensorflow: {
cmd: process.env.TENSORFLOW_BIN,
+ params: ['-u'],
gpu: true,
},
} \ No newline at end of file
diff --git a/app/relay/modules/test.js b/app/relay/modules/test.js
index 5619159..1d7042e 100644
--- a/app/relay/modules/test.js
+++ b/app/relay/modules/test.js
@@ -14,7 +14,7 @@ const gpu = {
params: '--test',
}
const live = {
- type: 'pytorch',
+ type: 'python',
script: 'test.py',
}
diff --git a/app/relay/remote.js b/app/relay/remote.js
index b8cfa15..5979c68 100644
--- a/app/relay/remote.js
+++ b/app/relay/remote.js
@@ -39,7 +39,7 @@ remote.on('cmd', (data) => {
remote.on('task', (data) => {
let response;
- console.log(data)
+ // console.log(data)
console.log('task', data.type)
switch(data.type) {
case 'start':
diff --git a/app/relay/runner.js b/app/relay/runner.js
index 039bfe7..699a343 100644
--- a/app/relay/runner.js
+++ b/app/relay/runner.js
@@ -1,15 +1,15 @@
-// monitors which process is currently running
-// kills it if need be.... murder
-
import { execFile, spawn } from 'child_process'
import interpreters from './interpreters'
import modules from './modules'
import kill from 'tree-kill'
import { remote } from './remote'
+import uuidv1 from 'uuid/v1'
+
+const idle_state = { status: 'IDLE', task: {} }
export const state = {
- current_cpu_task: null,
- current_gpu_task: null,
+ current_cpu_task: idle_state,
+ current_gpu_task: idle_state,
}
export function get_current_cpu_task(){
@@ -28,8 +28,34 @@ export function get_current_task(processor) {
}
}
+function serialize_task(t){
+ if (!t || t.status === 'IDLE') {
+ return {
+ status: 'IDLE',
+ }
+ }
+ return {
+ status: 'RUNNING',
+ task: t.task,
+ pid: t.subprocess.pid,
+ }
+}
+function clear_task(is_gpu, task){
+ if (is_gpu) {
+ if (state.current_gpu_task.task && state.current_gpu_task.task.uuid === task.uuid) {
+ state.current_gpu_task = idle_state
+ }
+ } else {
+ if (state.current_cpu_task.task && state.current_cpu_task.task.uuid === task.uuid) {
+ state.current_cpu_task = idle_state
+ }
+ }
+}
export function status () {
- return state
+ return {
+ cpu: serialize_task(state.current_cpu_task),
+ gpu: serialize_task(state.current_gpu_task),
+ }
}
export function build_params(module, task) {
@@ -50,7 +76,10 @@ export function build_params(module, task) {
return [flag, value]
}).reduce((acc, cur) => acc.concat(cur), [])
}
- const params = [ activity.script ].concat(activity.params || []).concat(opt_params)
+ const params = (interpreter.params || [])
+ .concat([ activity.script ])
+ .concat(activity.params || [])
+ .concat(opt_params)
return {
activity,
interpreter,
@@ -82,18 +111,18 @@ export function run_task(task, preempt, watch){
const { activity, interpreter, params } = build_params(module, task)
if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter }
- if (interpreter.gpu && state.current_gpu_task) {
+ if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') {
if (preempt) {
console.log('preempting currently running GPU task')
- kill_task(state.current_gpu_task)
+ kill_task(state.current_gpu_task.subprocess)
} else {
console.log('already running GPU task :(', state.current_gpu_task.pid)
return { type: 'error', error: 'task already running on gpu' }
}
- } else if (!interpreter.gpu && state.current_cpu_task) {
+ } else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') {
if (preempt) {
console.log('preempting currently running CPU task')
- kill_task(state.current_cpu_task)
+ kill_task(state.current_cpu_task.subprocess)
} else {
console.log('already running CPU task :(')
return { type: 'error', error: 'task already running on cpu' }
@@ -108,12 +137,20 @@ export function run_task(task, preempt, watch){
cwd: module.cwd,
})
if (interpreter.gpu) {
- state.current_gpu_task = subprocess
+ state.current_gpu_task = {
+ subprocess, task, status: 'RUNNING'
+ }
}
else {
- state.current_cpu_task = subprocess
+ state.current_cpu_task = {
+ subprocess, task, status: 'RUNNING'
+ }
}
+
+ task.uuid = task.uuid || uuidv1()
+ task.processor = interpreter.gpu ? 'gpu' : 'cpu'
remote.emit('task_res', { type: 'task_begin', task })
+
if (watch) {
console.log("watching stdout..")
subprocess.stdout.on('data', data => {
@@ -123,12 +160,22 @@ export function run_task(task, preempt, watch){
remote.emit('task_res', { type: 'stderr', data: data.toString('utf8') })
})
}
- subprocess.on('error', (err) => {
- console.log('task error', subprocess.pid, err)
+
+ let finished = false
+
+ subprocess.on('error', (err) => {
+ if (finished) return
+ finished = true
+ console.log('task error', subprocess.exitCode, err)
+ clear_task(interpreter.gpu, task)
remote.emit('task_res', { type: 'task_error', task, err })
})
+
subprocess.on('close', () => {
- console.log('task ended', subprocess.pid)
+ if (finished) return
+ finished = true
+ console.log('task ended', subprocess.exitCode || '')
+ clear_task(interpreter.gpu, task)
remote.emit('task_res', { type: 'task_finish', task })
})
}
diff --git a/app/server/index.js b/app/server/index.js
index b81ed57..775454f 100644
--- a/app/server/index.js
+++ b/app/server/index.js
@@ -45,17 +45,17 @@ function bind_relay(socket) {
client.emit('system_res', { type: 'relay_connected' })
socket.on('res', data => {
- console.log('Received response', data.cmd)
+ // console.log('Received response', data.cmd)
client.emit('res', data)
})
socket.on('status', data => {
- console.log('Received status', data.key)
+ // console.log('Received status', data.key)
client.emit('status', data)
})
socket.on('system_res', data => {
- console.log('System responded', data.type)
+ // console.log('System responded', data.type)
client.emit('system_res', data)
})
@@ -84,17 +84,17 @@ function bind_client(socket){
}
socket.on('cmd', data => {
- console.log('Client sent command', data)
+ // console.log('Client sent command', data)
relay.emit('cmd', data)
})
socket.on('system', data => {
- console.log('Client sent system command', data)
+ // console.log('Client sent system command', data)
relay.emit('system', data)
})
socket.on('task', data => {
- console.log('Client sent task command', data)
+ // console.log('Client sent task command', data)
relay.emit('task', data)
})