summaryrefslogtreecommitdiff
path: root/lib/worker
diff options
context:
space:
mode:
authorJules Laplace <julescarbon@gmail.com>2017-07-19 00:50:05 +0200
committerJules Laplace <julescarbon@gmail.com>2017-07-19 00:50:05 +0200
commit64e8c03dea044752bf3f2f228462721fe565f950 (patch)
tree41b48b67f69979bfc97be166129ee41c8dcb0c7f /lib/worker
parent11a70bc347587219b2ec7b63cf4a6ff69bb4199b (diff)
refactor all the worker stuff
Diffstat (limited to 'lib/worker')
-rw-r--r--lib/worker/index.js7
-rw-r--r--lib/worker/processFiles.js5
-rw-r--r--lib/worker/processTask.js115
3 files changed, 123 insertions, 4 deletions
diff --git a/lib/worker/index.js b/lib/worker/index.js
index 52f12e2..6733dd1 100644
--- a/lib/worker/index.js
+++ b/lib/worker/index.js
@@ -1,15 +1,16 @@
require('dotenv').config()
const ipc = require('node-ipc')
-const db = require('../db')
-const dbFile = db.crud(db.File)
const processFiles = require('./processFiles')
+const processTask = require('./processTask')
ipc.config.id = 'cortexworker'
ipc.config.retry = 1500
+ipc.config.silent = true
processFiles()
+processTask()
ipc.connectTo('cortex', () => {
ipc.of.cortex.on('connect', () => {
@@ -28,8 +29,10 @@ ipc.connectTo('cortex', () => {
})
ipc.of.cortex.on('job', (data) => {
ipc.log('received job: '.debug, data);
+ processTask()
})
ipc.of.cortex.on('task', (data) => {
ipc.log('received task: '.debug, data);
+ processTask()
})
})
diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js
index 60e0ca5..c9b2724 100644
--- a/lib/worker/processFiles.js
+++ b/lib/worker/processFiles.js
@@ -1,11 +1,12 @@
const ipc = require('node-ipc')
const db = require('../db')
-const dbFile = db.crud(db.File)
+const dbFile = db.models.file
const path = require('path')
const ffprobe = require('node-ffprobe')
const execFile = require('child_process').execFile
const mimeFromExtension = require('./mimeFromExtension')
ffprobe.FFPROBE_PATH = process.env.FFPROBE_BINARY
+const Loader = require('../vendor/Loader')
let processing = false
@@ -13,7 +14,7 @@ module.exports = function processFiles() {
if (processing) return
processing = true
- dbFile.index({ processed: false }).then( (files) => {
+ dbFile.crud.index({ processed: false }).then( (files) => {
console.log(files.length + ' files left to process')
if (files.length === 0) {
processing = false
diff --git a/lib/worker/processTask.js b/lib/worker/processTask.js
new file mode 100644
index 0000000..3c7b789
--- /dev/null
+++ b/lib/worker/processTask.js
@@ -0,0 +1,115 @@
+const ipc = require('node-ipc')
+const db = require('../db')
+const tools = require('../tools')
+const fs = require('fs')
+const path = require('path')
+const execFile = require('child_process').execFile
+const Loader = require('../vendor/Loader')
+
+let processing = false
+
+module.exports = function processTask() {
+ if (processing) return
+ processing = true
+
+ db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => {
+ // console.log(tasks.length + ' tasks left to process')
+ console.log('fetching tasks...')
+ if (! tasks || tasks.length === 0) {
+ console.log('> completed all tasks!')
+ processing = false
+ return
+ }
+
+ task = tasks.at ? tasks.at(0) : tasks[0]
+ console.log('running task #', task ? task.id : '??')
+ return processTaskPromise(task)
+ }).then( () => {
+ console.log('done')
+ if (processing) {
+ processing = false
+ setTimeout( processTask )
+ }
+ })
+}
+
+function processTaskPromise(task) {
+ return new Promise ( (resolve, reject) => {
+ initTask(task)
+ .then(() => { return runTask(task) })
+ .then(() => { return processDone(task) })
+ .then(() => { return resolve() })
+ })
+}
+
+function initTask() {
+ return new Promise ( (resolve, reject) => {
+ if (task.content_file.type !== 'audio') reject()
+ if (task.style_file.type !== 'audio') reject()
+
+ constructFilePath(task.content_file)
+ constructFilePath(task.style_file)
+
+ db.models.folder.findOrCreate({ name: 'output' })
+ .then( (folder) => {
+ task.output_file = {
+ type: 'audio',
+ name: (folder.name || 'output') + '_' + Date.now() + '.aiff',
+ folder_id: folder.id,
+ processed: false,
+ }
+ constructFilePath(task.output_file)
+ resolve()
+ })
+ })
+}
+
+function checkAccess(file) {
+ return new Promise ( (resolve, reject) => {
+ fs.access(file.aiffPath, fs.constants.R_OK, (err) => {
+ if (err) reject()
+ else resolve()
+ })
+ })
+}
+
+function runTask(fullPath) {
+ return new Promise ( (resolve, reject) => {
+ console.log('running task')
+
+ console.log('create output file record...')
+ console.log(task.output_file.path)
+
+ const tool = tools[ task.tool ]
+
+ execFile(tool.cmd, [
+ tool.script_path,
+ task.content_file.path,
+ task.style_file.path,
+ task.output_file.path,
+ ], (err, stdout, stderr) => {
+ console.log(stdout, stderr)
+ db.models.file.create(task.output_file).then( file => {
+ task.output_file = file
+ task.output_file_id = file.id
+ resolve()
+ })
+ })
+ })
+}
+
+function processDone(task) {
+ task.completed = true
+ // console.log(task)
+ ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task })
+ return db.models.task.update(task.id, task)
+}
+
+function constructFilePath (file) {
+ file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name))
+ switch (file.type) {
+ case 'audio':
+ file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff'
+ break
+ }
+}