diff options
| author | Jules Laplace <julescarbon@gmail.com> | 2018-05-26 23:15:09 +0200 |
|---|---|---|
| committer | Jules Laplace <julescarbon@gmail.com> | 2018-05-26 23:15:09 +0200 |
| commit | 9be9249f7168e1799b1c6689da44d1efb15667ae (patch) | |
| tree | cc428236256f35f559b767b5eeca9953fd483d2f /app | |
| parent | be3b2bd56550b71a2ffb7eb1604c1b8c1d2dd4a2 (diff) | |
modularize... circular dependencies... whatever
Diffstat (limited to 'app')
| -rw-r--r-- | app/client/socket/index.js | 2 | ||||
| -rw-r--r-- | app/client/socket/socket.task.js | 7 | ||||
| -rw-r--r-- | app/client/system/system.component.js | 24 | ||||
| -rw-r--r-- | app/client/task/task.actions.js | 10 | ||||
| -rw-r--r-- | app/relay/index.js | 166 | ||||
| -rw-r--r-- | app/relay/runner.js | 35 |
6 files changed, 63 insertions, 181 deletions
diff --git a/app/client/socket/index.js b/app/client/socket/index.js index c0fed61..1430ac9 100644 --- a/app/client/socket/index.js +++ b/app/client/socket/index.js @@ -4,11 +4,13 @@ import types from '../types' import { socket } from './socket.connection' import * as system from './socket.system' import * as live from './socket.live' +import * as task from './socket.task' export default { socket, system, live, + task, } socket.on('status', (data) => { diff --git a/app/client/socket/socket.task.js b/app/client/socket/socket.task.js index 936d2bc..d65ed13 100644 --- a/app/client/socket/socket.task.js +++ b/app/client/socket/socket.task.js @@ -13,16 +13,17 @@ socket.on('task_res', (data) => { } }) -export function start_task(task) { +export function start_task(task, preempt) { socket.emit('task', { - cmd: 'start_task', + type: 'start', task, + preempt, }) } export function stop_task(task) { socket.emit('task', { - cmd: 'stop_task', + type: 'stop', task, }) } diff --git a/app/client/system/system.component.js b/app/client/system/system.component.js index 00c5395..5d75964 100644 --- a/app/client/system/system.component.js +++ b/app/client/system/system.component.js @@ -9,15 +9,25 @@ import * as systemActions from './system.actions' import * as taskActions from '../task/task.actions' const cpu_test_task = { - id: 1073, - activity: 'train', + activity: 'cpu', library: 'test', dataset: 'test', epochs: 1, opt: {} } const gpu_test_task = { - + activity: 'gpu', + library: 'test', + dataset: 'test', + epochs: 1, + opt: {} +} +const live_test_task = { + activity: 'live', + library: 'test', + dataset: 'test', + epochs: 1, + opt: {} } class System extends Component { @@ -61,13 +71,17 @@ class System extends Component { </Group> <Group title="Test"> <Param title='CPU Test Task'> - <button onClick={() => actions.task.start_task(cpu_test_task)}>Start</button> + <button onClick={() => actions.task.start_task(cpu_test_task, { preempt: true, watch: true })}>Start</button> <button onClick={() => actions.task.stop_task(cpu_test_task)}>Stop</button> </Param> <Param title='GPU Test Task'> - <button onClick={() => actions.task.start_task(gpu_test_task)}>Start</button> + <button onClick={() => actions.task.start_task(gpu_test_task, { preempt: true, watch: true })}>Start</button> <button onClick={() => actions.task.stop_task(gpu_test_task)}>Stop</button> </Param> + <Param title='Live Test Task'> + <button onClick={() => actions.task.start_task(live_test_task, { preempt: true, watch: true })}>Start</button> + <button onClick={() => actions.task.stop_task(live_test_task)}>Stop</button> + </Param> </Group> </div> {this.renderCommandOutput()} diff --git a/app/client/task/task.actions.js b/app/client/task/task.actions.js index 466b8d7..ea3dfff 100644 --- a/app/client/task/task.actions.js +++ b/app/client/task/task.actions.js @@ -1,10 +1,12 @@ import socket from '../socket' +import types from '../types' -export const start_task = (task) => { +export const start_task = (task, opt={}) => { socket.task.start_task(task) - return { type: types.task.starting_task, task } + return { type: types.task.starting_task, task, ...opt } } -export const stop_task = (task) => { + +export const stop_task = (task, opt={}) => { socket.task.stop_task(task) - return { type: types.task.stopping_task, task } + return { type: types.task.stopping_task, task, ...opt } } diff --git a/app/relay/index.js b/app/relay/index.js index ed59c44..e176d77 100644 --- a/app/relay/index.js +++ b/app/relay/index.js @@ -1,165 +1,5 @@ require('dotenv').config() -const io = require('socket.io-client') -const zerorpc = require('zerorpc') -const Readable = require('stream').Readable -const runner = require('./runner') - -let remote, relay, rpc, rpc_connected = false - -remote = io.connect(process.env.SOCKETIO_REMOTE) -remote.on('cmd', (data) => { - console.log('cmd data', data) - if (! data.cmd) return console.log('malformed param...?') - console.log('got', data.cmd) - switch (data.cmd) { - case 'set_param': - if (! data.payload) return - rpc.invoke(data.cmd, data.payload.key, data.payload.value, (err, res, more) => { - console.log('sent param, got response', res) - }) - break - case 'get_params': - case 'get_last_frame': - rpc.invoke(data.cmd, (err, res, more) => { - console.log('got params', res) - remote.emit('res', { - cmd: data.cmd, - res: res, - }) - }) - break - default: - rpc.invoke('send_command', data.cmd, data.payload || null, (err, res, more) => { - console.log('sent command', res) - remote.emit('res', { - cmd: data.cmd, - res: res, - }) - }) - break - } -}) - -remote.on('task', (data) => { - console.log('task:', data.task) - switch(data.cmd) { - case 'start': - break - case 'stop': - break - case 'kill': - break - case 'add': - break - case 'remove': - break - case 'start_queue': - break - case 'stop_queue': - break - case 'list': - break - case 'set_priority': - break - // case 'get_status': - // remote.emit('system_res', { - // type: 'relay_status', - // rpc_connected: rpc_connected, - // runner: runner.status(), - // }) - // break - default: - remote.emit('system_res', { cmd: 'error', error: 'unknown task command' }) - break - } -}) - -remote.on('system', (data) => { - console.log('system:', data.cmd) - switch(data.cmd) { - case 'run_system_command': - runner.run_system_command(data.payload, (error, stdout, stderr) => { - remote.emit('system_res', { - type: 'command_output', - cmd: data.payload, - error, stdout, stderr - }) - }) - break - case 'get_status': - remote.emit('system_res', { - type: 'relay_status', - rpc_connected: rpc_connected, - runner: runner.status(), - }) - break - default: - remote.emit('system_res', { cmd: 'error', error: 'unknown system command' }) - break - } -}) - -rpc = new zerorpc.Client() -rpc.connect('tcp://127.0.0.1:' + process.env.RPC_PORT) -rpc.on('error', function(error) { - console.error('RPC server error:', error) -}) -console.log('RPC listening on port ' + process.env.RPC_PORT) - -relay = new zerorpc.Server({ - // Called when the worker starts up and is ready to receive params. - connected: function(msg, reply) { - reply() - console.log('got connect from ' + msg) - remote.emit('system_res', { - type: 'rpc_connected', - runner: runner.status(), - }) - rpc_connected = true - return true - }, - - send_frame: function(fn, meta, frame, reply) { - reply() - // console.log('got frame, ' + frame.length + ' bytes') - remote.emit('frame', { fn: fn, meta: meta, frame: frame }) - }, - - send_status: function(key, value, reply) { - reply() - remote.emit('status', { key: key, value: value }) - }, - - disconnecting: function(){ - reply() - remote.emit('system_res', { - type: 'rpc_disconnected', - }) - rpc_connected = false - return true - }, -}) -relay.on('error', function(error) { - console.error('Relay server error:', error) -}) -relay.bind('tcp://0.0.0.0:' + process.env.RELAY_PORT) -console.log('Relay listening on port ' + process.env.RELAY_PORT) - -rpc.invoke('ping', (err, res, more) => { - console.log('sent ping', res) - if (res === 'pong') { - remote.emit('system_res', { - type: 'rpc_connected', - runner: runner.status() - }) - rpc_connected = true - } else { - remote.emit('system_res', { - type: 'rpc_disconnected', - }) - rpc_connected = false - } -}) - -module.exports = { relay: relay, remote: remote, }
\ No newline at end of file +import { remote } from './remote' +import { relay } from './relay' +import { rpc } from './rpc' diff --git a/app/relay/runner.js b/app/relay/runner.js index 9fc2a38..fbc7ad7 100644 --- a/app/relay/runner.js +++ b/app/relay/runner.js @@ -6,15 +6,11 @@ import interpreters from './interpreters' import modules from './modules' import { kill } from 'tree-kill' -var state = { +export const state = { current_cpu_task: null, current_gpu_task: null, } -export function status(){ - return {} -} - export function get_current_cpu_task(){ return state.current_cpu_task } @@ -23,6 +19,18 @@ export function get_current_gpu_task(){ return state.current_gpu_task } +export function get_current_task(processor) { + if (processor === 'cpu') { + return state.current_cpu_task + } else { + return state.current_gpu_task + } +} + +export function status () { + return state +} + export function build_params(module, task) { const activity = module.activities[task.activity] const interpreter = interpreters[activity.type] @@ -65,10 +73,25 @@ export function run_system_command(cmd, cb) { } } -export function run_task(task){ +export function run_task(task, preempt){ const module = modules[task.module] if (! module) throw new Error("No such module") const { activity, interpreter, params } = build_params(module, task) + + if (activity.cpu && state.current_cpu_task) { + if (preempt) { + console.log('preempting currently running GPU task') + } else { + return { type: 'error', error: 'task already running on cpu' } + } + } else { + if (preempt) { + console.log('preempting currently running CPU task') + } else { + return { type: 'error', error: 'task already running on cpu' } + } + } + console.log('running task', activity.name) console.log(activity.interpreter, activity.script, params) const subprocess = spawn(activity.interpreter, params) |
