summaryrefslogtreecommitdiff
path: root/app/relay
diff options
context:
space:
mode:
Diffstat (limited to 'app/relay')
-rw-r--r--app/relay/interpreters.js7
-rw-r--r--app/relay/modules/test.js2
-rw-r--r--app/relay/remote.js2
-rw-r--r--app/relay/runner.js79
4 files changed, 71 insertions, 19 deletions
diff --git a/app/relay/interpreters.js b/app/relay/interpreters.js
index 63a2c25..90dfcaa 100644
--- a/app/relay/interpreters.js
+++ b/app/relay/interpreters.js
@@ -1,3 +1,5 @@
+require('dotenv').config()
+
export default {
bash: {
cmd: process.env.BASH_BIN || '/bin/bash',
@@ -8,15 +10,18 @@ export default {
gpu: false,
},
python: {
- cmd: process.env.PYTHON_BIN || '/usr/bin/python3',
+ cmd: process.env.PYTHON_BIN || '/usr/bin/python',
+ params: ['-u'],
gpu: false,
},
pytorch: {
cmd: process.env.PYTORCH_BIN,
+ params: ['-u'],
gpu: true,
},
tensorflow: {
cmd: process.env.TENSORFLOW_BIN,
+ params: ['-u'],
gpu: true,
},
} \ No newline at end of file
diff --git a/app/relay/modules/test.js b/app/relay/modules/test.js
index 5619159..1d7042e 100644
--- a/app/relay/modules/test.js
+++ b/app/relay/modules/test.js
@@ -14,7 +14,7 @@ const gpu = {
params: '--test',
}
const live = {
- type: 'pytorch',
+ type: 'python',
script: 'test.py',
}
diff --git a/app/relay/remote.js b/app/relay/remote.js
index b8cfa15..5979c68 100644
--- a/app/relay/remote.js
+++ b/app/relay/remote.js
@@ -39,7 +39,7 @@ remote.on('cmd', (data) => {
remote.on('task', (data) => {
let response;
- console.log(data)
+ // console.log(data)
console.log('task', data.type)
switch(data.type) {
case 'start':
diff --git a/app/relay/runner.js b/app/relay/runner.js
index 039bfe7..699a343 100644
--- a/app/relay/runner.js
+++ b/app/relay/runner.js
@@ -1,15 +1,15 @@
-// monitors which process is currently running
-// kills it if need be.... murder
-
import { execFile, spawn } from 'child_process'
import interpreters from './interpreters'
import modules from './modules'
import kill from 'tree-kill'
import { remote } from './remote'
+import uuidv1 from 'uuid/v1'
+
+const idle_state = { status: 'IDLE', task: {} }
export const state = {
- current_cpu_task: null,
- current_gpu_task: null,
+ current_cpu_task: idle_state,
+ current_gpu_task: idle_state,
}
export function get_current_cpu_task(){
@@ -28,8 +28,34 @@ export function get_current_task(processor) {
}
}
+function serialize_task(t){
+ if (!t || t.status === 'IDLE') {
+ return {
+ status: 'IDLE',
+ }
+ }
+ return {
+ status: 'RUNNING',
+ task: t.task,
+ pid: t.subprocess.pid,
+ }
+}
+function clear_task(is_gpu, task){
+ if (is_gpu) {
+ if (state.current_gpu_task.task && state.current_gpu_task.task.uuid === task.uuid) {
+ state.current_gpu_task = idle_state
+ }
+ } else {
+ if (state.current_cpu_task.task && state.current_cpu_task.task.uuid === task.uuid) {
+ state.current_cpu_task = idle_state
+ }
+ }
+}
export function status () {
- return state
+ return {
+ cpu: serialize_task(state.current_cpu_task),
+ gpu: serialize_task(state.current_gpu_task),
+ }
}
export function build_params(module, task) {
@@ -50,7 +76,10 @@ export function build_params(module, task) {
return [flag, value]
}).reduce((acc, cur) => acc.concat(cur), [])
}
- const params = [ activity.script ].concat(activity.params || []).concat(opt_params)
+ const params = (interpreter.params || [])
+ .concat([ activity.script ])
+ .concat(activity.params || [])
+ .concat(opt_params)
return {
activity,
interpreter,
@@ -82,18 +111,18 @@ export function run_task(task, preempt, watch){
const { activity, interpreter, params } = build_params(module, task)
if (! interpreter) return { type: 'error', error: "No such interpreter: " + activity.interpreter }
- if (interpreter.gpu && state.current_gpu_task) {
+ if (interpreter.gpu && state.current_gpu_task.status !== 'IDLE') {
if (preempt) {
console.log('preempting currently running GPU task')
- kill_task(state.current_gpu_task)
+ kill_task(state.current_gpu_task.subprocess)
} else {
console.log('already running GPU task :(', state.current_gpu_task.pid)
return { type: 'error', error: 'task already running on gpu' }
}
- } else if (!interpreter.gpu && state.current_cpu_task) {
+ } else if (!interpreter.gpu && state.current_cpu_task.status !== 'IDLE') {
if (preempt) {
console.log('preempting currently running CPU task')
- kill_task(state.current_cpu_task)
+ kill_task(state.current_cpu_task.subprocess)
} else {
console.log('already running CPU task :(')
return { type: 'error', error: 'task already running on cpu' }
@@ -108,12 +137,20 @@ export function run_task(task, preempt, watch){
cwd: module.cwd,
})
if (interpreter.gpu) {
- state.current_gpu_task = subprocess
+ state.current_gpu_task = {
+ subprocess, task, status: 'RUNNING'
+ }
}
else {
- state.current_cpu_task = subprocess
+ state.current_cpu_task = {
+ subprocess, task, status: 'RUNNING'
+ }
}
+
+ task.uuid = task.uuid || uuidv1()
+ task.processor = interpreter.gpu ? 'gpu' : 'cpu'
remote.emit('task_res', { type: 'task_begin', task })
+
if (watch) {
console.log("watching stdout..")
subprocess.stdout.on('data', data => {
@@ -123,12 +160,22 @@ export function run_task(task, preempt, watch){
remote.emit('task_res', { type: 'stderr', data: data.toString('utf8') })
})
}
- subprocess.on('error', (err) => {
- console.log('task error', subprocess.pid, err)
+
+ let finished = false
+
+ subprocess.on('error', (err) => {
+ if (finished) return
+ finished = true
+ console.log('task error', subprocess.exitCode, err)
+ clear_task(interpreter.gpu, task)
remote.emit('task_res', { type: 'task_error', task, err })
})
+
subprocess.on('close', () => {
- console.log('task ended', subprocess.pid)
+ if (finished) return
+ finished = true
+ console.log('task ended', subprocess.exitCode || '')
+ clear_task(interpreter.gpu, task)
remote.emit('task_res', { type: 'task_finish', task })
})
}