From 2fa89d40071e4afffa2aeb1805eecf2f1c148cf0 Mon Sep 17 00:00:00 2001 From: Jules Laplace Date: Sat, 26 May 2018 23:17:19 +0200 Subject: modularize... circular dependencies... whatever --- app/relay/relay.js | 45 ++++++++++++++++++++++ app/relay/remote.js | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++ app/relay/rpc.js | 37 ++++++++++++++++++ 3 files changed, 189 insertions(+) create mode 100644 app/relay/relay.js create mode 100644 app/relay/remote.js create mode 100644 app/relay/rpc.js diff --git a/app/relay/relay.js b/app/relay/relay.js new file mode 100644 index 0000000..bb63e0b --- /dev/null +++ b/app/relay/relay.js @@ -0,0 +1,45 @@ +import { remote } from './remote' +import * as rpc from './rpc' + +const zerorpc = require('zerorpc') +const Readable = require('stream').Readable +const runner = require('./runner') + +export const relay = new zerorpc.Server({ + // Called when the worker starts up and is ready to receive params. + connected: function(msg, reply) { + reply() + console.log('got connect from ' + msg) + remote.emit('system_res', { + type: 'rpc_connected', + runner: runner.status(), + }) + rpc.set_connected(true) + return true + }, + + send_frame: function(fn, meta, frame, reply) { + reply() + // console.log('got frame, ' + frame.length + ' bytes') + remote.emit('frame', { fn: fn, meta: meta, frame: frame }) + }, + + send_status: function(key, value, reply) { + reply() + remote.emit('status', { key: key, value: value }) + }, + + disconnecting: function(){ + reply() + remote.emit('system_res', { + type: 'rpc_disconnected', + }) + rpc.set_connected(false) + return true + }, +}) +relay.on('error', function(error) { + console.error('Relay server error:', error) +}) +relay.bind('tcp://0.0.0.0:' + process.env.RELAY_PORT) +console.log('Relay listening on port ' + process.env.RELAY_PORT) diff --git a/app/relay/remote.js b/app/relay/remote.js new file mode 100644 index 0000000..b8cfa15 --- /dev/null +++ b/app/relay/remote.js @@ -0,0 +1,107 @@ +import { rpc, get_connected } from './rpc' + +const io = require('socket.io-client') +const runner = require('./runner') + +export const remote = io.connect(process.env.SOCKETIO_REMOTE) +remote.on('cmd', (data) => { + console.log('cmd data', data) + if (! data.cmd) return console.log('malformed param...?') + console.log('got', data.cmd) + switch (data.cmd) { + case 'set_param': + if (! data.payload) return + rpc.invoke(data.cmd, data.payload.key, data.payload.value, (err, res, more) => { + console.log('sent param, got response', res) + }) + break + case 'get_params': + case 'get_last_frame': + rpc.invoke(data.cmd, (err, res, more) => { + console.log('got params', res) + remote.emit('res', { + cmd: data.cmd, + res: res, + }) + }) + break + default: + rpc.invoke('send_command', data.cmd, data.payload || null, (err, res, more) => { + console.log('sent command', res) + remote.emit('res', { + cmd: data.cmd, + res: res, + }) + }) + break + } +}) + +remote.on('task', (data) => { + let response; + console.log(data) + console.log('task', data.type) + switch(data.type) { + case 'start': + response = runner.run_task(data.task, data.preempt, data.watch) + break + case 'stop': + break + case 'kill': + break + case 'add': + break + case 'remove': + break + case 'start_queue': + break + case 'stop_queue': + break + case 'list': + break + case 'set_priority': + break + // case 'get_status': + // remote.emit('task_res', { + // type: 'relay_status', + // rpc_connected: rpc_connected, + // runner: runner.status(), + // }) + // break + default: + response = { type: 'error', error: 'unknown task command' } + break + } + if (response) { + if (response.type) { + remote.emit('task_res', response) + } else { + remote.emit('task_res', { type: data.type, response }) + } + } +}) + +remote.on('system', (data) => { + console.log('system:', data.cmd) + switch(data.cmd) { + case 'run_system_command': + runner.run_system_command(data.payload, (error, stdout, stderr) => { + remote.emit('system_res', { + type: 'command_output', + cmd: data.payload, + error, stdout, stderr + }) + }) + break + case 'get_status': + remote.emit('system_res', { + type: 'relay_status', + rpc_connected: get_connected(), + runner: runner.status(), + }) + break + default: + remote.emit('system_res', { type: 'error', error: 'unknown system command' }) + break + } +}) diff --git a/app/relay/rpc.js b/app/relay/rpc.js new file mode 100644 index 0000000..d21dacc --- /dev/null +++ b/app/relay/rpc.js @@ -0,0 +1,37 @@ +require('dotenv').config() + +import { remote } from './remote' +import * as runner from './runner' + +const zerorpc = require('zerorpc') + +let connected = false +export const rpc = new zerorpc.Client() +rpc.connect('tcp://127.0.0.1:' + process.env.RPC_PORT) +rpc.on('error', function(error) { + console.error('RPC server error:', error) +}) +console.log('RPC listening on port ' + process.env.RPC_PORT) + +rpc.invoke('ping', (err, res, more) => { + console.log('sent ping to rpc, got', res) + if (res === 'pong') { + remote.emit('system_res', { + type: 'rpc_connected', + runner: runner.status() + }) + connected = true + } else { + remote.emit('system_res', { + type: 'rpc_disconnected', + }) + connected = false + } +}) + +export function get_connected() { + return connected +} +export function set_connected(option) { + connected = !! option +} \ No newline at end of file -- cgit v1.2.3-70-g09d2