diff options
Diffstat (limited to 'lib/worker')
| -rw-r--r-- | lib/worker/index.js | 21 | ||||
| -rw-r--r-- | lib/worker/processFiles.js | 33 | ||||
| -rw-r--r-- | lib/worker/processTasks.js (renamed from lib/worker/processTask.js) | 73 |
3 files changed, 75 insertions, 52 deletions
diff --git a/lib/worker/index.js b/lib/worker/index.js index 6733dd1..0e2ff44 100644 --- a/lib/worker/index.js +++ b/lib/worker/index.js @@ -3,36 +3,35 @@ require('dotenv').config() const ipc = require('node-ipc') const processFiles = require('./processFiles') -const processTask = require('./processTask') +const processTasks = require('./processTasks') ipc.config.id = 'cortexworker' ipc.config.retry = 1500 ipc.config.silent = true processFiles() -processTask() +processTasks() ipc.connectTo('cortex', () => { ipc.of.cortex.on('connect', () => { ipc.log('## connected to cortex ##', ipc.config.delay); - // ipc.of.cortex.emit('message', 'hello') }) ipc.of.cortex.on('disconnect', () => { ipc.log('disconnected'.notice) }) ipc.of.cortex.on('message', (data) => { - ipc.log('received message!'.debug, data) + ipc.log('received message'.debug, data) }) - ipc.of.cortex.on('process', (data) => { - ipc.log('received process: '.debug, data); + ipc.of.cortex.on('processFiles', (data) => { + ipc.log('>>> received processFiles'.debug, data); processFiles() }) ipc.of.cortex.on('job', (data) => { - ipc.log('received job: '.debug, data); - processTask() + ipc.log('received job'.debug, data); + processTasks() }) - ipc.of.cortex.on('task', (data) => { - ipc.log('received task: '.debug, data); - processTask() + ipc.of.cortex.on('processTasks', (data) => { + ipc.log('>>> received processTasks: '.debug, data); + processTasks() }) }) diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js index c9b2724..2b89aea 100644 --- a/lib/worker/processFiles.js +++ b/lib/worker/processFiles.js @@ -14,18 +14,17 @@ module.exports = function processFiles() { if (processing) return processing = true - dbFile.crud.index({ processed: false }).then( (files) => { + dbFile.index({ processed: false, limit: 1 }).then( (files) => { console.log(files.length + ' files left to process') - if (files.length === 0) { + if (! files || files.length === 0) { processing = false return } - file = files.at(0) - console.log('processing', file.get('name')) - const mimeType = mimeFromExtension(file.get('name')) - file.set('mime', mimeType.mime) - file.set('type', mimeType.type) + let file = files.at ? files.at(0) : files[0] + const mimeType = mimeFromExtension(file.name) + file.mime = mimeType.mime + file.type = mimeType.type switch (mimeType.type) { case 'audio': return processAudio(file) @@ -42,9 +41,9 @@ module.exports = function processFiles() { } function processAudio(file) { - const filePath = path.join(__dirname, '../..', 'public/data', String(file.get('folder_id'))) + const filePath = path.join(__dirname, '../..', 'public/data', String(file.folder_id)) // console.log(filePath) - const fullPath = path.join(filePath, file.get('name')) + const fullPath = path.join(filePath, file.name) console.log(fullPath) return new Promise ( (resolve, reject) => { @@ -83,8 +82,14 @@ function cacheFfprobe(file, fullPath) { return new Promise ( (resolve, reject) => { console.log('ffprobe') ffprobe(fullPath, function(err, probeData) { - file.set('duration', probeData.format.duration) - file.set('analysis', JSON.stringify(probeData)) + if (err) { + console.error('\n------------------------- !!!\n\n') + console.error(err) + } + if (probeData) { + file.duration = probeData.format.duration + } + file.analysis = JSON.stringify(probeData) resolve() }) }) @@ -132,7 +137,7 @@ function processImage(file) { } function processDone(file) { - file.set({ processed: true }) - ipc.of.cortex && ipc.of.cortex.emit("processed", { file: file }) - return file.save() + file.processed = true + ipc.of.cortex && ipc.of.cortex.emit("updateFile", { file: file }) + return dbFile.update(file.id, file) }
\ No newline at end of file diff --git a/lib/worker/processTask.js b/lib/worker/processTasks.js index 8afeb6d..c60aaf5 100644 --- a/lib/worker/processTask.js +++ b/lib/worker/processTasks.js @@ -5,30 +5,38 @@ const fs = require('fs') const path = require('path') const execFile = require('child_process').execFile const Loader = require('../vendor/Loader') +const processFiles = require('./processFiles') let processing = false -module.exports = function processTask() { +module.exports = function processTasks() { if (processing) return processing = true + console.log('fetching tasks...') db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => { // console.log(tasks.length + ' tasks left to process') - console.log('fetching tasks...') if (! tasks || tasks.length === 0) { console.log('> completed all tasks!') processing = false return } - task = tasks.at ? tasks.at(0) : tasks[0] - console.log('running task #', task ? task.id : '??') - return processTaskPromise(task) + let task = tasks.at ? tasks.at(0) : tasks[0] + if (task) { + console.log('running task #', task.id) + return processTaskPromise(task) + } + else { + console.log('no more tasks') + return new Promise( resolve => resolve() ) + } }).then( () => { console.log('done') if (processing) { processing = false - setTimeout( processTask ) + setTimeout( processFiles ) + setTimeout( processTasks ) } }) } @@ -42,7 +50,7 @@ function processTaskPromise(task) { }) } -function initTask() { +function initTask(task) { return new Promise ( (resolve, reject) => { if (task.content_file.type !== 'audio') reject() if (task.style_file.type !== 'audio') reject() @@ -50,16 +58,21 @@ function initTask() { constructFilePath(task.content_file) constructFilePath(task.style_file) - db.models.folder.findOrCreate({ name: 'output' }) - .then( (folder) => { - task.output_file = { - type: 'audio', - name: (folder.name || 'output') + '_' + Date.now() + '.aiff', - folder_id: folder.id, - processed: false, - } - constructFilePath(task.output_file) - resolve() + task.processing = true + db.models.task.update(task.id, task).then( () => { + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) + db.models.folder.findOrCreate({ name: 'output' }) + .then( (folder) => { + task.output_file = { + type: 'audio', + name: (folder.name || 'output') + '_' + Date.now() + '.aiff', + folder_id: folder.id, + processed: false, + generated: true, + } + constructFilePath(task.output_file) + resolve() + }) }) }) } @@ -73,21 +86,26 @@ function checkAccess(file) { }) } -function runTask(fullPath) { +function runTask(task) { return new Promise ( (resolve, reject) => { console.log('running task') - console.log('create output file record...') - console.log(task.output_file.path) - const tool = tools[ task.tool ] execFile(tool.cmd, [ - tool.script_path, - task.content_file.path, - task.style_file.path, + path.join(__dirname, '../..', tool.script), + task.content_file.aiffPath, + task.style_file.aiffPath, task.output_file.path, ], (err, stdout, stderr) => { - console.log(stdout, stderr) + console.log("DONE") + console.log(err) + console.log(stdout) + console.log(stderr) + console.log('create output file record...') + console.log(task.output_file.path) + task.stdout = stdout + task.stderr = stderr + db.models.file.create(task.output_file).then( file => { task.output_file = file task.output_file_id = file.id @@ -99,8 +117,9 @@ function runTask(fullPath) { function processDone(task) { task.completed = true + task.processing = false console.log(task) - ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task }) + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) return db.models.task.update(task.id, task) } @@ -108,7 +127,7 @@ function constructFilePath (file) { file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name)) switch (file.type) { case 'audio': - file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff' + file.aiffPath = file.path + '.aiff' break } } |
