diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/db/crud.js | 40 | ||||
| -rw-r--r-- | lib/db/index.js | 79 | ||||
| -rw-r--r-- | lib/db/model.js | 160 | ||||
| -rw-r--r-- | lib/db/models.js | 43 | ||||
| -rw-r--r-- | lib/server/api.js | 48 | ||||
| -rw-r--r-- | lib/server/index.js | 124 | ||||
| -rw-r--r-- | lib/tools/index.js | 7 | ||||
| -rw-r--r-- | lib/worker/index.js | 7 | ||||
| -rw-r--r-- | lib/worker/processFiles.js | 5 | ||||
| -rw-r--r-- | lib/worker/processTask.js | 115 |
10 files changed, 431 insertions, 197 deletions
diff --git a/lib/db/crud.js b/lib/db/crud.js new file mode 100644 index 0000000..fb32690 --- /dev/null +++ b/lib/db/crud.js @@ -0,0 +1,40 @@ +module.exports = function(model) { + return { + index: (q) => { + return model.query( (qb) => { + const limit = q.limit || 100 + const offset = q.offset || 0 + if (limit) { + delete q.limit + } + if (q.offset) { + delete q.offset + } + if (Object.keys(q).length > 0) qb.where(q) + qb.orderBy("id", "desc") + if (limit) qb.limit( limit ) + if (offset) qb.offset( offset ) + // console.log(qb) + return qb + }).fetchAll() + }, + show: (id) => { + return new model({'id': id}).fetch() + }, + show_ids: (ids) => { + return model.query( (qb) => { + qb.whereIn('id', ids) + return qb + }).fetchAll() + }, + create: (data) => { + return new model(data).save() + }, + update: (id, data) => { + return new model({'id': id}).save(data) + }, + destroy: (id) => { + return new model({'id': id}).destroy(data) + }, + } +}
\ No newline at end of file diff --git a/lib/db/index.js b/lib/db/index.js index 1c7cc15..c89afc3 100644 --- a/lib/db/index.js +++ b/lib/db/index.js @@ -1,76 +1,5 @@ -var db = module.exports +let db = module.exports -var connection = require("./bookshelf") -var bookshelf = connection.bookshelf -var knex = connection.knex - -var Folder = db.Folder = bookshelf.Model.extend({ - tableName: 'folders', - hasTimestamps: true, -}) -var File = db.File = bookshelf.Model.extend({ - tableName: 'files', - hasTimestamps: true, -}) -var Job = db.Job = bookshelf.Model.extend({ - tableName: 'jobs', - hasTimestamps: true, -}) -var Task = db.Task = bookshelf.Model.extend({ - tableName: 'tasks', - hasTimestamps: true, -}) - -db.crud = function(model) { - return { - index: (q) => { - return model.query( (qb) => { - const limit = q.limit || 100 - const offset = q.offset || 0 - if (limit) { - delete q.limit - } - if (q.offset) { - delete q.offset - } - if (Object.keys(q).length > 0) qb.where(q) - qb.orderBy("id", "desc") - if (limit) qb.limit( limit ) - if (offset) qb.offset( offset ) - // console.log(qb) - return qb - }).fetchAll() - }, - show: (id) => { - return new model({'id': id}).fetch() - }, - show_ids: (ids) => { - return model.query( (qb) => { - qb.whereIn('id', ids) - return qb - }).fetchAll() - }, - create: (data) => { - return new model(data).save() - }, - update: (id, data) => { - return new model({'id': id}).save(data) - }, - destroy: (id) => { - return new model({'id': id}).destroy(data) - }, - } -} - -function memoize (f) { - const o = {} - return (model) => { - console.log(model) - t = model.toString() - if (o[t]) { - return o[t] - } - o[t] = f(model) - return o[t] - } -}
\ No newline at end of file +db.crud = require('./crud') +db.model = require('./model') +db.models = require('./models') diff --git a/lib/db/model.js b/lib/db/model.js new file mode 100644 index 0000000..f174656 --- /dev/null +++ b/lib/db/model.js @@ -0,0 +1,160 @@ +const Loader = require('../vendor/Loader') +const db_crud = require('./crud') + +module.exports = function modelScope(type, db_model, _props) { + + const props = Object.assign({ + hasOne: {}, + afterCreate: () => {}, + }, _props) + + const crud = db_crud(db_model) + + const model = { + type: type, + db_model: db_model, + crud: crud, + + index: (query) => { + + return new Promise( (resolve, reject) => { + crud.index(query).then( (data) => { + + if (! props.hasOne) { + resolve(data ? data.toJSON() : []) + } + else { + let recs = data.toJSON() + const loader = new Loader () + loader.onReady( () => { + console.log(type, 'ready') + resolve(recs) + }) + console.log('hasOne') + loader.register('hasOne') + Object.keys(props.hasOne).forEach( (key,i) => { + loader.register(key) + console.log('register', key) + const type = props.hasOne[key] + const id_lookup = {} + recs.forEach(r => { + const id = r[key + '_id'] + id_lookup[id] = id_lookup[id] || [] + id_lookup[id].push(r) + }) + // console.log('\n\n%%%%%%%%%%%%%%%%%%%%%%%% index > hasOne ' + key + '\n\n\n') + // console.log(recs.length, Object.keys(id_lookup).length) + db_crud(type).show_ids(Object.keys(id_lookup)).then( (sub_recs) => { + // console.log(key, 'sub_recs', sub_recs) + const short_key = key.replace('_id','') + sub_recs.toJSON().forEach(rec => { + id_lookup[rec.id].forEach( parent_rec => parent_rec[short_key] = rec ) + }) + console.log('ready', key) + loader.ready(key) + }) + }) + loader.ready('hasOne') + } + }) // }).catch( () => res.sendStatus(500) ) + }) + }, + + show: (id) => { + return new Promise( (resolve, reject) => { + crud.show(id).then( (data) => { + if (! props.hasOne) { + resolve(data.toJSON()) + } + else { + let rec = data.toJSON() + const loader = new Loader () + loader.onReady( () => { + resolve(rec) + }) + loader.register('hasOne') + Object.keys(props.hasOne).forEach( (key,i) => { + loader.register(key) + const type = props.hasOne[key] + db_crud(type).show(rec[key + '_id']).then( (sub_rec) => { + rec[key] = sub_rec + loader.ready(key) + }) + }) + loader.ready('hasOne') + } + }) // .catch( (err) => res.sendStatus(500) ) + }) + }, + + findOrCreate: (data) => { + return new Promise( (resolve, reject) => { + let query = Object.assign({}, data) + query.limit = 1 + crud.index(query).then( (recs) => { + if (recs && recs.length) { + const rec = recs.at(0) + console.log('found rec', data.name) + return resolve(rec) + } + console.log('creating rec', data.name) + model.create(data).then( (rec) => { + resolve(rec) + }) + }) + }) + }, + + create: (data) => { + return new Promise( (resolve, reject) => { + crud.create( model.sanitize(data) ).then( (data) => { + console.log('yooooooo') + resolve(data.toJSON()) + props.afterCreate && props.afterCreate(data) + }).catch( (e) => { + console.error('error creating', e) + reject() + }) + }) + }, + + update: (id, data) => { + console.log('update', id) + return new Promise( (resolve, reject) => { + crud.update(id, model.sanitize(data)).then( (data) => { + resolve(data.toJSON()) + }).catch( (e) => { + console.error('error updating', e) + reject() + }) + }) + }, + + destroy: (id) => { + return new Promise( (resolve, reject) => { + crud.destroy(id).then( (data) => { + res.json(data.toJSON()) + })// .catch( () => res.sendStatus(500) ) + }) + }, + + sanitize: (data) => { + var valid = {} + console.log('yooooooo') + Object.keys(data).forEach(key => { + console.log('yooooooo2', key) + if (props.hasOne[key]) { + return + } + console.log('yooooooo3') + valid[key] = data[key] + }) + console.log('yooooooo4') + console.log(valid) + return valid + }, + + } + + return model +} diff --git a/lib/db/models.js b/lib/db/models.js new file mode 100644 index 0000000..588ce58 --- /dev/null +++ b/lib/db/models.js @@ -0,0 +1,43 @@ + +let fs = require('fs') +let model = require('./model') + +let connection = require("./bookshelf") +let bookshelf = connection.bookshelf +let knex = connection.knex + +let Folder = bookshelf.Model.extend({ + tableName: 'folders', + hasTimestamps: true, +}) +let File = bookshelf.Model.extend({ + tableName: 'files', + hasTimestamps: true, +}) +let Job = bookshelf.Model.extend({ + tableName: 'jobs', + hasTimestamps: true, +}) +let Task = bookshelf.Model.extend({ + tableName: 'tasks', + hasTimestamps: true, +}) + +module.exports = { + folder: model('folder', Folder, { + afterCreate: (folder) => { + fs.mkdir('data/' + folder.get('id') + '/', function(){ + console.log('created folder', folder.get('id'), folder.get('name')) + }) + } + }), + file: model('file', File), + job: model('job', Job), + task: model('task', Task, { + hasOne: { + content_file: File, + style_file: File, + output_file: File, + } + }), +} diff --git a/lib/server/api.js b/lib/server/api.js new file mode 100644 index 0000000..12adada --- /dev/null +++ b/lib/server/api.js @@ -0,0 +1,48 @@ +const db = require('../db') + +module.exports = function api (app, type) { + const type_s = '/' + type + 's/' + const type_id = type_s + ':id' + + const model = db.models[type] + + // index + app.get(type_s, (req, res) => { + console.log('index', type) + model.index(req.query).then( data => res.json(data) ) + }) + + // show + app.get(type_id, (req, res) => { + console.log('show', type, req.params.id) + model.show(req.params.id).then( (data) => { + res.json(data) + }) + }) + + // create + app.post(type_s, (req, res) => { + console.log('create', type) + model.create(req.body).then( (data) => { + res.json(data) + })// .catch( () => res.sendStatus(500) ) + }) + + // update + app.put(type_id, (req, res) => { + console.log('update', type, req.params.id) + model.update(req.body.id, req.body).then( (data) => { + res.json(data) + })// .catch( () => res.sendStatus(500) ) + }) + + // destroy + app.delete(type_id, (req, res) => { + console.log('destroy', type, req.params.id) + model.destroy(req.params.id).then( (data) => { + res.json(data) + })// .catch( () => res.sendStatus(500) ) + }) + + return model +}
\ No newline at end of file diff --git a/lib/server/index.js b/lib/server/index.js index c6176fe..f8cea0c 100644 --- a/lib/server/index.js +++ b/lib/server/index.js @@ -13,7 +13,7 @@ const Loader = require('../vendor/Loader') let app, server, io -const db = require('../db') +const api = require('./api') const site = module.exports = {} @@ -33,18 +33,10 @@ site.init = function(){ bridge.connectSocketIo(io) - const api_folders = crud(app, 'folder', db.Folder, { - afterCreate: (folder) => { - fs.mkdir('data/' + folder.get('id') + '/', function(){ - console.log('created folder', folder.get('id'), folder.get('name')) - }) - } - }) - const api_files = crud(app, 'file', db.File) - const api_jobs = crud(app, 'job', db.Job) - const api_tasks = crud(app, 'task', db.Task, { - hasOne: { content_file_id: db.File, style_file_id: db.File } - }) + const api_folders = api(app, 'folder') + const api_files = api(app, 'file') + const api_jobs = api(app, 'job') + const api_tasks = api(app, 'task') app.get('/devices', (req, res) => { res.json( bridge.devices ) @@ -70,7 +62,7 @@ site.init = function(){ data[fn] = false loader.register(fn) fs.rename(file.path, 'data/' + req.params.id + '/' + fn, function(err){ - api_files.create({ + api_files.crud.create({ // table.string('username') 'folder_id': req.params.id, 'name': fn, @@ -88,108 +80,4 @@ site.init = function(){ }) loader.ready('upload') }) - - function crud(app, type_s, model, callbacks){ - callbacks = callbacks || {} - const type = '/' + type_s + 's/' - const type_id = type + ':id' - - const crud = db.crud(model) - - // index - app.get(type, (req, res) => { - console.log('index', type) - crud.index(req.query).then( (data) => { - - if (! callbacks.hasOne) { - res.json(data ? data.toJSON() : []) - } - else { - let recs = data.toJSON() - const loader = new Loader () - loader.onReady( () => { - console.log(type, 'ready') - res.json(recs) - }) - console.log('hasOne') - loader.register('hasOne') - Object.keys(callbacks.hasOne).forEach( (key,i) => { - loader.register(key) - console.log('key') - const type = callbacks.hasOne[key] - const id_lookup = {} - recs.forEach(r => { - const id = r[key] - id_lookup[id] = id_lookup[id] || [] - id_lookup[id].push(r) - }) - console.log(key, recs.length, Object.keys(id_lookup).length) - db.crud(type).show_ids(Object.keys(id_lookup)).then( (sub_recs) => { - console.log(key, 'sub_recs', sub_recs) - const short_key = key.replace('_id','') - sub_recs.toJSON().forEach(rec => { - id_lookup[rec.id].forEach( parent_rec => parent_rec[short_key] = rec ) - }) - loader.ready(key) - }) - }) - loader.ready('hasOne') - } - }) // }).catch( () => res.sendStatus(500) ) - }) - - // show - app.get(type_id, (req, res) => { - console.log('show', type, req.params.id) - crud.show(req.params.id).then( (data) => { - if (! callbacks.hasOne) { - res.json(data.toJSON()) - } - else { - rec = data.toJSON() - const loader = new Loader () - loader.onReady( () => { - res.json(rec) - }) - loader.register('hasOne') - Object.keys(callbacks.hasOne).forEach( (key,i) => { - loader.register(key) - const type = callbacks.hasOne[key] - db.crud(type).show(rec[key]).then( (sub_rec) => { - rec[key] = sub_rec - loader.ready(key) - }) - }) - loader.ready('hasOne') - } - }) // .catch( (err) => res.sendStatus(500) ) - }) - - // create - app.post(type, (req, res) => { - console.log('create', type) - crud.create(req.body).then( (data) => { - res.json(data.toJSON()) - callbacks.afterCreate && callbacks.afterCreate(data) - })// .catch( () => res.sendStatus(500) ) - }) - - // update - app.put(type_id, (req, res) => { - console.log('update', type, req.params.id) - crud.update(req.body.id, req.body).then( (data) => { - res.json(data.toJSON()) - })// .catch( () => res.sendStatus(500) ) - }) - - // destroy - app.delete(type_id, (req, res) => { - console.log('destroy', type, req.params.id) - crud.destroy(req.params.id).then( (data) => { - res.json(data.toJSON()) - })// .catch( () => res.sendStatus(500) ) - }) - - return crud - } } diff --git a/lib/tools/index.js b/lib/tools/index.js new file mode 100644 index 0000000..062b20e --- /dev/null +++ b/lib/tools/index.js @@ -0,0 +1,7 @@ + +export { + nsatf: { + cmd: process.env.PYTHON_BINARY, + script: 'nsatf.py', + } +}
\ No newline at end of file diff --git a/lib/worker/index.js b/lib/worker/index.js index 52f12e2..6733dd1 100644 --- a/lib/worker/index.js +++ b/lib/worker/index.js @@ -1,15 +1,16 @@ require('dotenv').config() const ipc = require('node-ipc') -const db = require('../db') -const dbFile = db.crud(db.File) const processFiles = require('./processFiles') +const processTask = require('./processTask') ipc.config.id = 'cortexworker' ipc.config.retry = 1500 +ipc.config.silent = true processFiles() +processTask() ipc.connectTo('cortex', () => { ipc.of.cortex.on('connect', () => { @@ -28,8 +29,10 @@ ipc.connectTo('cortex', () => { }) ipc.of.cortex.on('job', (data) => { ipc.log('received job: '.debug, data); + processTask() }) ipc.of.cortex.on('task', (data) => { ipc.log('received task: '.debug, data); + processTask() }) }) diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js index 60e0ca5..c9b2724 100644 --- a/lib/worker/processFiles.js +++ b/lib/worker/processFiles.js @@ -1,11 +1,12 @@ const ipc = require('node-ipc') const db = require('../db') -const dbFile = db.crud(db.File) +const dbFile = db.models.file const path = require('path') const ffprobe = require('node-ffprobe') const execFile = require('child_process').execFile const mimeFromExtension = require('./mimeFromExtension') ffprobe.FFPROBE_PATH = process.env.FFPROBE_BINARY +const Loader = require('../vendor/Loader') let processing = false @@ -13,7 +14,7 @@ module.exports = function processFiles() { if (processing) return processing = true - dbFile.index({ processed: false }).then( (files) => { + dbFile.crud.index({ processed: false }).then( (files) => { console.log(files.length + ' files left to process') if (files.length === 0) { processing = false diff --git a/lib/worker/processTask.js b/lib/worker/processTask.js new file mode 100644 index 0000000..3c7b789 --- /dev/null +++ b/lib/worker/processTask.js @@ -0,0 +1,115 @@ +const ipc = require('node-ipc') +const db = require('../db') +const tools = require('../tools') +const fs = require('fs') +const path = require('path') +const execFile = require('child_process').execFile +const Loader = require('../vendor/Loader') + +let processing = false + +module.exports = function processTask() { + if (processing) return + processing = true + + db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => { + // console.log(tasks.length + ' tasks left to process') + console.log('fetching tasks...') + if (! tasks || tasks.length === 0) { + console.log('> completed all tasks!') + processing = false + return + } + + task = tasks.at ? tasks.at(0) : tasks[0] + console.log('running task #', task ? task.id : '??') + return processTaskPromise(task) + }).then( () => { + console.log('done') + if (processing) { + processing = false + setTimeout( processTask ) + } + }) +} + +function processTaskPromise(task) { + return new Promise ( (resolve, reject) => { + initTask(task) + .then(() => { return runTask(task) }) + .then(() => { return processDone(task) }) + .then(() => { return resolve() }) + }) +} + +function initTask() { + return new Promise ( (resolve, reject) => { + if (task.content_file.type !== 'audio') reject() + if (task.style_file.type !== 'audio') reject() + + constructFilePath(task.content_file) + constructFilePath(task.style_file) + + db.models.folder.findOrCreate({ name: 'output' }) + .then( (folder) => { + task.output_file = { + type: 'audio', + name: (folder.name || 'output') + '_' + Date.now() + '.aiff', + folder_id: folder.id, + processed: false, + } + constructFilePath(task.output_file) + resolve() + }) + }) +} + +function checkAccess(file) { + return new Promise ( (resolve, reject) => { + fs.access(file.aiffPath, fs.constants.R_OK, (err) => { + if (err) reject() + else resolve() + }) + }) +} + +function runTask(fullPath) { + return new Promise ( (resolve, reject) => { + console.log('running task') + + console.log('create output file record...') + console.log(task.output_file.path) + + const tool = tools[ task.tool ] + + execFile(tool.cmd, [ + tool.script_path, + task.content_file.path, + task.style_file.path, + task.output_file.path, + ], (err, stdout, stderr) => { + console.log(stdout, stderr) + db.models.file.create(task.output_file).then( file => { + task.output_file = file + task.output_file_id = file.id + resolve() + }) + }) + }) +} + +function processDone(task) { + task.completed = true + // console.log(task) + ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task }) + return db.models.task.update(task.id, task) +} + +function constructFilePath (file) { + file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name)) + switch (file.type) { + case 'audio': + file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff' + break + } +} |
