diff options
| author | Jules Laplace <julescarbon@gmail.com> | 2017-07-19 00:50:05 +0200 |
|---|---|---|
| committer | Jules Laplace <julescarbon@gmail.com> | 2017-07-19 00:50:05 +0200 |
| commit | 64e8c03dea044752bf3f2f228462721fe565f950 (patch) | |
| tree | 41b48b67f69979bfc97be166129ee41c8dcb0c7f /lib/worker | |
| parent | 11a70bc347587219b2ec7b63cf4a6ff69bb4199b (diff) | |
refactor all the worker stuff
Diffstat (limited to 'lib/worker')
| -rw-r--r-- | lib/worker/index.js | 7 | ||||
| -rw-r--r-- | lib/worker/processFiles.js | 5 | ||||
| -rw-r--r-- | lib/worker/processTask.js | 115 |
3 files changed, 123 insertions, 4 deletions
diff --git a/lib/worker/index.js b/lib/worker/index.js index 52f12e2..6733dd1 100644 --- a/lib/worker/index.js +++ b/lib/worker/index.js @@ -1,15 +1,16 @@ require('dotenv').config() const ipc = require('node-ipc') -const db = require('../db') -const dbFile = db.crud(db.File) const processFiles = require('./processFiles') +const processTask = require('./processTask') ipc.config.id = 'cortexworker' ipc.config.retry = 1500 +ipc.config.silent = true processFiles() +processTask() ipc.connectTo('cortex', () => { ipc.of.cortex.on('connect', () => { @@ -28,8 +29,10 @@ ipc.connectTo('cortex', () => { }) ipc.of.cortex.on('job', (data) => { ipc.log('received job: '.debug, data); + processTask() }) ipc.of.cortex.on('task', (data) => { ipc.log('received task: '.debug, data); + processTask() }) }) diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js index 60e0ca5..c9b2724 100644 --- a/lib/worker/processFiles.js +++ b/lib/worker/processFiles.js @@ -1,11 +1,12 @@ const ipc = require('node-ipc') const db = require('../db') -const dbFile = db.crud(db.File) +const dbFile = db.models.file const path = require('path') const ffprobe = require('node-ffprobe') const execFile = require('child_process').execFile const mimeFromExtension = require('./mimeFromExtension') ffprobe.FFPROBE_PATH = process.env.FFPROBE_BINARY +const Loader = require('../vendor/Loader') let processing = false @@ -13,7 +14,7 @@ module.exports = function processFiles() { if (processing) return processing = true - dbFile.index({ processed: false }).then( (files) => { + dbFile.crud.index({ processed: false }).then( (files) => { console.log(files.length + ' files left to process') if (files.length === 0) { processing = false diff --git a/lib/worker/processTask.js b/lib/worker/processTask.js new file mode 100644 index 0000000..3c7b789 --- /dev/null +++ b/lib/worker/processTask.js @@ -0,0 +1,115 @@ +const ipc = require('node-ipc') +const db = require('../db') +const tools = require('../tools') +const fs = require('fs') +const path = require('path') +const execFile = require('child_process').execFile +const Loader = require('../vendor/Loader') + +let processing = false + +module.exports = function processTask() { + if (processing) return + processing = true + + db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => { + // console.log(tasks.length + ' tasks left to process') + console.log('fetching tasks...') + if (! tasks || tasks.length === 0) { + console.log('> completed all tasks!') + processing = false + return + } + + task = tasks.at ? tasks.at(0) : tasks[0] + console.log('running task #', task ? task.id : '??') + return processTaskPromise(task) + }).then( () => { + console.log('done') + if (processing) { + processing = false + setTimeout( processTask ) + } + }) +} + +function processTaskPromise(task) { + return new Promise ( (resolve, reject) => { + initTask(task) + .then(() => { return runTask(task) }) + .then(() => { return processDone(task) }) + .then(() => { return resolve() }) + }) +} + +function initTask() { + return new Promise ( (resolve, reject) => { + if (task.content_file.type !== 'audio') reject() + if (task.style_file.type !== 'audio') reject() + + constructFilePath(task.content_file) + constructFilePath(task.style_file) + + db.models.folder.findOrCreate({ name: 'output' }) + .then( (folder) => { + task.output_file = { + type: 'audio', + name: (folder.name || 'output') + '_' + Date.now() + '.aiff', + folder_id: folder.id, + processed: false, + } + constructFilePath(task.output_file) + resolve() + }) + }) +} + +function checkAccess(file) { + return new Promise ( (resolve, reject) => { + fs.access(file.aiffPath, fs.constants.R_OK, (err) => { + if (err) reject() + else resolve() + }) + }) +} + +function runTask(fullPath) { + return new Promise ( (resolve, reject) => { + console.log('running task') + + console.log('create output file record...') + console.log(task.output_file.path) + + const tool = tools[ task.tool ] + + execFile(tool.cmd, [ + tool.script_path, + task.content_file.path, + task.style_file.path, + task.output_file.path, + ], (err, stdout, stderr) => { + console.log(stdout, stderr) + db.models.file.create(task.output_file).then( file => { + task.output_file = file + task.output_file_id = file.id + resolve() + }) + }) + }) +} + +function processDone(task) { + task.completed = true + // console.log(task) + ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task }) + return db.models.task.update(task.id, task) +} + +function constructFilePath (file) { + file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name)) + switch (file.type) { + case 'audio': + file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff' + break + } +} |
