summaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
authorJules Laplace <julescarbon@gmail.com>2018-05-26 23:15:09 +0200
committerJules Laplace <julescarbon@gmail.com>2018-05-26 23:15:09 +0200
commit9be9249f7168e1799b1c6689da44d1efb15667ae (patch)
treecc428236256f35f559b767b5eeca9953fd483d2f /app
parentbe3b2bd56550b71a2ffb7eb1604c1b8c1d2dd4a2 (diff)
modularize... circular dependencies... whatever
Diffstat (limited to 'app')
-rw-r--r--app/client/socket/index.js2
-rw-r--r--app/client/socket/socket.task.js7
-rw-r--r--app/client/system/system.component.js24
-rw-r--r--app/client/task/task.actions.js10
-rw-r--r--app/relay/index.js166
-rw-r--r--app/relay/runner.js35
6 files changed, 63 insertions, 181 deletions
diff --git a/app/client/socket/index.js b/app/client/socket/index.js
index c0fed61..1430ac9 100644
--- a/app/client/socket/index.js
+++ b/app/client/socket/index.js
@@ -4,11 +4,13 @@ import types from '../types'
import { socket } from './socket.connection'
import * as system from './socket.system'
import * as live from './socket.live'
+import * as task from './socket.task'
export default {
socket,
system,
live,
+ task,
}
socket.on('status', (data) => {
diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js
index 936d2bc..d65ed13 100644
--- a/app/client/socket/socket.task.js
+++ b/app/client/socket/socket.task.js
@@ -13,16 +13,17 @@ socket.on('task_res', (data) => {
}
})
-export function start_task(task) {
+export function start_task(task, preempt) {
socket.emit('task', {
- cmd: 'start_task',
+ type: 'start',
task,
+ preempt,
})
}
export function stop_task(task) {
socket.emit('task', {
- cmd: 'stop_task',
+ type: 'stop',
task,
})
}
diff --git a/app/client/system/system.component.js b/app/client/system/system.component.js
index 00c5395..5d75964 100644
--- a/app/client/system/system.component.js
+++ b/app/client/system/system.component.js
@@ -9,15 +9,25 @@ import * as systemActions from './system.actions'
import * as taskActions from '../task/task.actions'
const cpu_test_task = {
- id: 1073,
- activity: 'train',
+ activity: 'cpu',
library: 'test',
dataset: 'test',
epochs: 1,
opt: {}
}
const gpu_test_task = {
-
+ activity: 'gpu',
+ library: 'test',
+ dataset: 'test',
+ epochs: 1,
+ opt: {}
+}
+const live_test_task = {
+ activity: 'live',
+ library: 'test',
+ dataset: 'test',
+ epochs: 1,
+ opt: {}
}
class System extends Component {
@@ -61,13 +71,17 @@ class System extends Component {
</Group>
<Group title="Test">
<Param title='CPU Test Task'>
- <button onClick={() => actions.task.start_task(cpu_test_task)}>Start</button>
+ <button onClick={() => actions.task.start_task(cpu_test_task, { preempt: true, watch: true })}>Start</button>
<button onClick={() => actions.task.stop_task(cpu_test_task)}>Stop</button>
</Param>
<Param title='GPU Test Task'>
- <button onClick={() => actions.task.start_task(gpu_test_task)}>Start</button>
+ <button onClick={() => actions.task.start_task(gpu_test_task, { preempt: true, watch: true })}>Start</button>
<button onClick={() => actions.task.stop_task(gpu_test_task)}>Stop</button>
</Param>
+ <Param title='Live Test Task'>
+ <button onClick={() => actions.task.start_task(live_test_task, { preempt: true, watch: true })}>Start</button>
+ <button onClick={() => actions.task.stop_task(live_test_task)}>Stop</button>
+ </Param>
</Group>
</div>
{this.renderCommandOutput()}
diff --git a/app/client/task/task.actions.js b/app/client/task/task.actions.js
index 466b8d7..ea3dfff 100644
--- a/app/client/task/task.actions.js
+++ b/app/client/task/task.actions.js
@@ -1,10 +1,12 @@
import socket from '../socket'
+import types from '../types'
-export const start_task = (task) => {
+export const start_task = (task, opt={}) => {
socket.task.start_task(task)
- return { type: types.task.starting_task, task }
+ return { type: types.task.starting_task, task, ...opt }
}
-export const stop_task = (task) => {
+
+export const stop_task = (task, opt={}) => {
socket.task.stop_task(task)
- return { type: types.task.stopping_task, task }
+ return { type: types.task.stopping_task, task, ...opt }
}
diff --git a/app/relay/index.js b/app/relay/index.js
index ed59c44..e176d77 100644
--- a/app/relay/index.js
+++ b/app/relay/index.js
@@ -1,165 +1,5 @@
require('dotenv').config()
-const io = require('socket.io-client')
-const zerorpc = require('zerorpc')
-const Readable = require('stream').Readable
-const runner = require('./runner')
-
-let remote, relay, rpc, rpc_connected = false
-
-remote = io.connect(process.env.SOCKETIO_REMOTE)
-remote.on('cmd', (data) => {
- console.log('cmd data', data)
- if (! data.cmd) return console.log('malformed param...?')
- console.log('got', data.cmd)
- switch (data.cmd) {
- case 'set_param':
- if (! data.payload) return
- rpc.invoke(data.cmd, data.payload.key, data.payload.value, (err, res, more) => {
- console.log('sent param, got response', res)
- })
- break
- case 'get_params':
- case 'get_last_frame':
- rpc.invoke(data.cmd, (err, res, more) => {
- console.log('got params', res)
- remote.emit('res', {
- cmd: data.cmd,
- res: res,
- })
- })
- break
- default:
- rpc.invoke('send_command', data.cmd, data.payload || null, (err, res, more) => {
- console.log('sent command', res)
- remote.emit('res', {
- cmd: data.cmd,
- res: res,
- })
- })
- break
- }
-})
-
-remote.on('task', (data) => {
- console.log('task:', data.task)
- switch(data.cmd) {
- case 'start':
- break
- case 'stop':
- break
- case 'kill':
- break
- case 'add':
- break
- case 'remove':
- break
- case 'start_queue':
- break
- case 'stop_queue':
- break
- case 'list':
- break
- case 'set_priority':
- break
- // case 'get_status':
- // remote.emit('system_res', {
- // type: 'relay_status',
- // rpc_connected: rpc_connected,
- // runner: runner.status(),
- // })
- // break
- default:
- remote.emit('system_res', { cmd: 'error', error: 'unknown task command' })
- break
- }
-})
-
-remote.on('system', (data) => {
- console.log('system:', data.cmd)
- switch(data.cmd) {
- case 'run_system_command':
- runner.run_system_command(data.payload, (error, stdout, stderr) => {
- remote.emit('system_res', {
- type: 'command_output',
- cmd: data.payload,
- error, stdout, stderr
- })
- })
- break
- case 'get_status':
- remote.emit('system_res', {
- type: 'relay_status',
- rpc_connected: rpc_connected,
- runner: runner.status(),
- })
- break
- default:
- remote.emit('system_res', { cmd: 'error', error: 'unknown system command' })
- break
- }
-})
-
-rpc = new zerorpc.Client()
-rpc.connect('tcp://127.0.0.1:' + process.env.RPC_PORT)
-rpc.on('error', function(error) {
- console.error('RPC server error:', error)
-})
-console.log('RPC listening on port ' + process.env.RPC_PORT)
-
-relay = new zerorpc.Server({
- // Called when the worker starts up and is ready to receive params.
- connected: function(msg, reply) {
- reply()
- console.log('got connect from ' + msg)
- remote.emit('system_res', {
- type: 'rpc_connected',
- runner: runner.status(),
- })
- rpc_connected = true
- return true
- },
-
- send_frame: function(fn, meta, frame, reply) {
- reply()
- // console.log('got frame, ' + frame.length + ' bytes')
- remote.emit('frame', { fn: fn, meta: meta, frame: frame })
- },
-
- send_status: function(key, value, reply) {
- reply()
- remote.emit('status', { key: key, value: value })
- },
-
- disconnecting: function(){
- reply()
- remote.emit('system_res', {
- type: 'rpc_disconnected',
- })
- rpc_connected = false
- return true
- },
-})
-relay.on('error', function(error) {
- console.error('Relay server error:', error)
-})
-relay.bind('tcp://0.0.0.0:' + process.env.RELAY_PORT)
-console.log('Relay listening on port ' + process.env.RELAY_PORT)
-
-rpc.invoke('ping', (err, res, more) => {
- console.log('sent ping', res)
- if (res === 'pong') {
- remote.emit('system_res', {
- type: 'rpc_connected',
- runner: runner.status()
- })
- rpc_connected = true
- } else {
- remote.emit('system_res', {
- type: 'rpc_disconnected',
- })
- rpc_connected = false
- }
-})
-
-module.exports = { relay: relay, remote: remote, } \ No newline at end of file
+import { remote } from './remote'
+import { relay } from './relay'
+import { rpc } from './rpc'
diff --git a/app/relay/runner.js b/app/relay/runner.js
index 9fc2a38..fbc7ad7 100644
--- a/app/relay/runner.js
+++ b/app/relay/runner.js
@@ -6,15 +6,11 @@ import interpreters from './interpreters'
import modules from './modules'
import { kill } from 'tree-kill'
-var state = {
+export const state = {
current_cpu_task: null,
current_gpu_task: null,
}
-export function status(){
- return {}
-}
-
export function get_current_cpu_task(){
return state.current_cpu_task
}
@@ -23,6 +19,18 @@ export function get_current_gpu_task(){
return state.current_gpu_task
}
+export function get_current_task(processor) {
+ if (processor === 'cpu') {
+ return state.current_cpu_task
+ } else {
+ return state.current_gpu_task
+ }
+}
+
+export function status () {
+ return state
+}
+
export function build_params(module, task) {
const activity = module.activities[task.activity]
const interpreter = interpreters[activity.type]
@@ -65,10 +73,25 @@ export function run_system_command(cmd, cb) {
}
}
-export function run_task(task){
+export function run_task(task, preempt){
const module = modules[task.module]
if (! module) throw new Error("No such module")
const { activity, interpreter, params } = build_params(module, task)
+
+ if (activity.cpu && state.current_cpu_task) {
+ if (preempt) {
+ console.log('preempting currently running GPU task')
+ } else {
+ return { type: 'error', error: 'task already running on cpu' }
+ }
+ } else {
+ if (preempt) {
+ console.log('preempting currently running CPU task')
+ } else {
+ return { type: 'error', error: 'task already running on cpu' }
+ }
+ }
+
console.log('running task', activity.name)
console.log(activity.interpreter, activity.script, params)
const subprocess = spawn(activity.interpreter, params)