From d41070c7b00fafc974a1a6e7b6d1b42391fa57ed Mon Sep 17 00:00:00 2001 From: Jules Laplace Date: Fri, 21 Jul 2017 04:48:52 +0200 Subject: all async paths working --- lib/worker/processTasks.js | 133 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 lib/worker/processTasks.js (limited to 'lib/worker/processTasks.js') diff --git a/lib/worker/processTasks.js b/lib/worker/processTasks.js new file mode 100644 index 0000000..c60aaf5 --- /dev/null +++ b/lib/worker/processTasks.js @@ -0,0 +1,133 @@ +const ipc = require('node-ipc') +const db = require('../db') +const tools = require('../tools') +const fs = require('fs') +const path = require('path') +const execFile = require('child_process').execFile +const Loader = require('../vendor/Loader') +const processFiles = require('./processFiles') + +let processing = false + +module.exports = function processTasks() { + if (processing) return + processing = true + + console.log('fetching tasks...') + db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => { + // console.log(tasks.length + ' tasks left to process') + if (! tasks || tasks.length === 0) { + console.log('> completed all tasks!') + processing = false + return + } + + let task = tasks.at ? tasks.at(0) : tasks[0] + if (task) { + console.log('running task #', task.id) + return processTaskPromise(task) + } + else { + console.log('no more tasks') + return new Promise( resolve => resolve() ) + } + }).then( () => { + console.log('done') + if (processing) { + processing = false + setTimeout( processFiles ) + setTimeout( processTasks ) + } + }) +} + +function processTaskPromise(task) { + return new Promise ( (resolve, reject) => { + initTask(task) + .then(() => { return runTask(task) }) + .then(() => { return processDone(task) }) + .then(() => { return resolve() }) + }) +} + +function initTask(task) { + return new Promise ( (resolve, reject) => { + if (task.content_file.type !== 'audio') reject() + if (task.style_file.type !== 'audio') reject() + + constructFilePath(task.content_file) + constructFilePath(task.style_file) + + task.processing = true + db.models.task.update(task.id, task).then( () => { + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) + db.models.folder.findOrCreate({ name: 'output' }) + .then( (folder) => { + task.output_file = { + type: 'audio', + name: (folder.name || 'output') + '_' + Date.now() + '.aiff', + folder_id: folder.id, + processed: false, + generated: true, + } + constructFilePath(task.output_file) + resolve() + }) + }) + }) +} + +function checkAccess(file) { + return new Promise ( (resolve, reject) => { + fs.access(file.aiffPath, fs.constants.R_OK, (err) => { + if (err) reject() + else resolve() + }) + }) +} + +function runTask(task) { + return new Promise ( (resolve, reject) => { + console.log('running task') + + const tool = tools[ task.tool ] + execFile(tool.cmd, [ + path.join(__dirname, '../..', tool.script), + task.content_file.aiffPath, + task.style_file.aiffPath, + task.output_file.path, + ], (err, stdout, stderr) => { + console.log("DONE") + console.log(err) + console.log(stdout) + console.log(stderr) + console.log('create output file record...') + console.log(task.output_file.path) + task.stdout = stdout + task.stderr = stderr + + db.models.file.create(task.output_file).then( file => { + task.output_file = file + task.output_file_id = file.id + resolve() + }) + }) + }) +} + +function processDone(task) { + task.completed = true + task.processing = false + console.log(task) + ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task }) + return db.models.task.update(task.id, task) +} + +function constructFilePath (file) { + file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name)) + switch (file.type) { + case 'audio': + file.aiffPath = file.path + '.aiff' + break + } +} -- cgit v1.2.3-70-g09d2