summaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/relay/index.js53
-rw-r--r--app/server/index.js38
2 files changed, 81 insertions, 10 deletions
diff --git a/app/relay/index.js b/app/relay/index.js
index 22dc85f..2ed8e10 100644
--- a/app/relay/index.js
+++ b/app/relay/index.js
@@ -1,6 +1,33 @@
-const zerorpc = require('zerorpc')
require('dotenv').config()
+const io = require('socket.io-client')
+const ss = require('socket.io-stream')
+const zerorpc = require('zerorpc')
+const Readable = require('stream').Readable;
+
+let remote = io.connect(process.env.SOCKETIO_REMOTE);
+
+remote.on('cmd', (data) => {
+ if (! data.cmd || ! data.payload) {
+ console.log('malformed param...?')
+ return
+ }
+ console.log('got', data.cmd)
+ switch (data.cmd) {
+ case 'send_param':
+ rpc.invoke(data.cmd, data.payload.key, data.payload.value, (err, res, more) => {
+ console.log('sent param', res)
+ })
+ break
+ case 'get_params':
+ rpc.invoke(data.cmd, (err, res, more) => {
+ console.log('got params', res)
+ remote.emit('params', res)
+ })
+ break
+ }
+})
+
let rpc = new zerorpc.Client()
rpc.connect("tcp://127.0.0.1:" + process.env.RPC_PORT)
rpc.on("error", function(error) {
@@ -9,17 +36,24 @@ rpc.on("error", function(error) {
console.log('RPC listening on port ' + process.env.RPC_PORT)
let relay = new zerorpc.Server({
- send_file: function (file, reply) {
- reply()
- console.log('got file, ' + file.length + ' bytes')
- },
+ // Called when the worker starts up and is ready to receive params.
connected: function (msg, reply) {
reply()
- console.log("got connect, trying to call back.")
- rpc.invoke("send_param", "foo", "bar", (err, res, more) => {
- console.log('sent param')
- })
+ console.log("got connect from " + msg)
return true
+ },
+
+ send_frame: function (fn, frame, reply) {
+ reply()
+ console.log('got frame, ' + frame.length + ' bytes')
+ var stream = ss.createStream();
+ ss(remote).emit('frame', stream, {name: fn})
+
+ var s = new Readable();
+ s._read = function noop() {}
+ s.push(frame)
+ s.push(null);
+ s.pipe(stream)
}
})
relay.on("error", function(error) {
@@ -27,4 +61,3 @@ relay.on("error", function(error) {
})
relay.bind("tcp://0.0.0.0:" + process.env.RELAY_PORT);
console.log('Relay listening on port ' + process.env.RELAY_PORT)
-
diff --git a/app/server/index.js b/app/server/index.js
new file mode 100644
index 0000000..3fcb8ee
--- /dev/null
+++ b/app/server/index.js
@@ -0,0 +1,38 @@
+require('dotenv').config()
+const app = require('express')()
+const server = require('http').createServer(app)
+const io = require('socket.io')(server)
+const ss = require('socket.io-stream')
+
+const client = io.of('/client')
+const relay = io.of('/relay')
+
+client.on('connect', socket => {
+ console.log('client connected')
+ socket.on('cmd', data => {
+ relay.emit('cmd', data)
+ })
+ socket.on('disconnect', () => {
+ console.log("client disconnected")
+ })
+})
+
+relay.on('connect', socket => {
+ console.log('relay connected')
+ socket.on('params', data => {
+ client.emit('params', data)
+ })
+
+ ss(relay).on('frame', (stream, data) => {
+ console.log(data)
+ Object.values(client.connected).map(socket => {
+ ss(socket).emit('frame', stream, data)
+ })
+ })
+
+ socket.on('disconnect', () => {
+ console.log("relay disconnected")
+ })
+})
+
+server.listen(process.env.EXPRESS_PORT) \ No newline at end of file