summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/bridge/index.js47
-rw-r--r--lib/bridge/monitor.js19
-rw-r--r--lib/db/index.js2
-rw-r--r--lib/server/index.js63
-rw-r--r--lib/vendor/Loader.js104
-rw-r--r--lib/worker/index.js32
-rw-r--r--lib/worker/mimeFromExtension.js44
-rw-r--r--lib/worker/processFiles.js110
8 files changed, 390 insertions, 31 deletions
diff --git a/lib/bridge/index.js b/lib/bridge/index.js
index e0ab441..096c3b9 100644
--- a/lib/bridge/index.js
+++ b/lib/bridge/index.js
@@ -1,16 +1,16 @@
-import { execFile } from 'child_process'
+import { execFile, spawn } from 'child_process'
export default class Bridge {
constructor() {
this.cpus = []
- this.getCPUs()
+ this.getDevices()
}
- getCPUs() {
+ getDevices() {
this.run(['python/devices.py']).then( (stdout, stderr) => {
- this.cpus = JSON.parse(stdout)
- console.log(this.cpus)
+ this.devices = JSON.parse(stdout)
+ console.log(this.devices)
}).catch( (err) => {
- console.error('error fetching cpus:', err)
+ console.error('error fetching devices:', err)
})
}
run(args) {
@@ -22,4 +22,37 @@ export default class Bridge {
})
})
}
-} \ No newline at end of file
+ monitor(args) {
+ return new Monitor(args)
+ }
+ process(file) {
+ ipc.server.sockets.forEach( (socket) => {
+ console.log('>> sending process')
+ ipc.server.emit(socket, 'process', true)
+ })
+ }
+}
+
+var ipc = require('node-ipc')
+
+ipc.config.id = 'cortex'
+ipc.config.retry = 1500;
+
+ipc.serve( () => {
+ ipc.server.on('connect', (socket) => {
+ console.log('>>> worker connected')
+ ipc.server.emit(socket, 'message', true)
+ })
+// ipc.server.on('message', (data, socket) => {
+// ipc.log('got a message : '.debug, data);
+// ipc.server.emit(
+// socket,
+// 'message',
+// data+' world!'
+// )
+// })
+// ipc.server.on( 'socket.disconnected', (socket, destroyedSocketID) => {
+// ipc.log('client ' + destroyedSocketID + ' has disconnected!');
+// })
+})
+ipc.server.start()
diff --git a/lib/bridge/monitor.js b/lib/bridge/monitor.js
new file mode 100644
index 0000000..d8d29ef
--- /dev/null
+++ b/lib/bridge/monitor.js
@@ -0,0 +1,19 @@
+class Monitor {
+ constructor(args) {
+ const cmd = spawn(process.env.PYTHON_BINARY, args);
+
+ let stdout = '', stderr = ''
+
+ cmd.stdout.on('data', (data) => {
+ this.stdout += data
+ })
+
+ cmd.stderr.on('data', (data) => {
+ this.stderr += data
+ })
+
+ cmd.on('exit', function (code) {
+ exit && exit( data.toString() )
+ })
+ }
+}
diff --git a/lib/db/index.js b/lib/db/index.js
index a74ba89..2e63f74 100644
--- a/lib/db/index.js
+++ b/lib/db/index.js
@@ -21,7 +21,7 @@ var Task = db.Task = bookshelf.Model.extend({
hasTimestamps: true,
})
-db.crud = function(type, model) {
+db.crud = function(model) {
return {
index: (q) => {
return model.query( (qb) => {
diff --git a/lib/server/index.js b/lib/server/index.js
index d062131..c32dd07 100644
--- a/lib/server/index.js
+++ b/lib/server/index.js
@@ -1,21 +1,22 @@
-var fs = require('fs')
-var app, express = require('express')
-var http = require('http')
-var bodyParser = require('body-parser')
-var path = require('path')
+const fs = require('fs')
+const express = require('express')
+const http = require('http')
+const bodyParser = require('body-parser')
+const path = require('path')
-var multer = require('multer')
-var upload = multer({ dest: 'uploads/' })
+const multer = require('multer')
+const upload = multer({ dest: 'uploads/' })
+const Loader = require('../vendor/Loader')
-var app, server
+let app, server
const db = require('../db')
-var site = module.exports = {}
+const site = module.exports = {}
site.init = function(bridge){
app = express()
- app.use(express.static(path.join(__dirname, './public')))
+ app.use(express.static(path.join(__dirname, '../../public')))
app.use(bodyParser.json())
app.use(bodyParser.urlencoded({ extended: false }))
@@ -27,22 +28,39 @@ site.init = function(bridge){
const api_folders = crud(app, 'folder', db.Folder, {
afterCreate: (folder) => {
- fs.mkdir('public/data/' + folder.id + '/', function(){
- console.log('created folder', folder.id, folder.name)
+ fs.mkdir('public/data/' + folder.get('id') + '/', function(){
+ console.log('created folder', folder.get('id'), folder.get('name'))
})
}
})
const api_files = crud(app, 'file', db.File)
const api_jobs = crud(app, 'job', db.Job)
const api_tasks = crud(app, 'task', db.Task)
+
+ app.get('/devices', (req, res) => {
+ res.json( bridge.devices )
+ })
- app.post('/folders/:id', upload.array('file'), function(req, res){
+ app.post('/folders/:id', upload.array('file'), (req, res) => {
if ( ! req.files ) return;
- let loaded = {};
- ( req.files || [] ).forEach( (file) => {
- loaded[file.filename] = false
+ let data = {}
+
+ let loader = new Loader()
+ loader.onReady( () => {
+ if (Object.keys(data).some( el => !! el )) {
+ res.json( Object.keys(data).map(k=>data[k]).sort((a,b) => { b.id - a.id }) )
+ }
+ bridge.process()
+ })
+
+ loader.register('upload')
+
+ const files = req.files || []
+ files.forEach( (file) => {
const fn = file.originalname
- fs.rename(file.path, 'public/data/' + req.params.id + '/' + fn, function(err){
+ data[fn] = false
+ loader.register(fn)
+ fs.rename(file.path, 'data/' + req.params.id + '/' + fn, function(err){
api_files.create({
// table.string('username')
'folder_id': req.params.id,
@@ -51,16 +69,15 @@ site.init = function(bridge){
'generated': false,
'processed': false,
}).then( (file) => {
- loaded[file.filename] = file.toJSON()
- if (Object.keys(loaded).some( el => !! el )) {
- res.json( Object.keys(loaded).map(k=>loaded[k]).sort((a,b) => { b.id - a.id }) )
- }
+ data[fn] = file.toJSON()
+ loader.ready(fn)
}).catch( (err) => {
console.warn(err)
res.sendStatus(500)
})
})
})
+ loader.ready('upload')
})
function crud(app, type_s, model, callbacks){
@@ -68,7 +85,7 @@ site.init = function(bridge){
const type = '/' + type_s + 's/'
const type_id = type + ':id'
- const crud = db.crud(type_s, model)
+ const crud = db.crud(model)
// index
app.get(type, (req, res) => {
@@ -91,7 +108,7 @@ site.init = function(bridge){
console.log('create', type)
crud.create(req.body).then( (data) => {
res.json(data.toJSON())
- callbacks.afterCreate && callbacks.afterCreate(data.toJSON())
+ callbacks.afterCreate && callbacks.afterCreate(data)
})// .catch( () => res.sendStatus(500) )
})
diff --git a/lib/vendor/Loader.js b/lib/vendor/Loader.js
new file mode 100644
index 0000000..2668267
--- /dev/null
+++ b/lib/vendor/Loader.js
@@ -0,0 +1,104 @@
+module.exports = (function(){
+ function Loader (readyCallback, view){
+ this.assets = {};
+ this.images = [];
+ this.readyCallback = readyCallback || function(){};
+ this.count = 0
+ this.view = view
+ this.loaded = false
+ }
+
+ // Set the callback when the loader is ready
+ Loader.prototype.onReady = function(readyCallback){
+ this.readyCallback = readyCallback || function(){};
+ }
+
+ // Register an asset as loading
+ Loader.prototype.register = function(s){
+ this.assets[s] = false;
+ this.count += 1
+ }
+
+ // Signal that an asset has loaded
+ Loader.prototype.ready = function(s){
+ // window.debug && console.log("ready >> " + s);
+
+ this.assets[s] = true;
+ if (this.loaded) return;
+
+ this.view && this.view.update( this.percentRemaining() )
+
+ if (! this.isReady()) return;
+
+ this.loaded = true;
+ if (this.view) {
+ this.view && this.view.finish(this.readyCallback)
+ }
+ else {
+ this.readyCallback && this.readyCallback();
+ }
+ }
+
+ // (boolean) Is the loader ready?
+ Loader.prototype.isReady = function(){
+ for (var s in this.assets) {
+ if (this.assets.hasOwnProperty(s) && this.assets[s] != true) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // (float) Percentage of assets remaining
+ Loader.prototype.percentRemaining = function(){
+ return this.remainingAssets() / this.count
+ }
+
+ // (int) Number of assets remaining
+ Loader.prototype.remainingAssets = function(){
+ var n = 0;
+ for (var s in this.assets) {
+ if (this.assets.hasOwnProperty(s) && this.assets[s] != true) {
+ n++;
+ // console.log('remaining: ' + s);
+ }
+ }
+ return n;
+ }
+
+ // Preload the images in config.images
+ Loader.prototype.preloadImages = function(images){
+ this.register("preload");
+ for (var i = 0; i < images.length; i++) {
+ this.preloadImage(images[i]);
+ }
+ this.ready("preload");
+ }
+ Loader.prototype.preloadImage = function(src, register, cb){
+ if (! src || src == "none") return;
+ var _this = this;
+ if (! cb && typeof register === "function") {
+ cb = register
+ register = null
+ }
+ if (register) {
+ this.register(src);
+ }
+ var img = new Image(), loaded = false;
+ img.onload = function(){
+ if (loaded) return
+ loaded = true
+ if (cb) {
+ cb(img);
+ }
+ if (register) {
+ _this.ready(src);
+ }
+ }
+ img.src = src;
+ if (img.complete) img.onload();
+ _this.images.push(img);
+ }
+
+ return Loader;
+})();
diff --git a/lib/worker/index.js b/lib/worker/index.js
new file mode 100644
index 0000000..604756b
--- /dev/null
+++ b/lib/worker/index.js
@@ -0,0 +1,32 @@
+require('dotenv').config()
+
+const ipc = require('node-ipc')
+const db = require('../db')
+const dbFile = db.crud(db.File)
+
+const processFiles = require('./processFiles')
+
+ipc.config.id = 'cortexworker'
+ipc.config.retry = 1500
+
+processFiles()
+
+ipc.connectTo('cortex', () => {
+ ipc.of.cortex.on('connect', () => {
+ ipc.log('## connected to cortex ##', ipc.config.delay);
+ // ipc.of.cortex.emit('message', 'hello')
+ })
+ ipc.of.cortex.on('disconnect', () => {
+ ipc.log('disconnected'.notice)
+ })
+ ipc.of.cortex.on('message', (data) => {
+ ipc.log('received message!'.debug, data)
+ })
+ ipc.of.cortex.on('process', (data) => {
+ ipc.log('received process: '.debug, data);
+ processFiles()
+ })
+ ipc.of.cortex.on('job', (data) => {
+ ipc.log('received job: '.debug, data);
+ })
+})
diff --git a/lib/worker/mimeFromExtension.js b/lib/worker/mimeFromExtension.js
new file mode 100644
index 0000000..1619491
--- /dev/null
+++ b/lib/worker/mimeFromExtension.js
@@ -0,0 +1,44 @@
+module.exports = function mimeFromExtension (file) {
+ const fpartz = file.split('.')
+ const ext = fpartz[fpartz.length-1]
+ let mime, type;
+ switch (ext.toLowerCase()) {
+ case 'mp3':
+ mime = 'audio/mp3'
+ type = 'audio'
+ break
+ case 'wav':
+ mime = 'audio/wav'
+ type = 'audio'
+ break
+ case 'aif':
+ case 'aiff':
+ mime = 'audio/aiff'
+ type = 'audio'
+ break
+
+ case 'jpg':
+ case 'jpeg':
+ mime = 'image/jpeg'
+ type = 'image'
+ break
+ case 'gif':
+ mime = 'image/gif'
+ type = 'image'
+ break
+ case 'png':
+ mime = 'image/png'
+ type = 'image'
+ break
+
+ case 'txt':
+ mime = 'text/plain'
+ type = 'text'
+ break
+ case 'html':
+ mime = 'text/html'
+ type = 'text'
+ break
+ }
+ return { type, mime }
+} \ No newline at end of file
diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js
new file mode 100644
index 0000000..fd97928
--- /dev/null
+++ b/lib/worker/processFiles.js
@@ -0,0 +1,110 @@
+const db = require('../db')
+const dbFile = db.crud(db.File)
+const path = require('path')
+const ffprobe = require('node-ffprobe')
+const execFile = require('child_process').execFile
+const mimeFromExtension = require('./mimeFromExtension')
+ffprobe.FFPROBE_PATH = process.env.FFPROBE_BINARY
+
+let processing = false
+
+module.exports = function processFiles() {
+ if (processing) return
+ processing = true
+
+ dbFile.index({ processed: false }).then( (files) => {
+ console.log(files.length + ' files left to process')
+ if (files.length === 0) {
+ processing = false
+ return
+ }
+
+ file = files.at(0)
+ console.log('processing', file.get('name'))
+ const mimeType = mimeFromExtension(file.get('name'))
+ file.set('mime', mimeType.mime)
+ file.set('type', mimeType.type)
+ switch (mimeType.type) {
+ case 'audio':
+ return processAudio(file)
+ default:
+ return processDone(file)
+ }
+ }).then( () => {
+ console.log('done')
+ if (processing) {
+ processing = false
+ setTimeout( processFiles )
+ }
+ })
+}
+
+function processAudio(file) {
+ const filePath = path.join(__dirname, '../..', 'public/data', String(file.get('folder_id')))
+ // console.log(filePath)
+ const fullPath = path.join(filePath, file.get('name'))
+ console.log(fullPath)
+
+ return new Promise ( (resolve, reject) => {
+ cacheFfprobe(file, fullPath)
+ .then(() => { return processSpectrum(fullPath) })
+ .then(() => { return trimSpectrum(fullPath) })
+ .then(() => { return convertToMp3(fullPath) })
+ .then(() => { return processDone(file) })
+ .then(() => { return resolve() })
+ })
+}
+
+function processSpectrum(fullPath) {
+ return new Promise ( (resolve, reject) => {
+ console.log('process spectrum')
+ execFile(process.env.PYTHON_BINARY, ['./python/spectrum.py', fullPath], (err, stdout, stderr) => {
+ // console.log('spectrum done')
+ resolve()
+ })
+ })
+}
+function trimSpectrum(fullPath) {
+ return new Promise ( (resolve, reject) => {
+ console.log('trim spectrum')
+ const thumbPath = fullPath + '.png'
+ execFile(process.env.CONVERT_BINARY, [thumbPath, '-trim', thumbPath], (err, stdout, stderr) => {
+ // console.log('trim done')
+ resolve()
+ })
+ })
+}
+function cacheFfprobe(file, fullPath) {
+ return new Promise ( (resolve, reject) => {
+ console.log('ffprobe')
+ ffprobe(fullPath, function(err, probeData) {
+ // console.log(probeData)
+ file.set('duration', probeData.format.duration)
+ resolve()
+ })
+ })
+}
+function convertToMp3(fullPath) {
+ return new Promise ( (resolve, reject) => {
+ console.log('convert to mp3')
+ if (fullPath.match(/\.mp3$/i)) {
+ return resolve()
+ }
+ const mp3Path = fullPath + '.mp3'
+ execFile(process.env.FFMPEG_BINARY, ['-y', '-i', fullPath, '-q:a', '6', mp3Path], (err, stdout, stderr) => {
+ // console.log(stdout, stderr)
+ resolve()
+ })
+ })
+}
+
+function processImage(file) {
+ return new Promise ( (resolve, reject) => {
+ // process image?
+ resolve()
+ })
+}
+
+function processDone(file) {
+ return file.save({ processed: true })
+} \ No newline at end of file