From d41070c7b00fafc974a1a6e7b6d1b42391fa57ed Mon Sep 17 00:00:00 2001 From: Jules Laplace Date: Fri, 21 Jul 2017 04:48:52 +0200 Subject: all async paths working --- lib/worker/index.js | 21 ++++--- lib/worker/processFiles.js | 33 ++++++----- lib/worker/processTask.js | 114 -------------------------------------- lib/worker/processTasks.js | 133 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 162 insertions(+), 139 deletions(-) delete mode 100644 lib/worker/processTask.js create mode 100644 lib/worker/processTasks.js (limited to 'lib/worker') diff --git a/lib/worker/index.js b/lib/worker/index.js index 6733dd1..0e2ff44 100644 --- a/lib/worker/index.js +++ b/lib/worker/index.js @@ -3,36 +3,35 @@ require('dotenv').config() const ipc = require('node-ipc') const processFiles = require('./processFiles') -const processTask = require('./processTask') +const processTasks = require('./processTasks') ipc.config.id = 'cortexworker' ipc.config.retry = 1500 ipc.config.silent = true processFiles() -processTask() +processTasks() ipc.connectTo('cortex', () => { ipc.of.cortex.on('connect', () => { ipc.log('## connected to cortex ##', ipc.config.delay); - // ipc.of.cortex.emit('message', 'hello') }) ipc.of.cortex.on('disconnect', () => { ipc.log('disconnected'.notice) }) ipc.of.cortex.on('message', (data) => { - ipc.log('received message!'.debug, data) + ipc.log('received message'.debug, data) }) - ipc.of.cortex.on('process', (data) => { - ipc.log('received process: '.debug, data); + ipc.of.cortex.on('processFiles', (data) => { + ipc.log('>>> received processFiles'.debug, data); processFiles() }) ipc.of.cortex.on('job', (data) => { - ipc.log('received job: '.debug, data); - processTask() + ipc.log('received job'.debug, data); + processTasks() }) - ipc.of.cortex.on('task', (data) => { - ipc.log('received task: '.debug, data); - processTask() + ipc.of.cortex.on('processTasks', (data) => { + ipc.log('>>> received processTasks: '.debug, data); + processTasks() }) }) diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js index c9b2724..2b89aea 100644 --- a/lib/worker/processFiles.js +++ b/lib/worker/processFiles.js @@ -14,18 +14,17 @@ module.exports = function processFiles() { if (processing) return processing = true - dbFile.crud.index({ processed: false }).then( (files) => { + dbFile.index({ processed: false, limit: 1 }).then( (files) => { console.log(files.length + ' files left to process') - if (files.length === 0) { + if (! files || files.length === 0) { processing = false return } - file = files.at(0) - console.log('processing', file.get('name')) - const mimeType = mimeFromExtension(file.get('name')) - file.set('mime', mimeType.mime) - file.set('type', mimeType.type) + let file = files.at ? files.at(0) : files[0] + const mimeType = mimeFromExtension(file.name) + file.mime = mimeType.mime + file.type = mimeType.type switch (mimeType.type) { case 'audio': return processAudio(file) @@ -42,9 +41,9 @@ module.exports = function processFiles() { } function processAudio(file) { - const filePath = path.join(__dirname, '../..', 'public/data', String(file.get('folder_id'))) + const filePath = path.join(__dirname, '../..', 'public/data', String(file.folder_id)) // console.log(filePath) - const fullPath = path.join(filePath, file.get('name')) + const fullPath = path.join(filePath, file.name) console.log(fullPath) return new Promise ( (resolve, reject) => { @@ -83,8 +82,14 @@ function cacheFfprobe(file, fullPath) { return new Promise ( (resolve, reject) => { console.log('ffprobe') ffprobe(fullPath, function(err, probeData) { - file.set('duration', probeData.format.duration) - file.set('analysis', JSON.stringify(probeData)) + if (err) { + console.error('\n------------------------- !!!\n\n') + console.error(err) + } + if (probeData) { + file.duration = probeData.format.duration + } + file.analysis = JSON.stringify(probeData) resolve() }) }) @@ -132,7 +137,7 @@ function processImage(file) { } function processDone(file) { - file.set({ processed: true }) - ipc.of.cortex && ipc.of.cortex.emit("processed", { file: file }) - return file.save() + file.processed = true + ipc.of.cortex && ipc.of.cortex.emit("updateFile", { file: file }) + return dbFile.update(file.id, file) } \ No newline at end of file diff --git a/lib/worker/processTask.js b/lib/worker/processTask.js deleted file mode 100644 index 8afeb6d..0000000 --- a/lib/worker/processTask.js +++ /dev/null @@ -1,114 +0,0 @@ -const ipc = require('node-ipc') -const db = require('../db') -const tools = require('../tools') -const fs = require('fs') -const path = require('path') -const execFile = require('child_process').execFile -const Loader = require('../vendor/Loader') - -let processing = false - -module.exports = function processTask() { - if (processing) return - processing = true - - db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => { - // console.log(tasks.length + ' tasks left to process') - console.log('fetching tasks...') - if (! tasks || tasks.length === 0) { - console.log('> completed all tasks!') - processing = false - return - } - - task = tasks.at ? tasks.at(0) : tasks[0] - console.log('running task #', task ? task.id : '??') - return processTaskPromise(task) - }).then( () => { - console.log('done') - if (processing) { - processing = false - setTimeout( processTask ) - } - }) -} - -function processTaskPromise(task) { - return new Promise ( (resolve, reject) => { - initTask(task) - .then(() => { return runTask(task) }) - .then(() => { return processDone(task) }) - .then(() => { return resolve() }) - }) -} - -function initTask() { - return new Promise ( (resolve, reject) => { - if (task.content_file.type !== 'audio') reject() - if (task.style_file.type !== 'audio') reject() - - constructFilePath(task.content_file) - constructFilePath(task.style_file) - - db.models.folder.findOrCreate({ name: 'output' }) - .then( (folder) => { - task.output_file = { - type: 'audio', - name: (folder.name || 'output') + '_' + Date.now() + '.aiff', - folder_id: folder.id, - processed: false, - } - constructFilePath(task.output_file) - resolve() - }) - }) -} - -function checkAccess(file) { - return new Promise ( (resolve, reject) => { - fs.access(file.aiffPath, fs.constants.R_OK, (err) => { - if (err) reject() - else resolve() - }) - }) -} - -function runTask(fullPath) { - return new Promise ( (resolve, reject) => { - console.log('running task') - - console.log('create output file record...') - console.log(task.output_file.path) - - const tool = tools[ task.tool ] - execFile(tool.cmd, [ - tool.script_path, - task.content_file.path, - task.style_file.path, - task.output_file.path, - ], (err, stdout, stderr) => { - console.log(stdout, stderr) - db.models.file.create(task.output_file).then( file => { - task.output_file = file - task.output_file_id = file.id - resolve() - }) - }) - }) -} - -function processDone(task) { - task.completed = true - console.log(task) - ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task }) - return db.models.task.update(task.id, task) -} - -function constructFilePath (file) { - file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name)) - switch (file.type) { - case 'audio': - file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff' - break - } -} diff --git a/lib/worker/processTasks.js b/lib/worker/processTasks.js new file mode 100644 index 0000000..c60aaf5 --- /dev/null +++ b/lib/worker/processTasks.js @@ -0,0 +1,133 @@ +const ipc = require('node-ipc') +const db = require('../db') +const tools = require('../tools') +const fs = require('fs') +const path = require('path') +const execFile = require('child_process').execFile +const Loader = require('../vendor/Loader') +const processFiles = require('./processFiles') + +let processing = false + +module.exports = function processTasks() { + if (processing) return + processing = true + + console.log('fetching tasks...') + db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => { + // console.log(tasks.length + ' tasks left to process') + if (! tasks || tasks.length === 0) { + console.log('> completed all tasks!') + processing = false + return + } + + let task = tasks.at ? tasks.at(0) : tasks[0] + if (task) { + console.log('running task #', task.id) + return processTaskPromise(task) + } + else { + console.log('no more tasks') + return new Promise( resolve => resolve() ) + } + }).then( () => { + console.log('done') + if (processing) { + processing = false + setTimeout( processFiles ) + setTimeout( processTasks ) + } + }) +} + +function processTaskPromise(task) { + return new Promise ( (resolve, reject) => { + initTask(task) + .then(() => { return runTask(task) }) + .then(() => { return processDone(task) }) + .then(() => { return resolve() }) + }) +} + +function initTask(task) { + return new Promise ( (resolve, reject) => { + if (task.content_file.type !== 'audio') reject() + if (task.style_file.type !== 'audio') reject() + + constructFilePath(task.content_file) + constructFilePath(task.style_file) + + task.processing = true + db.models.task.update(task.id, task).then( () => { + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) + db.models.folder.findOrCreate({ name: 'output' }) + .then( (folder) => { + task.output_file = { + type: 'audio', + name: (folder.name || 'output') + '_' + Date.now() + '.aiff', + folder_id: folder.id, + processed: false, + generated: true, + } + constructFilePath(task.output_file) + resolve() + }) + }) + }) +} + +function checkAccess(file) { + return new Promise ( (resolve, reject) => { + fs.access(file.aiffPath, fs.constants.R_OK, (err) => { + if (err) reject() + else resolve() + }) + }) +} + +function runTask(task) { + return new Promise ( (resolve, reject) => { + console.log('running task') + + const tool = tools[ task.tool ] + execFile(tool.cmd, [ + path.join(__dirname, '../..', tool.script), + task.content_file.aiffPath, + task.style_file.aiffPath, + task.output_file.path, + ], (err, stdout, stderr) => { + console.log("DONE") + console.log(err) + console.log(stdout) + console.log(stderr) + console.log('create output file record...') + console.log(task.output_file.path) + task.stdout = stdout + task.stderr = stderr + + db.models.file.create(task.output_file).then( file => { + task.output_file = file + task.output_file_id = file.id + resolve() + }) + }) + }) +} + +function processDone(task) { + task.completed = true + task.processing = false + console.log(task) + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) + return db.models.task.update(task.id, task) +} + +function constructFilePath (file) { + file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name)) + switch (file.type) { + case 'audio': + file.aiffPath = file.path + '.aiff' + break + } +} -- cgit v1.2.3-70-g09d2