summaryrefslogtreecommitdiff
path: root/lib/worker/processTasks.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/worker/processTasks.js')
-rw-r--r--lib/worker/processTasks.js133
1 files changed, 133 insertions, 0 deletions
diff --git a/lib/worker/processTasks.js b/lib/worker/processTasks.js
new file mode 100644
index 0000000..c60aaf5
--- /dev/null
+++ b/lib/worker/processTasks.js
@@ -0,0 +1,133 @@
+const ipc = require('node-ipc')
+const db = require('../db')
+const tools = require('../tools')
+const fs = require('fs')
+const path = require('path')
+const execFile = require('child_process').execFile
+const Loader = require('../vendor/Loader')
+const processFiles = require('./processFiles')
+
+let processing = false
+
+module.exports = function processTasks() {
+ if (processing) return
+ processing = true
+
+ console.log('fetching tasks...')
+ db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => {
+ // console.log(tasks.length + ' tasks left to process')
+ if (! tasks || tasks.length === 0) {
+ console.log('> completed all tasks!')
+ processing = false
+ return
+ }
+
+ let task = tasks.at ? tasks.at(0) : tasks[0]
+ if (task) {
+ console.log('running task #', task.id)
+ return processTaskPromise(task)
+ }
+ else {
+ console.log('no more tasks')
+ return new Promise( resolve => resolve() )
+ }
+ }).then( () => {
+ console.log('done')
+ if (processing) {
+ processing = false
+ setTimeout( processFiles )
+ setTimeout( processTasks )
+ }
+ })
+}
+
+function processTaskPromise(task) {
+ return new Promise ( (resolve, reject) => {
+ initTask(task)
+ .then(() => { return runTask(task) })
+ .then(() => { return processDone(task) })
+ .then(() => { return resolve() })
+ })
+}
+
+function initTask(task) {
+ return new Promise ( (resolve, reject) => {
+ if (task.content_file.type !== 'audio') reject()
+ if (task.style_file.type !== 'audio') reject()
+
+ constructFilePath(task.content_file)
+ constructFilePath(task.style_file)
+
+ task.processing = true
+ db.models.task.update(task.id, task).then( () => {
+ ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task })
+ db.models.folder.findOrCreate({ name: 'output' })
+ .then( (folder) => {
+ task.output_file = {
+ type: 'audio',
+ name: (folder.name || 'output') + '_' + Date.now() + '.aiff',
+ folder_id: folder.id,
+ processed: false,
+ generated: true,
+ }
+ constructFilePath(task.output_file)
+ resolve()
+ })
+ })
+ })
+}
+
+function checkAccess(file) {
+ return new Promise ( (resolve, reject) => {
+ fs.access(file.aiffPath, fs.constants.R_OK, (err) => {
+ if (err) reject()
+ else resolve()
+ })
+ })
+}
+
+function runTask(task) {
+ return new Promise ( (resolve, reject) => {
+ console.log('running task')
+
+ const tool = tools[ task.tool ]
+ execFile(tool.cmd, [
+ path.join(__dirname, '../..', tool.script),
+ task.content_file.aiffPath,
+ task.style_file.aiffPath,
+ task.output_file.path,
+ ], (err, stdout, stderr) => {
+ console.log("DONE")
+ console.log(err)
+ console.log(stdout)
+ console.log(stderr)
+ console.log('create output file record...')
+ console.log(task.output_file.path)
+ task.stdout = stdout
+ task.stderr = stderr
+
+ db.models.file.create(task.output_file).then( file => {
+ task.output_file = file
+ task.output_file_id = file.id
+ resolve()
+ })
+ })
+ })
+}
+
+function processDone(task) {
+ task.completed = true
+ task.processing = false
+ console.log(task)
+ ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task })
+ return db.models.task.update(task.id, task)
+}
+
+function constructFilePath (file) {
+ file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name))
+ switch (file.type) {
+ case 'audio':
+ file.aiffPath = file.path + '.aiff'
+ break
+ }
+}