diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/bridge/index.js | 71 | ||||
| -rw-r--r-- | lib/db/bookshelf.js | 4 | ||||
| -rw-r--r-- | lib/db/crud.js | 2 | ||||
| -rw-r--r-- | lib/db/model.js | 24 | ||||
| -rw-r--r-- | lib/db/models.js | 22 | ||||
| -rw-r--r-- | lib/server/index.js | 7 | ||||
| -rw-r--r-- | lib/tools/index.js | 14 | ||||
| -rw-r--r-- | lib/worker/index.js | 21 | ||||
| -rw-r--r-- | lib/worker/processFiles.js | 33 | ||||
| -rw-r--r-- | lib/worker/processTasks.js (renamed from lib/worker/processTask.js) | 73 |
10 files changed, 160 insertions, 111 deletions
diff --git a/lib/bridge/index.js b/lib/bridge/index.js index 353ed46..a1bac0a 100644 --- a/lib/bridge/index.js +++ b/lib/bridge/index.js @@ -1,9 +1,10 @@ import { execFile, spawn } from 'child_process' +const ipc = require('node-ipc') class Bridge { constructor() { this.cpus = [] - this.getDevices() + // this.getDevices() } getDevices() { this.run(['python/devices.py']).then( (stdout, stderr) => { @@ -32,43 +33,47 @@ class Bridge { monitor(args) { return new Monitor(args) } - process(file) { + processFiles() { + console.log('>> sending process files') ipc.server.sockets.forEach( (socket) => { - console.log('>> sending process') - ipc.server.emit(socket, 'process', true) + ipc.server.emit(socket, 'processFiles', true) }) } + processTasks() { + console.log('>> sending process tasks') + ipc.server.sockets.forEach( (socket) => { + ipc.server.emit(socket, 'processTasks', true) + }) + } + serve() { + ipc.config.id = 'cortex' + ipc.config.retry = 1500 + ipc.serve( () => { + ipc.server.on('connect', (socket) => { + console.log('>>> worker connected') + ipc.server.emit(socket, 'message', true) + bridge.broadcast('worker', {connected: true}) + }) + ipc.server.on('message', (data, socket) => { + ipc.log('got a message : '.debug, data) + // ipc.server.emit(socket, 'message', 'hello world!') + }) + ipc.server.on('updateFile', (data, socket) => { + console.log('updateFile') + bridge.broadcast('updateFile', data) + }) + ipc.server.on('updateTask', (data, socket) => { + console.log('updateTask') + bridge.broadcast('updateTask', data) + }) + ipc.server.on( 'socket.disconnected', (socket, destroyedSocketID) => { + ipc.log('client ' + destroyedSocketID + ' has disconnected!') + }) + }) + ipc.server.start() + } } const bridge = new Bridge export default bridge - -var ipc = require('node-ipc') - -ipc.config.id = 'cortex' -ipc.config.retry = 1500; - -ipc.serve( () => { - ipc.server.on('connect', (socket) => { - console.log('>>> worker connected') - ipc.server.emit(socket, 'message', true) - bridge.broadcast('worker', {connected: true}) - }) - ipc.server.on('message', (data, socket) => { - ipc.log('got a message : '.debug, data); -// ipc.server.emit( -// socket, -// 'message', -// data+' world!' -// ) - }) - ipc.server.on('processed', (data, socket) => { - console.log('processed job') - bridge.broadcast('processed', data) - }) - ipc.server.on( 'socket.disconnected', (socket, destroyedSocketID) => { - ipc.log('client ' + destroyedSocketID + ' has disconnected!'); - }) -}) -ipc.server.start() diff --git a/lib/db/bookshelf.js b/lib/db/bookshelf.js index 69157cc..27d9dbb 100644 --- a/lib/db/bookshelf.js +++ b/lib/db/bookshelf.js @@ -1,4 +1,4 @@ -var knex = require('knex')({ +const knex = require('knex')({ client: 'mysql2', connection: { host : process.env.DB_HOST, @@ -16,7 +16,7 @@ var knex = require('knex')({ } }) -var bookshelf = require('bookshelf')(knex) +const bookshelf = require('bookshelf')(knex) module.exports = { bookshelf: bookshelf, diff --git a/lib/db/crud.js b/lib/db/crud.js index fb32690..c0e0e2c 100644 --- a/lib/db/crud.js +++ b/lib/db/crud.js @@ -34,7 +34,7 @@ module.exports = function(model) { return new model({'id': id}).save(data) }, destroy: (id) => { - return new model({'id': id}).destroy(data) + return new model({'id': id}).destroy() }, } }
\ No newline at end of file diff --git a/lib/db/model.js b/lib/db/model.js index 1f729f6..fae7691 100644 --- a/lib/db/model.js +++ b/lib/db/model.js @@ -27,14 +27,14 @@ module.exports = function modelScope(type, db_model, _props) { let recs = data.toJSON() const loader = new Loader () loader.onReady( () => { - console.log(type, 'ready') + // console.log(type, 'ready') resolve(recs) }) - console.log('hasOne') + // console.log('hasOne') loader.register('hasOne') Object.keys(props.hasOne).forEach( (key,i) => { loader.register(key) - console.log('register', key) + // console.log('register', key) const type = props.hasOne[key] const id_lookup = {} recs.forEach(r => { @@ -50,7 +50,7 @@ module.exports = function modelScope(type, db_model, _props) { sub_recs.toJSON().forEach(rec => { id_lookup[rec.id].forEach( parent_rec => parent_rec[short_key] = rec ) }) - console.log('ready', key) + // console.log('ready', key) loader.ready(key) }) }) @@ -94,10 +94,10 @@ module.exports = function modelScope(type, db_model, _props) { crud.index(query).then( (recs) => { if (recs && recs.length) { const rec = recs.at(0) - console.log('found rec', data.name) + // console.log('found rec', data.name) return resolve(rec) } - console.log('creating rec', data.name) + // console.log('creating rec', data.name) model.create(data).then( (rec) => { resolve(rec) }) @@ -118,7 +118,7 @@ module.exports = function modelScope(type, db_model, _props) { }, update: (id, data) => { - console.log('update', id) + // console.log('update', id) return new Promise( (resolve, reject) => { crud.update(id, model.sanitize(data)).then( (data) => { resolve(data.toJSON()) @@ -132,20 +132,22 @@ module.exports = function modelScope(type, db_model, _props) { destroy: (id) => { return new Promise( (resolve, reject) => { crud.destroy(id).then( (data) => { - res.json(data.toJSON()) + resolve(data.toJSON()) })// .catch( () => res.sendStatus(500) ) }) }, sanitize: (data) => { var valid = {} - Object.keys(data).forEach(key => { + props.fields.forEach(key => { if (props.hasOne[key]) { return } - valid[key] = data[key] + if (key in data) { + valid[key] = data[key] + } }) - console.log(valid) + // console.log(valid) return valid }, diff --git a/lib/db/models.js b/lib/db/models.js index 588ce58..2108148 100644 --- a/lib/db/models.js +++ b/lib/db/models.js @@ -1,10 +1,8 @@ let fs = require('fs') let model = require('./model') - -let connection = require("./bookshelf") -let bookshelf = connection.bookshelf -let knex = connection.knex +let bookshelf = require("./bookshelf").bookshelf +import bridge from '../bridge' let Folder = bookshelf.Model.extend({ tableName: 'folders', @@ -25,15 +23,27 @@ let Task = bookshelf.Model.extend({ module.exports = { folder: model('folder', Folder, { + fields: "name username description".split(" "), afterCreate: (folder) => { fs.mkdir('data/' + folder.get('id') + '/', function(){ console.log('created folder', folder.get('id'), folder.get('name')) }) } }), - file: model('file', File), - job: model('job', Job), + file: model('file', File, { + fields: "folder_id username name mime type duration analysis size processed generated".split(" "), + afterCreate: (file) => { + bridge.processFiles() + } + }), + job: model('job', Job, { + fields: "name username completed tool".split(" "), + }), task: model('task', Task, { + fields: "job_id username completed processing tool content_file_id style_file_id output_file_id alpha iterations stdout stderr".split(" "), + afterCreate: (task) => { + bridge.processTasks() + }, hasOne: { content_file: File, style_file: File, diff --git a/lib/server/index.js b/lib/server/index.js index f8cea0c..5c70cba 100644 --- a/lib/server/index.js +++ b/lib/server/index.js @@ -31,6 +31,8 @@ site.init = function(){ io = socketIo(server) + bridge.serve() + bridge.getDevices() bridge.connectSocketIo(io) const api_folders = api(app, 'folder') @@ -51,7 +53,6 @@ site.init = function(){ if (Object.keys(data).some( el => !! el )) { res.json( Object.keys(data).map(k=>data[k]).sort((a,b) => { b.id - a.id }) ) } - bridge.process() }) loader.register('upload') @@ -62,7 +63,7 @@ site.init = function(){ data[fn] = false loader.register(fn) fs.rename(file.path, 'data/' + req.params.id + '/' + fn, function(err){ - api_files.crud.create({ + api_files.create({ // table.string('username') 'folder_id': req.params.id, 'name': fn, @@ -70,7 +71,7 @@ site.init = function(){ 'generated': false, 'processed': false, }).then( (file) => { - data[fn] = file.toJSON() + data[fn] = file loader.ready(fn) }).catch( (err) => { console.warn(err) diff --git a/lib/tools/index.js b/lib/tools/index.js index 8415e79..d2a797d 100644 --- a/lib/tools/index.js +++ b/lib/tools/index.js @@ -1,7 +1,15 @@ module.exports = { + sleep: { + cmd: process.env.PYTHON_BINARY, + script: 'python/sleep.py', + ary: 0, + output: false, + }, nsatf: { cmd: process.env.PYTHON_BINARY, - script: 'nsatf.py', - } -}
\ No newline at end of file + script: 'python/nsatf.py', + ary: 2, + output: true, + }, +} diff --git a/lib/worker/index.js b/lib/worker/index.js index 6733dd1..0e2ff44 100644 --- a/lib/worker/index.js +++ b/lib/worker/index.js @@ -3,36 +3,35 @@ require('dotenv').config() const ipc = require('node-ipc') const processFiles = require('./processFiles') -const processTask = require('./processTask') +const processTasks = require('./processTasks') ipc.config.id = 'cortexworker' ipc.config.retry = 1500 ipc.config.silent = true processFiles() -processTask() +processTasks() ipc.connectTo('cortex', () => { ipc.of.cortex.on('connect', () => { ipc.log('## connected to cortex ##', ipc.config.delay); - // ipc.of.cortex.emit('message', 'hello') }) ipc.of.cortex.on('disconnect', () => { ipc.log('disconnected'.notice) }) ipc.of.cortex.on('message', (data) => { - ipc.log('received message!'.debug, data) + ipc.log('received message'.debug, data) }) - ipc.of.cortex.on('process', (data) => { - ipc.log('received process: '.debug, data); + ipc.of.cortex.on('processFiles', (data) => { + ipc.log('>>> received processFiles'.debug, data); processFiles() }) ipc.of.cortex.on('job', (data) => { - ipc.log('received job: '.debug, data); - processTask() + ipc.log('received job'.debug, data); + processTasks() }) - ipc.of.cortex.on('task', (data) => { - ipc.log('received task: '.debug, data); - processTask() + ipc.of.cortex.on('processTasks', (data) => { + ipc.log('>>> received processTasks: '.debug, data); + processTasks() }) }) diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js index c9b2724..2b89aea 100644 --- a/lib/worker/processFiles.js +++ b/lib/worker/processFiles.js @@ -14,18 +14,17 @@ module.exports = function processFiles() { if (processing) return processing = true - dbFile.crud.index({ processed: false }).then( (files) => { + dbFile.index({ processed: false, limit: 1 }).then( (files) => { console.log(files.length + ' files left to process') - if (files.length === 0) { + if (! files || files.length === 0) { processing = false return } - file = files.at(0) - console.log('processing', file.get('name')) - const mimeType = mimeFromExtension(file.get('name')) - file.set('mime', mimeType.mime) - file.set('type', mimeType.type) + let file = files.at ? files.at(0) : files[0] + const mimeType = mimeFromExtension(file.name) + file.mime = mimeType.mime + file.type = mimeType.type switch (mimeType.type) { case 'audio': return processAudio(file) @@ -42,9 +41,9 @@ module.exports = function processFiles() { } function processAudio(file) { - const filePath = path.join(__dirname, '../..', 'public/data', String(file.get('folder_id'))) + const filePath = path.join(__dirname, '../..', 'public/data', String(file.folder_id)) // console.log(filePath) - const fullPath = path.join(filePath, file.get('name')) + const fullPath = path.join(filePath, file.name) console.log(fullPath) return new Promise ( (resolve, reject) => { @@ -83,8 +82,14 @@ function cacheFfprobe(file, fullPath) { return new Promise ( (resolve, reject) => { console.log('ffprobe') ffprobe(fullPath, function(err, probeData) { - file.set('duration', probeData.format.duration) - file.set('analysis', JSON.stringify(probeData)) + if (err) { + console.error('\n------------------------- !!!\n\n') + console.error(err) + } + if (probeData) { + file.duration = probeData.format.duration + } + file.analysis = JSON.stringify(probeData) resolve() }) }) @@ -132,7 +137,7 @@ function processImage(file) { } function processDone(file) { - file.set({ processed: true }) - ipc.of.cortex && ipc.of.cortex.emit("processed", { file: file }) - return file.save() + file.processed = true + ipc.of.cortex && ipc.of.cortex.emit("updateFile", { file: file }) + return dbFile.update(file.id, file) }
\ No newline at end of file diff --git a/lib/worker/processTask.js b/lib/worker/processTasks.js index 8afeb6d..c60aaf5 100644 --- a/lib/worker/processTask.js +++ b/lib/worker/processTasks.js @@ -5,30 +5,38 @@ const fs = require('fs') const path = require('path') const execFile = require('child_process').execFile const Loader = require('../vendor/Loader') +const processFiles = require('./processFiles') let processing = false -module.exports = function processTask() { +module.exports = function processTasks() { if (processing) return processing = true + console.log('fetching tasks...') db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => { // console.log(tasks.length + ' tasks left to process') - console.log('fetching tasks...') if (! tasks || tasks.length === 0) { console.log('> completed all tasks!') processing = false return } - task = tasks.at ? tasks.at(0) : tasks[0] - console.log('running task #', task ? task.id : '??') - return processTaskPromise(task) + let task = tasks.at ? tasks.at(0) : tasks[0] + if (task) { + console.log('running task #', task.id) + return processTaskPromise(task) + } + else { + console.log('no more tasks') + return new Promise( resolve => resolve() ) + } }).then( () => { console.log('done') if (processing) { processing = false - setTimeout( processTask ) + setTimeout( processFiles ) + setTimeout( processTasks ) } }) } @@ -42,7 +50,7 @@ function processTaskPromise(task) { }) } -function initTask() { +function initTask(task) { return new Promise ( (resolve, reject) => { if (task.content_file.type !== 'audio') reject() if (task.style_file.type !== 'audio') reject() @@ -50,16 +58,21 @@ function initTask() { constructFilePath(task.content_file) constructFilePath(task.style_file) - db.models.folder.findOrCreate({ name: 'output' }) - .then( (folder) => { - task.output_file = { - type: 'audio', - name: (folder.name || 'output') + '_' + Date.now() + '.aiff', - folder_id: folder.id, - processed: false, - } - constructFilePath(task.output_file) - resolve() + task.processing = true + db.models.task.update(task.id, task).then( () => { + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) + db.models.folder.findOrCreate({ name: 'output' }) + .then( (folder) => { + task.output_file = { + type: 'audio', + name: (folder.name || 'output') + '_' + Date.now() + '.aiff', + folder_id: folder.id, + processed: false, + generated: true, + } + constructFilePath(task.output_file) + resolve() + }) }) }) } @@ -73,21 +86,26 @@ function checkAccess(file) { }) } -function runTask(fullPath) { +function runTask(task) { return new Promise ( (resolve, reject) => { console.log('running task') - console.log('create output file record...') - console.log(task.output_file.path) - const tool = tools[ task.tool ] execFile(tool.cmd, [ - tool.script_path, - task.content_file.path, - task.style_file.path, + path.join(__dirname, '../..', tool.script), + task.content_file.aiffPath, + task.style_file.aiffPath, task.output_file.path, ], (err, stdout, stderr) => { - console.log(stdout, stderr) + console.log("DONE") + console.log(err) + console.log(stdout) + console.log(stderr) + console.log('create output file record...') + console.log(task.output_file.path) + task.stdout = stdout + task.stderr = stderr + db.models.file.create(task.output_file).then( file => { task.output_file = file task.output_file_id = file.id @@ -99,8 +117,9 @@ function runTask(fullPath) { function processDone(task) { task.completed = true + task.processing = false console.log(task) - ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task }) + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) return db.models.task.update(task.id, task) } @@ -108,7 +127,7 @@ function constructFilePath (file) { file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name)) switch (file.type) { case 'audio': - file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff' + file.aiffPath = file.path + '.aiff' break } } |
