summaryrefslogtreecommitdiff
path: root/lib/worker
diff options
context:
space:
mode:
Diffstat (limited to 'lib/worker')
-rw-r--r--lib/worker/index.js21
-rw-r--r--lib/worker/processFiles.js33
-rw-r--r--lib/worker/processTasks.js (renamed from lib/worker/processTask.js)73
3 files changed, 75 insertions, 52 deletions
diff --git a/lib/worker/index.js b/lib/worker/index.js
index 6733dd1..0e2ff44 100644
--- a/lib/worker/index.js
+++ b/lib/worker/index.js
@@ -3,36 +3,35 @@ require('dotenv').config()
const ipc = require('node-ipc')
const processFiles = require('./processFiles')
-const processTask = require('./processTask')
+const processTasks = require('./processTasks')
ipc.config.id = 'cortexworker'
ipc.config.retry = 1500
ipc.config.silent = true
processFiles()
-processTask()
+processTasks()
ipc.connectTo('cortex', () => {
ipc.of.cortex.on('connect', () => {
ipc.log('## connected to cortex ##', ipc.config.delay);
- // ipc.of.cortex.emit('message', 'hello')
})
ipc.of.cortex.on('disconnect', () => {
ipc.log('disconnected'.notice)
})
ipc.of.cortex.on('message', (data) => {
- ipc.log('received message!'.debug, data)
+ ipc.log('received message'.debug, data)
})
- ipc.of.cortex.on('process', (data) => {
- ipc.log('received process: '.debug, data);
+ ipc.of.cortex.on('processFiles', (data) => {
+ ipc.log('>>> received processFiles'.debug, data);
processFiles()
})
ipc.of.cortex.on('job', (data) => {
- ipc.log('received job: '.debug, data);
- processTask()
+ ipc.log('received job'.debug, data);
+ processTasks()
})
- ipc.of.cortex.on('task', (data) => {
- ipc.log('received task: '.debug, data);
- processTask()
+ ipc.of.cortex.on('processTasks', (data) => {
+ ipc.log('>>> received processTasks: '.debug, data);
+ processTasks()
})
})
diff --git a/lib/worker/processFiles.js b/lib/worker/processFiles.js
index c9b2724..2b89aea 100644
--- a/lib/worker/processFiles.js
+++ b/lib/worker/processFiles.js
@@ -14,18 +14,17 @@ module.exports = function processFiles() {
if (processing) return
processing = true
- dbFile.crud.index({ processed: false }).then( (files) => {
+ dbFile.index({ processed: false, limit: 1 }).then( (files) => {
console.log(files.length + ' files left to process')
- if (files.length === 0) {
+ if (! files || files.length === 0) {
processing = false
return
}
- file = files.at(0)
- console.log('processing', file.get('name'))
- const mimeType = mimeFromExtension(file.get('name'))
- file.set('mime', mimeType.mime)
- file.set('type', mimeType.type)
+ let file = files.at ? files.at(0) : files[0]
+ const mimeType = mimeFromExtension(file.name)
+ file.mime = mimeType.mime
+ file.type = mimeType.type
switch (mimeType.type) {
case 'audio':
return processAudio(file)
@@ -42,9 +41,9 @@ module.exports = function processFiles() {
}
function processAudio(file) {
- const filePath = path.join(__dirname, '../..', 'public/data', String(file.get('folder_id')))
+ const filePath = path.join(__dirname, '../..', 'public/data', String(file.folder_id))
// console.log(filePath)
- const fullPath = path.join(filePath, file.get('name'))
+ const fullPath = path.join(filePath, file.name)
console.log(fullPath)
return new Promise ( (resolve, reject) => {
@@ -83,8 +82,14 @@ function cacheFfprobe(file, fullPath) {
return new Promise ( (resolve, reject) => {
console.log('ffprobe')
ffprobe(fullPath, function(err, probeData) {
- file.set('duration', probeData.format.duration)
- file.set('analysis', JSON.stringify(probeData))
+ if (err) {
+ console.error('\n------------------------- !!!\n\n')
+ console.error(err)
+ }
+ if (probeData) {
+ file.duration = probeData.format.duration
+ }
+ file.analysis = JSON.stringify(probeData)
resolve()
})
})
@@ -132,7 +137,7 @@ function processImage(file) {
}
function processDone(file) {
- file.set({ processed: true })
- ipc.of.cortex && ipc.of.cortex.emit("processed", { file: file })
- return file.save()
+ file.processed = true
+ ipc.of.cortex && ipc.of.cortex.emit("updateFile", { file: file })
+ return dbFile.update(file.id, file)
} \ No newline at end of file
diff --git a/lib/worker/processTask.js b/lib/worker/processTasks.js
index 8afeb6d..c60aaf5 100644
--- a/lib/worker/processTask.js
+++ b/lib/worker/processTasks.js
@@ -5,30 +5,38 @@ const fs = require('fs')
const path = require('path')
const execFile = require('child_process').execFile
const Loader = require('../vendor/Loader')
+const processFiles = require('./processFiles')
let processing = false
-module.exports = function processTask() {
+module.exports = function processTasks() {
if (processing) return
processing = true
+ console.log('fetching tasks...')
db.models.task.index({ completed: false, limit: 1 }).then( (tasks) => {
// console.log(tasks.length + ' tasks left to process')
- console.log('fetching tasks...')
if (! tasks || tasks.length === 0) {
console.log('> completed all tasks!')
processing = false
return
}
- task = tasks.at ? tasks.at(0) : tasks[0]
- console.log('running task #', task ? task.id : '??')
- return processTaskPromise(task)
+ let task = tasks.at ? tasks.at(0) : tasks[0]
+ if (task) {
+ console.log('running task #', task.id)
+ return processTaskPromise(task)
+ }
+ else {
+ console.log('no more tasks')
+ return new Promise( resolve => resolve() )
+ }
}).then( () => {
console.log('done')
if (processing) {
processing = false
- setTimeout( processTask )
+ setTimeout( processFiles )
+ setTimeout( processTasks )
}
})
}
@@ -42,7 +50,7 @@ function processTaskPromise(task) {
})
}
-function initTask() {
+function initTask(task) {
return new Promise ( (resolve, reject) => {
if (task.content_file.type !== 'audio') reject()
if (task.style_file.type !== 'audio') reject()
@@ -50,16 +58,21 @@ function initTask() {
constructFilePath(task.content_file)
constructFilePath(task.style_file)
- db.models.folder.findOrCreate({ name: 'output' })
- .then( (folder) => {
- task.output_file = {
- type: 'audio',
- name: (folder.name || 'output') + '_' + Date.now() + '.aiff',
- folder_id: folder.id,
- processed: false,
- }
- constructFilePath(task.output_file)
- resolve()
+ task.processing = true
+ db.models.task.update(task.id, task).then( () => {
+ ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task })
+ db.models.folder.findOrCreate({ name: 'output' })
+ .then( (folder) => {
+ task.output_file = {
+ type: 'audio',
+ name: (folder.name || 'output') + '_' + Date.now() + '.aiff',
+ folder_id: folder.id,
+ processed: false,
+ generated: true,
+ }
+ constructFilePath(task.output_file)
+ resolve()
+ })
})
})
}
@@ -73,21 +86,26 @@ function checkAccess(file) {
})
}
-function runTask(fullPath) {
+function runTask(task) {
return new Promise ( (resolve, reject) => {
console.log('running task')
- console.log('create output file record...')
- console.log(task.output_file.path)
-
const tool = tools[ task.tool ]
execFile(tool.cmd, [
- tool.script_path,
- task.content_file.path,
- task.style_file.path,
+ path.join(__dirname, '../..', tool.script),
+ task.content_file.aiffPath,
+ task.style_file.aiffPath,
task.output_file.path,
], (err, stdout, stderr) => {
- console.log(stdout, stderr)
+ console.log("DONE")
+ console.log(err)
+ console.log(stdout)
+ console.log(stderr)
+ console.log('create output file record...')
+ console.log(task.output_file.path)
+ task.stdout = stdout
+ task.stderr = stderr
+
db.models.file.create(task.output_file).then( file => {
task.output_file = file
task.output_file_id = file.id
@@ -99,8 +117,9 @@ function runTask(fullPath) {
function processDone(task) {
task.completed = true
+ task.processing = false
console.log(task)
- ipc.of.cortex && ipc.of.cortex.emit("completed", { task: task })
+ ipc.of.cortex && ipc.of.cortex.emit("updateTask", { task })
return db.models.task.update(task.id, task)
}
@@ -108,7 +127,7 @@ function constructFilePath (file) {
file.path = path.join(__dirname, '../..', 'public/data', String(file.folder_id), String(file.name))
switch (file.type) {
case 'audio':
- file.aiffPath = file.mime === 'audio/aiff' ? file.path : file.path + '.aiff'
+ file.aiffPath = file.path + '.aiff'
break
}
}