diff options
Diffstat (limited to 'app')
| -rw-r--r-- | app/relay/index.js | 53 | ||||
| -rw-r--r-- | app/server/index.js | 38 |
2 files changed, 81 insertions, 10 deletions
diff --git a/app/relay/index.js b/app/relay/index.js index 22dc85f..2ed8e10 100644 --- a/app/relay/index.js +++ b/app/relay/index.js @@ -1,6 +1,33 @@ -const zerorpc = require('zerorpc') require('dotenv').config() +const io = require('socket.io-client') +const ss = require('socket.io-stream') +const zerorpc = require('zerorpc') +const Readable = require('stream').Readable; + +let remote = io.connect(process.env.SOCKETIO_REMOTE); + +remote.on('cmd', (data) => { + if (! data.cmd || ! data.payload) { + console.log('malformed param...?') + return + } + console.log('got', data.cmd) + switch (data.cmd) { + case 'send_param': + rpc.invoke(data.cmd, data.payload.key, data.payload.value, (err, res, more) => { + console.log('sent param', res) + }) + break + case 'get_params': + rpc.invoke(data.cmd, (err, res, more) => { + console.log('got params', res) + remote.emit('params', res) + }) + break + } +}) + let rpc = new zerorpc.Client() rpc.connect("tcp://127.0.0.1:" + process.env.RPC_PORT) rpc.on("error", function(error) { @@ -9,17 +36,24 @@ rpc.on("error", function(error) { console.log('RPC listening on port ' + process.env.RPC_PORT) let relay = new zerorpc.Server({ - send_file: function (file, reply) { - reply() - console.log('got file, ' + file.length + ' bytes') - }, + // Called when the worker starts up and is ready to receive params. connected: function (msg, reply) { reply() - console.log("got connect, trying to call back.") - rpc.invoke("send_param", "foo", "bar", (err, res, more) => { - console.log('sent param') - }) + console.log("got connect from " + msg) return true + }, + + send_frame: function (fn, frame, reply) { + reply() + console.log('got frame, ' + frame.length + ' bytes') + var stream = ss.createStream(); + ss(remote).emit('frame', stream, {name: fn}) + + var s = new Readable(); + s._read = function noop() {} + s.push(frame) + s.push(null); + s.pipe(stream) } }) relay.on("error", function(error) { @@ -27,4 +61,3 @@ relay.on("error", function(error) { }) relay.bind("tcp://0.0.0.0:" + process.env.RELAY_PORT); console.log('Relay listening on port ' + process.env.RELAY_PORT) - diff --git a/app/server/index.js b/app/server/index.js new file mode 100644 index 0000000..3fcb8ee --- /dev/null +++ b/app/server/index.js @@ -0,0 +1,38 @@ +require('dotenv').config() +const app = require('express')() +const server = require('http').createServer(app) +const io = require('socket.io')(server) +const ss = require('socket.io-stream') + +const client = io.of('/client') +const relay = io.of('/relay') + +client.on('connect', socket => { + console.log('client connected') + socket.on('cmd', data => { + relay.emit('cmd', data) + }) + socket.on('disconnect', () => { + console.log("client disconnected") + }) +}) + +relay.on('connect', socket => { + console.log('relay connected') + socket.on('params', data => { + client.emit('params', data) + }) + + ss(relay).on('frame', (stream, data) => { + console.log(data) + Object.values(client.connected).map(socket => { + ss(socket).emit('frame', stream, data) + }) + }) + + socket.on('disconnect', () => { + console.log("relay disconnected") + }) +}) + +server.listen(process.env.EXPRESS_PORT)
\ No newline at end of file |
