summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/bridge/index.js71
-rw-r--r--lib/db/bookshelf.js4
-rw-r--r--lib/db/crud.js2
-rw-r--r--lib/db/model.js24
-rw-r--r--lib/db/models.js22
-rw-r--r--lib/server/index.js7
-rw-r--r--lib/tools/index.js14
-rw-r--r--lib/worker/index.js21
-rw-r--r--lib/worker/processFiles.js33
-rw-r--r--lib/worker/processTasks.js (renamed from lib/worker/processTask.js)73
10 files changed, 160 insertions, 111 deletions
diff --git a/lib/bridge/index.js b/lib/bridge/index.js
index 353ed46..a1bac0a 100644
--- a/lib/bridge/index.js
+++ b/lib/bridge/index.js
@@ -1,9 +1,10 @@
import { execFile, spawn } from 'child_process'
+const ipc = require('node-ipc')
class Bridge {
constructor() {
this.cpus = []
- this.getDevices()
+ // this.getDevices()
}
getDevices() {
this.run(['python/devices.py']).then( (stdout, stderr) => {
@@ -32,43 +33,47 @@ class Bridge {
monitor(args) {
return new Monitor(args)
}
- process(file) {
+ processFiles() {
+ console.log('>> sending process files')
ipc.server.sockets.forEach( (socket) => {
- console.log('>> sending process')
- ipc.server.emit(socket, 'process', true)
+ ipc.server.emit(socket, 'processFiles', true)
})
}
+ processTasks() {
+ console.log('>> sending process tasks')
+ ipc.server.sockets.forEach( (socket) => {
+ ipc.server.emit(socket, 'processTasks', true)
+ })
+ }
+ serve() {
+ ipc.config.id = 'cortex'
+ ipc.config.retry = 1500
+ ipc.serve( () => {
+ ipc.server.on('connect', (socket) => {
+ console.log('>>> worker connected')
+ ipc.server.emit(socket, 'message', true)
+ bridge.broadcast('worker', {connected: true})
+ })
+ ipc.server.on('message', (data, socket) => {
+ ipc.log('got a message : '.debug, data)
+ // ipc.server.emit(socket, 'message', 'hello world!')
+ })
+ ipc.server.on('updateFile', (data, socket) => {
+ console.log('updateFile')
+ bridge.broadcast('updateFile', data)
+ })
+ ipc.server.on('updateTask', (data, socket) => {
+ console.log('updateTask')
+ bridge.broadcast('updateTask', data)
+ })
+ ipc.server.on( 'socket.disconnected', (socket, destroyedSocketID) => {
+ ipc.log('client ' + destroyedSocketID + ' has disconnected!')
+ })
+ })
+ ipc.server.start()
+ }
}
const bridge = new Bridge
export default bridge
-
-var ipc = require('node-ipc')
-
-ipc.config.id = 'cortex'
-ipc.config.retry = 1500;
-
-ipc.serve( () => {
- ipc.server.on('connect', (socket) => {
- console.log('>>> worker connected')
- ipc.server.emit(socket, 'message', true)
- bridge.broadcast('worker', {connected: true})
- })
- ipc.server.on('message', (data, socket) => {
- ipc.log('got a message : '.debug, data);
-// ipc.server.emit(
-// socket,
-// 'message',
-// data+' world!'
-// )
- })
- ipc.server.on('processed', (data, socket) => {
- console.log('processed job')
- bridge.broadcast('processed', data)
- })
- ipc.server.on( 'socket.disconnected', (socket, destroyedSocketID) => {
- ipc.log('client ' + destroyedSocketID + ' has disconnected!');
- })
-})
-ipc.server.start()
diff --git a/lib/db/bookshelf.js b/lib/db/bookshelf.js
index 69157cc..27d9dbb 100644
--- a/lib/db/bookshelf.js
+++ b/lib/db/bookshelf.js
@@ -1,4 +1,4 @@
-var knex = require('knex')({
+const knex = require('knex')({
client: 'mysql2',
connection: {
host : process.env.DB_HOST,
@@ -16,7 +16,7 @@ var knex = require('knex')({
}
})
-var bookshelf = require('bookshelf')(knex)
+const bookshelf = require('bookshelf')(knex)
module.exports = {
bookshelf: bookshelf,
diff --git a/lib/db/crud.js b/lib/db/crud.js
index fb32690..c0e0e2c 100644
--- a/lib/db/crud.js
+++ b/lib/db/crud.js
@@ -34,7 +34,7 @@ module.exports = function(model) {
return new model({'id': id}).save(data)
},
destroy: (id) => {
- return new model({'id': id}).destroy(data)
+ return new model({'id': id}).destroy()
},
}
} \ No newline at end of file
diff --git a/lib/db/model.js b/lib/db/model.js
index 1f729f6..fae7691 100644
--- a/lib/db/model.js
+++ b/lib/db/model.js
@@ -27,14 +27,14 @@ module.exports = function modelScope(type, db_model, _props) {
let recs = data.toJSON()
const loader = new Loader ()
loader.onReady( () => {
- console.log(type, 'ready')
+ // console.log(type, 'ready')
resolve(recs)
})
- console.log('hasOne')
+ // console.log('hasOne')
loader.register('hasOne')
Object.keys(props.hasOne).forEach( (key,i) => {
loader.register(key)
- console.log('register', key)
+ // console.log('register', key)
const type = props.hasOne[key]
const id_lookup = {}
recs.forEach(r => {
@@ -50,7 +50,7 @@ module.exports = function modelScope(type, db_model, _props) {
sub_recs.toJSON().forEach(rec => {
id_lookup[rec.id].forEach( parent_rec => parent_rec[short_key] = rec )
})
- console.log('ready', key)
+ // console.log('ready', key)
loader.ready(key)
})
})
@@ -94,10 +94,10 @@ module.exports = function modelScope(type, db_model, _props) {
crud.index(query).then( (recs) => {
if (recs && recs.length) {
const rec = recs.at(0)
- console.log('found rec', data.name)
+ // console.log('found rec', data.name)
return resolve(rec)
}
- console.log('creating rec', data.name)
+ // console.log('creating rec', data.name)
model.create(data).then( (rec) => {
resolve(rec)
})
@@ -118,7 +118,7 @@ module.exports = function modelScope(type, db_model, _props) {
},
update: (id, data) => {
- console.log('update', id)
+ // console.log('update', id)
return new Promise( (resolve, reject) => {
crud.update(id, model.sanitize(data)).then( (data) => {
resolve(data.toJSON())
@@ -132,20 +132,22 @@ module.exports = function modelScope(type, db_model, _props) {
destroy: (id) => {
return new Promise( (resolve, reject) => {
crud.destroy(id).then( (data) => {
- res.json(data.toJSON())
+ resolve(data.toJSON())
})// .catch( () => res.sendStatus(500) )
})
},
sanitize: (data) => {
var valid = {}
- Object.keys(data).forEach(key => {
+ props.fields.forEach(key => {
if (props.hasOne[key]) {
return
}
- valid[key] = data[key]
+ if (key in data) {
+ valid[key] = data[key]
+ }
})
- console.log(valid)
+ // console.log(valid)
return valid
},
diff --git a/lib/db/models.js b/lib/db/models.js
index 588ce58..2108148 100644
--- a/lib/db/models.js
+++ b/lib/db/models.js
@@ -1,10 +1,8 @@
let fs = require('fs')
let model = require('./model')
-
-let connection = require("./bookshelf")
-let bookshelf = connection.bookshelf
-let knex = connection.knex
+let bookshelf = require("./bookshelf").bookshelf
+import bridge from '../bridge'
let Folder = bookshelf.Model.extend({
tableName: 'folders',
@@ -25,15 +23,27 @@ let Task = bookshelf.Model.extend({
module.exports = {
folder: model('folder', Folder, {
+ fields: "name username description".split(" "),
afterCreate: (folder) => {
fs.mkdir('data/' + folder.get('id') + '/', function(){
console.log('created folder', folder.get('id'), folder.get('name'))
})
}
}),
- file: model('file', File),
- job: model('job', Job),
+ file: model('file', File, {
+ fields: "folder_id username name mime type duration analysis size processed generated".split(" "),
+ afterCreate: (file) => {
+ bridge.processFiles()
+ }
+ }),
+ job: model('job', Job, {
+ fields: "name username completed tool".split(" "),
+ }),
task: model('task', Task, {
+ fields: "job_id username completed processing tool content_file_id style_file_id output_file_id alpha iterations stdout stderr".split(" "),
+ afterCreate: (task) => {
+ bridge.processTasks()
+ },
hasOne: {
content_file: File,
style_file: File,
diff --git a/lib/server/index.js b/lib/server/index.js
index f8cea0c..5c70cba 100644
--- a/lib/server/index.js
+++ b/lib/server/index.js
@@ -31,6 +31,8 @@ site.init = function(){
io = socketIo(server)
+ bridge.serve()
+ bridge.getDevices()
bridge.connectSocketIo(io)
const api_folders = api(app, 'folder')
@@ -51,7 +53,6 @@ site.init = function(){
if (Object.keys(data).some( el => !! el )) {
res.json( Object.keys(data).map(k=>data[k]).sort((a,b) => { b.id - a.id }) )
}
- bridge.process()
})
loader.register('upload')
@@ -62,7 +63,7 @@ site.init = function(){
data[fn] = false
loader.register(fn)
fs.rename(file.path, 'data/' + req.params.id + '/' + fn, function(err){
- api_files.crud.create({
+ api_files.create({
// table.string('username')
'folder_id': req.params.id,
'name': fn,
@@ -70,7 +71,7 @@ site.init = function(){
'generated': false,
'processed': false,
}).then( (file) => {
- data[fn] = file.toJSON()
+ data[fn] = file
loader.ready(fn)
}).catch( (err) => {
console.warn(err)
diff --git a/lib/tools/index.js b/lib/tools/index.js
index 8415e79..d2a797d 100644
--- a/lib/tools/index.js
+++ b/lib/tools/index.js
@@ -1,7 +1,15 @@
module.exports = {
+ sleep: {
+ cmd: process.env.PYTHON_BINARY,
+ script: 'python/sleep.py',
+ ary: 0,
+ output: false,
+ },
nsatf: {
cmd: process.env.PYTHON_BINARY,
- script: 'nsatf.py',
- }
-} \ No newline at end of file
+ script: 'python/nsatf.py',
+ ary: 2,
+ output: true,
+ },
+}
diff --git a/lib/worker/index.js b/lib/worker/index.js
index 6733dd1..0e2ff44 100644
--- a/lib/worker/index.js
+++ b/lib/worker/index.js
@@ -3,36 +3,35 @@ require('dotenv').config()
const ipc = require('node-ipc')
const processFiles = require('./processFiles')
-const processTask = require('./processTask')
+const processTasks = require('./processTasks')
ipc.config.id = 'cortexworker'
ipc.config.retry = 1500
ipc.config.silent = true
processFiles()
-processTask()
+processTasks()
ipc.connectTo('cortex', () => {
ipc.of.cortex.on('connect', () => {
ipc.log('## connected to cortex ##', ipc.config.delay);
- // ipc.of.cortex.emit('message', 'hello')
})
ipc.of.cortex.on('disconnect', () => {
ipc.log('disconnected'.notice)
})
ipc.of.cortex.on('message', (data) => {
- ipc.log('received message!'.debug, data)
+ ipc.log('received message'.debug, data)
})
- ipc.of.cortex.on('process', (data) => {
- ipc.log('received process: '.debug, data);
+ ipc.of.cortex.on('processFiles', (data) => {
+ ipc.log('>>> received processFiles'.debug, data);
processFiles()
})
ipc.of.cortex.on('job', (data) => {
- ipc.log('received job: '.debug, data);
- processTask()
+ ipc.log('received job'.debug, data);
+ processTasks()
})
- ipc.of.cortex.on('task', (data) => {
- ipc.log('received task: '.debug, data);
- processTask()
+ ipc.of.cortex.on('processTasks', (data) => {
+ ipc.log('>>> received processTasks: '.debug, data);
+ processTasks()
})
})
diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js
index c9b2724..2b89aea 100644
--- a/lib/worker/processFiles.js
+++ b/lib/worker/processFiles.js
@@ -14,18 +14,17 @@ module.exports = function processFiles() {
if (processing) return
processing = true
- dbFile.crud.index({ processed: false }).then( (files) => {
+ dbFile.index({ processed: false, limit: 1 }).then( (files) => {
console.log(files.length + ' files left to process')
- if (files.length === 0) {
+ if (! files || files.length === 0) {
processing = false
return
}
- file = files.at(0)
- console.log('processing', file.get('name'))
- const mimeType = mimeFromExtension(file.get('name'))
- file.set('mime', mimeType.mime)
- file.set('type', mimeType.type)
+ let file = files.at ? files.at(0) : files[0]
+ const mimeType = mimeFromExtension(file.name)
+ file.mime = mimeType.mime
+ file.type = mimeType.type
switch (mimeType.type) {
case 'audio':
return processAudio(file)
@@ -42,9 +41,9 @@ module.exports = function processFiles() {
}
function processAudio(file) {
- const filePath = path.join(__dirname, '../..', 'public/data', String(file.get('folder_id')))
+ const filePath = path.join(__dirname, '../..', 'public/data', String(file.folder_id))
// console.log(filePath)
- const fullPath = path.join(filePath, file.get('name'))
+ const fullPath = path.join(filePath, file.name)
console.log(fullPath)
return new Promise ( (resolve, reject) => {
@@ -83,8 +82,14 @@ function cacheFfprobe(file, fullPath) {
return new Promise ( (resolve, reject) => {
console.log('ffprobe')
ffprobe(fullPath, function(err, probeData) {
- file.set('duration', probeData.format.duration)
- file.set('analysis', JSON.stringify(probeData))
+ if (err) {
+ console.error('\n------------------------- !!!\n\n')
+ console.error(err)
+ }
+ if (probeData) {
+ file.duration = probeData.format.duration
+ }
+ file.analysis = JSON.stringify(probeData)
resolve()
})
})
@@ -132,7 +137,7 @@ function processImage(file) {
}
function processDone(file) {
- file.set({ processed: true })
- ipc.of.cortex && ipc.of.cortex.emit("processed", { file: file })
- return file.save()
+ file.processed = true
+ ipc.of.cortex && ipc.of.cortex.emit("updateFile", { file: file })
+ return dbFile.update(file.id, file)
} \ No newline at end of file
diff --git a/lib/worker/processTask.js b/lib/worker/processTasks.js
index 8afeb6d..c60aaf5 100644
--- a/lib/worker/processTask.js
+++ b/lib/worker/processTasks.js
@@ -5,30 +5,38 @@ const fs = require('fs')
const path = require('path')
const execFile = require('child_process').execFile
const Loader = require('../vendor/Loader')
+const processFiles = require('./processFiles')
let processing = false
-module.exports = function processTask() {
+module.exports = function processTasks() {
if (processing) return
processing = true
+ console.log('fetching tasks...')
db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => {
// console.log(tasks.length + ' tasks left to process')
- console.log('fetching tasks...')
if (! tasks || tasks.length === 0) {
console.log('> completed all tasks!')
processing = false
return
}
- task = tasks.at ? tasks.at(0) : tasks[0]
- console.log('running task #', task ? task.id : '??')
- return processTaskPromise(task)
+ let task = tasks.at ? tasks.at(0) : tasks[0]
+ if (task) {
+ console.log('running task #', task.id)
+ return processTaskPromise(task)
+ }
+ else {
+ console.log('no more tasks')
+ return new Promise( resolve => resolve() )
+ }
}).then( () => {
console.log('done')
if (processing) {
processing = false
- setTimeout( processTask )
+ setTimeout( processFiles )
+ setTimeout( processTasks )
}
})
}
@@ -42,7 +50,7 @@ function processTaskPromise(task) {
})
}
-function initTask() {
+function initTask(task) {
return new Promise ( (resolve, reject) => {
if (task.content_file.type !== 'audio') reject()
if (task.style_file.type !== 'audio') reject()
@@ -50,16 +58,21 @@ function initTask() {
constructFilePath(task.content_file)
constructFilePath(task.style_file)
- db.models.folder.findOrCreate({ name: 'output' })
- .then( (folder) => {
- task.output_file = {
- type: 'audio',
- name: (folder.name || 'output') + '_' + Date.now() + '.aiff',
- folder_id: folder.id,
- processed: false,
- }
- constructFilePath(task.output_file)
- resolve()
+ task.processing = true
+ db.models.task.update(task.id, task).then( () => {
+ ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task })
+ db.models.folder.findOrCreate({ name: 'output' })
+ .then( (folder) => {
+ task.output_file = {
+ type: 'audio',
+ name: (folder.name || 'output') + '_' + Date.now() + '.aiff',
+ folder_id: folder.id,
+ processed: false,
+ generated: true,
+ }
+ constructFilePath(task.output_file)
+ resolve()
+ })
})
})
}
@@ -73,21 +86,26 @@ function checkAccess(file) {
})
}
-function runTask(fullPath) {
+function runTask(task) {
return new Promise ( (resolve, reject) => {
console.log('running task')
- console.log('create output file record...')
- console.log(task.output_file.path)
-
const tool = tools[ task.tool ]
execFile(tool.cmd, [
- tool.script_path,
- task.content_file.path,
- task.style_file.path,
+ path.join(__dirname, '../..', tool.script),
+ task.content_file.aiffPath,
+ task.style_file.aiffPath,
task.output_file.path,
], (err, stdout, stderr) => {
- console.log(stdout, stderr)
+ console.log("DONE")
+ console.log(err)
+ console.log(stdout)
+ console.log(stderr)
+ console.log('create output file record...')
+ console.log(task.output_file.path)
+ task.stdout = stdout
+ task.stderr = stderr
+
db.models.file.create(task.output_file).then( file => {
task.output_file = file
task.output_file_id = file.id
@@ -99,8 +117,9 @@ function runTask(fullPath) {
function processDone(task) {
task.completed = true
+ task.processing = false
console.log(task)
- ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task })
+ ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task })
return db.models.task.update(task.id, task)
}
@@ -108,7 +127,7 @@ function constructFilePath (file) {
file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name))
switch (file.type) {
case 'audio':
- file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff'
+ file.aiffPath = file.path + '.aiff'
break
}
}