diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/bridge/index.js | 47 | ||||
| -rw-r--r-- | lib/bridge/monitor.js | 19 | ||||
| -rw-r--r-- | lib/db/index.js | 2 | ||||
| -rw-r--r-- | lib/server/index.js | 63 | ||||
| -rw-r--r-- | lib/vendor/Loader.js | 104 | ||||
| -rw-r--r-- | lib/worker/index.js | 32 | ||||
| -rw-r--r-- | lib/worker/mimeFromExtension.js | 44 | ||||
| -rw-r--r-- | lib/worker/processFiles.js | 110 |
8 files changed, 390 insertions, 31 deletions
diff --git a/lib/bridge/index.js b/lib/bridge/index.js index e0ab441..096c3b9 100644 --- a/lib/bridge/index.js +++ b/lib/bridge/index.js @@ -1,16 +1,16 @@ -import { execFile } from 'child_process' +import { execFile, spawn } from 'child_process' export default class Bridge { constructor() { this.cpus = [] - this.getCPUs() + this.getDevices() } - getCPUs() { + getDevices() { this.run(['python/devices.py']).then( (stdout, stderr) => { - this.cpus = JSON.parse(stdout) - console.log(this.cpus) + this.devices = JSON.parse(stdout) + console.log(this.devices) }).catch( (err) => { - console.error('error fetching cpus:', err) + console.error('error fetching devices:', err) }) } run(args) { @@ -22,4 +22,37 @@ export default class Bridge { }) }) } -}
\ No newline at end of file + monitor(args) { + return new Monitor(args) + } + process(file) { + ipc.server.sockets.forEach( (socket) => { + console.log('>> sending process') + ipc.server.emit(socket, 'process', true) + }) + } +} + +var ipc = require('node-ipc') + +ipc.config.id = 'cortex' +ipc.config.retry = 1500; + +ipc.serve( () => { + ipc.server.on('connect', (socket) => { + console.log('>>> worker connected') + ipc.server.emit(socket, 'message', true) + }) +// ipc.server.on('message', (data, socket) => { +// ipc.log('got a message : '.debug, data); +// ipc.server.emit( +// socket, +// 'message', +// data+' world!' +// ) +// }) +// ipc.server.on( 'socket.disconnected', (socket, destroyedSocketID) => { +// ipc.log('client ' + destroyedSocketID + ' has disconnected!'); +// }) +}) +ipc.server.start() diff --git a/lib/bridge/monitor.js b/lib/bridge/monitor.js new file mode 100644 index 0000000..d8d29ef --- /dev/null +++ b/lib/bridge/monitor.js @@ -0,0 +1,19 @@ +class Monitor { + constructor(args) { + const cmd = spawn(process.env.PYTHON_BINARY, args); + + let stdout = '', stderr = '' + + cmd.stdout.on('data', (data) => { + this.stdout += data + }) + + cmd.stderr.on('data', (data) => { + this.stderr += data + }) + + cmd.on('exit', function (code) { + exit && exit( data.toString() ) + }) + } +} diff --git a/lib/db/index.js b/lib/db/index.js index a74ba89..2e63f74 100644 --- a/lib/db/index.js +++ b/lib/db/index.js @@ -21,7 +21,7 @@ var Task = db.Task = bookshelf.Model.extend({ hasTimestamps: true, }) -db.crud = function(type, model) { +db.crud = function(model) { return { index: (q) => { return model.query( (qb) => { diff --git a/lib/server/index.js b/lib/server/index.js index d062131..c32dd07 100644 --- a/lib/server/index.js +++ b/lib/server/index.js @@ -1,21 +1,22 @@ -var fs = require('fs') -var app, express = require('express') -var http = require('http') -var bodyParser = require('body-parser') -var path = require('path') +const fs = require('fs') +const express = require('express') +const http = require('http') +const bodyParser = require('body-parser') +const path = require('path') -var multer = require('multer') -var upload = multer({ dest: 'uploads/' }) +const multer = require('multer') +const upload = multer({ dest: 'uploads/' }) +const Loader = require('../vendor/Loader') -var app, server +let app, server const db = require('../db') -var site = module.exports = {} +const site = module.exports = {} site.init = function(bridge){ app = express() - app.use(express.static(path.join(__dirname, './public'))) + app.use(express.static(path.join(__dirname, '../../public'))) app.use(bodyParser.json()) app.use(bodyParser.urlencoded({ extended: false })) @@ -27,22 +28,39 @@ site.init = function(bridge){ const api_folders = crud(app, 'folder', db.Folder, { afterCreate: (folder) => { - fs.mkdir('public/data/' + folder.id + '/', function(){ - console.log('created folder', folder.id, folder.name) + fs.mkdir('public/data/' + folder.get('id') + '/', function(){ + console.log('created folder', folder.get('id'), folder.get('name')) }) } }) const api_files = crud(app, 'file', db.File) const api_jobs = crud(app, 'job', db.Job) const api_tasks = crud(app, 'task', db.Task) + + app.get('/devices', (req, res) => { + res.json( bridge.devices ) + }) - app.post('/folders/:id', upload.array('file'), function(req, res){ + app.post('/folders/:id', upload.array('file'), (req, res) => { if ( ! req.files ) return; - let loaded = {}; - ( req.files || [] ).forEach( (file) => { - loaded[file.filename] = false + let data = {} + + let loader = new Loader() + loader.onReady( () => { + if (Object.keys(data).some( el => !! el )) { + res.json( Object.keys(data).map(k=>data[k]).sort((a,b) => { b.id - a.id }) ) + } + bridge.process() + }) + + loader.register('upload') + + const files = req.files || [] + files.forEach( (file) => { const fn = file.originalname - fs.rename(file.path, 'public/data/' + req.params.id + '/' + fn, function(err){ + data[fn] = false + loader.register(fn) + fs.rename(file.path, 'data/' + req.params.id + '/' + fn, function(err){ api_files.create({ // table.string('username') 'folder_id': req.params.id, @@ -51,16 +69,15 @@ site.init = function(bridge){ 'generated': false, 'processed': false, }).then( (file) => { - loaded[file.filename] = file.toJSON() - if (Object.keys(loaded).some( el => !! el )) { - res.json( Object.keys(loaded).map(k=>loaded[k]).sort((a,b) => { b.id - a.id }) ) - } + data[fn] = file.toJSON() + loader.ready(fn) }).catch( (err) => { console.warn(err) res.sendStatus(500) }) }) }) + loader.ready('upload') }) function crud(app, type_s, model, callbacks){ @@ -68,7 +85,7 @@ site.init = function(bridge){ const type = '/' + type_s + 's/' const type_id = type + ':id' - const crud = db.crud(type_s, model) + const crud = db.crud(model) // index app.get(type, (req, res) => { @@ -91,7 +108,7 @@ site.init = function(bridge){ console.log('create', type) crud.create(req.body).then( (data) => { res.json(data.toJSON()) - callbacks.afterCreate && callbacks.afterCreate(data.toJSON()) + callbacks.afterCreate && callbacks.afterCreate(data) })// .catch( () => res.sendStatus(500) ) }) diff --git a/lib/vendor/Loader.js b/lib/vendor/Loader.js new file mode 100644 index 0000000..2668267 --- /dev/null +++ b/lib/vendor/Loader.js @@ -0,0 +1,104 @@ +module.exports = (function(){ + function Loader (readyCallback, view){ + this.assets = {}; + this.images = []; + this.readyCallback = readyCallback || function(){}; + this.count = 0 + this.view = view + this.loaded = false + } + + // Set the callback when the loader is ready + Loader.prototype.onReady = function(readyCallback){ + this.readyCallback = readyCallback || function(){}; + } + + // Register an asset as loading + Loader.prototype.register = function(s){ + this.assets[s] = false; + this.count += 1 + } + + // Signal that an asset has loaded + Loader.prototype.ready = function(s){ + // window.debug && console.log("ready >> " + s); + + this.assets[s] = true; + if (this.loaded) return; + + this.view && this.view.update( this.percentRemaining() ) + + if (! this.isReady()) return; + + this.loaded = true; + if (this.view) { + this.view && this.view.finish(this.readyCallback) + } + else { + this.readyCallback && this.readyCallback(); + } + } + + // (boolean) Is the loader ready? + Loader.prototype.isReady = function(){ + for (var s in this.assets) { + if (this.assets.hasOwnProperty(s) && this.assets[s] != true) { + return false; + } + } + return true; + } + + // (float) Percentage of assets remaining + Loader.prototype.percentRemaining = function(){ + return this.remainingAssets() / this.count + } + + // (int) Number of assets remaining + Loader.prototype.remainingAssets = function(){ + var n = 0; + for (var s in this.assets) { + if (this.assets.hasOwnProperty(s) && this.assets[s] != true) { + n++; + // console.log('remaining: ' + s); + } + } + return n; + } + + // Preload the images in config.images + Loader.prototype.preloadImages = function(images){ + this.register("preload"); + for (var i = 0; i < images.length; i++) { + this.preloadImage(images[i]); + } + this.ready("preload"); + } + Loader.prototype.preloadImage = function(src, register, cb){ + if (! src || src == "none") return; + var _this = this; + if (! cb && typeof register === "function") { + cb = register + register = null + } + if (register) { + this.register(src); + } + var img = new Image(), loaded = false; + img.onload = function(){ + if (loaded) return + loaded = true + if (cb) { + cb(img); + } + if (register) { + _this.ready(src); + } + } + img.src = src; + if (img.complete) img.onload(); + _this.images.push(img); + } + + return Loader; +})(); diff --git a/lib/worker/index.js b/lib/worker/index.js new file mode 100644 index 0000000..604756b --- /dev/null +++ b/lib/worker/index.js @@ -0,0 +1,32 @@ +require('dotenv').config() + +const ipc = require('node-ipc') +const db = require('../db') +const dbFile = db.crud(db.File) + +const processFiles = require('./processFiles') + +ipc.config.id = 'cortexworker' +ipc.config.retry = 1500 + +processFiles() + +ipc.connectTo('cortex', () => { + ipc.of.cortex.on('connect', () => { + ipc.log('## connected to cortex ##', ipc.config.delay); + // ipc.of.cortex.emit('message', 'hello') + }) + ipc.of.cortex.on('disconnect', () => { + ipc.log('disconnected'.notice) + }) + ipc.of.cortex.on('message', (data) => { + ipc.log('received message!'.debug, data) + }) + ipc.of.cortex.on('process', (data) => { + ipc.log('received process: '.debug, data); + processFiles() + }) + ipc.of.cortex.on('job', (data) => { + ipc.log('received job: '.debug, data); + }) +}) diff --git a/lib/worker/mimeFromExtension.js b/lib/worker/mimeFromExtension.js new file mode 100644 index 0000000..1619491 --- /dev/null +++ b/lib/worker/mimeFromExtension.js @@ -0,0 +1,44 @@ +module.exports = function mimeFromExtension (file) { + const fpartz = file.split('.') + const ext = fpartz[fpartz.length-1] + let mime, type; + switch (ext.toLowerCase()) { + case 'mp3': + mime = 'audio/mp3' + type = 'audio' + break + case 'wav': + mime = 'audio/wav' + type = 'audio' + break + case 'aif': + case 'aiff': + mime = 'audio/aiff' + type = 'audio' + break + + case 'jpg': + case 'jpeg': + mime = 'image/jpeg' + type = 'image' + break + case 'gif': + mime = 'image/gif' + type = 'image' + break + case 'png': + mime = 'image/png' + type = 'image' + break + + case 'txt': + mime = 'text/plain' + type = 'text' + break + case 'html': + mime = 'text/html' + type = 'text' + break + } + return { type, mime } +}
\ No newline at end of file diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js new file mode 100644 index 0000000..fd97928 --- /dev/null +++ b/lib/worker/processFiles.js @@ -0,0 +1,110 @@ +const db = require('../db') +const dbFile = db.crud(db.File) +const path = require('path') +const ffprobe = require('node-ffprobe') +const execFile = require('child_process').execFile +const mimeFromExtension = require('./mimeFromExtension') +ffprobe.FFPROBE_PATH = process.env.FFPROBE_BINARY + +let processing = false + +module.exports = function processFiles() { + if (processing) return + processing = true + + dbFile.index({ processed: false }).then( (files) => { + console.log(files.length + ' files left to process') + if (files.length === 0) { + processing = false + return + } + + file = files.at(0) + console.log('processing', file.get('name')) + const mimeType = mimeFromExtension(file.get('name')) + file.set('mime', mimeType.mime) + file.set('type', mimeType.type) + switch (mimeType.type) { + case 'audio': + return processAudio(file) + default: + return processDone(file) + } + }).then( () => { + console.log('done') + if (processing) { + processing = false + setTimeout( processFiles ) + } + }) +} + +function processAudio(file) { + const filePath = path.join(__dirname, '../..', 'public/data', String(file.get('folder_id'))) + // console.log(filePath) + const fullPath = path.join(filePath, file.get('name')) + console.log(fullPath) + + return new Promise ( (resolve, reject) => { + cacheFfprobe(file, fullPath) + .then(() => { return processSpectrum(fullPath) }) + .then(() => { return trimSpectrum(fullPath) }) + .then(() => { return convertToMp3(fullPath) }) + .then(() => { return processDone(file) }) + .then(() => { return resolve() }) + }) +} + +function processSpectrum(fullPath) { + return new Promise ( (resolve, reject) => { + console.log('process spectrum') + execFile(process.env.PYTHON_BINARY, ['./python/spectrum.py', fullPath], (err, stdout, stderr) => { + // console.log('spectrum done') + resolve() + }) + }) +} +function trimSpectrum(fullPath) { + return new Promise ( (resolve, reject) => { + console.log('trim spectrum') + const thumbPath = fullPath + '.png' + execFile(process.env.CONVERT_BINARY, [thumbPath, '-trim', thumbPath], (err, stdout, stderr) => { + // console.log('trim done') + resolve() + }) + }) +} +function cacheFfprobe(file, fullPath) { + return new Promise ( (resolve, reject) => { + console.log('ffprobe') + ffprobe(fullPath, function(err, probeData) { + // console.log(probeData) + file.set('duration', probeData.format.duration) + resolve() + }) + }) +} +function convertToMp3(fullPath) { + return new Promise ( (resolve, reject) => { + console.log('convert to mp3') + if (fullPath.match(/\.mp3$/i)) { + return resolve() + } + const mp3Path = fullPath + '.mp3' + execFile(process.env.FFMPEG_BINARY, ['-y', '-i', fullPath, '-q:a', '6', mp3Path], (err, stdout, stderr) => { + // console.log(stdout, stderr) + resolve() + }) + }) +} + +function processImage(file) { + return new Promise ( (resolve, reject) => { + // process image? + resolve() + }) +} + +function processDone(file) { + return file.save({ processed: true }) +}
\ No newline at end of file |
