// path: root/app/relay/runner.js
// blob: e7aca721a02e3354283e6bddeac3af21fc1b966d
// Monitors which task process is currently running on each processor (CPU / GPU)
// and kills it if need be... murder.

import { execFile, spawn } from 'child_process'
import interpreters from './interpreters'
import modules from './modules'
import { kill } from 'tree-kill'
import { remote } from './remote'

/**
 * Shared record of the processes this module has started.
 * Each slot holds the subprocess stored by run_task, or null.
 */
export const state = {
  // slot written by run_task for activities without the `gpu` flag
  current_cpu_task: null,
  // slot written by run_task for activities with the `gpu` flag
  current_gpu_task: null,
}

/**
 * Read the CPU slot of the shared state.
 * @returns {*} whatever is stored in `state.current_cpu_task` (null when empty)
 */
export function get_current_cpu_task(){
	const { current_cpu_task } = state
	return current_cpu_task
}

/**
 * Read the GPU slot of the shared state.
 * @returns {*} whatever is stored in `state.current_gpu_task` (null when empty)
 */
export function get_current_gpu_task(){
	const { current_gpu_task } = state
	return current_gpu_task
}

/**
 * Look up the task slot for a processor.
 * @param {string} processor - 'cpu' selects the CPU slot; any other value selects the GPU slot
 * @returns {*} the value stored in the selected slot (null when empty)
 */
export function get_current_task(processor) {
  return processor === 'cpu'
    ? state.current_cpu_task
    : state.current_gpu_task
}

/**
 * Expose the shared state object itself (not a copy), so callers see
 * both the CPU and GPU slots as they currently are.
 * @returns {object} the module-level `state`
 */
export function status () {
  const snapshot = state
  return snapshot
}

/**
 * Resolve the activity a task refers to and build the argument list for it.
 *
 * If the activity defines `params` as a function, that function is called with
 * the task and its result is used as-is. Otherwise the list is
 * `[script, ...activity.params, ...flags]`, where the flags come from the
 * JSON-encoded `task.opt`: each key becomes `--key` (dashes replaced with
 * underscores), followed by its value unless the value is the string 'true'.
 *
 * @param {object} module - module definition exposing an `activities` map
 * @param {object} task - task with an `activity` name and an optional JSON string `opt`
 * @returns {{activity: object, interpreter: *, params: Array}} the resolved
 *   activity, the `interpreters` entry for `activity.type`, and the arguments
 * @throws {Error} when the module has no activity named `task.activity`
 * @throws {SyntaxError} when `task.opt` is present but is not valid JSON
 */
export function build_params(module, task) {
	const activity = module.activities[task.activity]
	if (! activity) throw new Error("No such activity")
	const interpreter = interpreters[activity.type]
	// must be declared: assigning an undeclared name throws in an ES module
	let params
	if (typeof activity.params === 'function') {
		params = activity.params(task)
	}
	else {
		// a task without options simply contributes no flags
		const opt = task.opt ? JSON.parse(task.opt) : {}
		const opt_params = []
		Object.keys(opt).forEach(key => {
			const value = opt[key]
			opt_params.push('--' + key.replace(/-/g, '_'))
			// the string 'true' marks a bare switch: emit the flag without a value
			if (value !== 'true') {
				opt_params.push(value)
			}
		})
		params = [ activity.script ].concat(activity.params || []).concat(opt_params)
	}
	return {
		activity,
		// returned as well because run_task destructures it from this result
		interpreter,
		params
	}
}

/**
 * Run one of a fixed allow-list of system commands.
 *
 * 'nvidia-smi', 'ps', 'uptime' and 'w' are executed without arguments and
 * 'df' is executed as `df -h`; in those cases `cb` is handed to execFile.
 * Any other value is rejected by calling `cb({ error: 'no such command' })`.
 *
 * @param {string} cmd - name of the command to run
 * @param {Function} cb - callback passed to execFile, or called with the rejection object
 */
export function run_system_command(cmd, cb) {
  console.log('running system command:', cmd)

  const without_args = ['nvidia-smi', 'ps', 'uptime', 'w']
  if (without_args.includes(cmd)) {
    execFile(cmd, cb)
    return
  }
  if (cmd === 'df') {
    execFile('df', ['-h'], cb)
    return
  }
  cb({ error: 'no such command' })
}

/**
 * Start the process for a task and record it in the shared state.
 *
 * The task runs in the GPU slot when its activity has a truthy `gpu` flag and
 * in the CPU slot otherwise. If that slot is occupied, the call is refused
 * unless `preempt` is set, in which case the occupant is killed first.
 * Process events are forwarded over `remote` as 'task_res' messages:
 * 'task_error', 'task_finish' and, when `watch` is set, 'stdout' / 'stderr'.
 *
 * @param {object} task - task naming a `module` (key into `modules`) and an activity
 * @param {boolean} preempt - kill and replace a task already occupying the slot
 * @param {boolean} watch - forward the process's stdout/stderr chunks over `remote`
 * @returns {{type: string, error: string}|undefined} an error object when the
 *   slot is busy and `preempt` is not set; undefined once the process is started
 * @throws {Error} when `task.module` does not name a known module
 */
export function run_task(task, preempt, watch){
	const module = modules[task.module]
	if (! module) throw new Error("No such module")
	const { activity, params } = build_params(module, task)

	// pick the slot with the same criterion for the busy check and for storage
	const processor = activity.gpu ? 'gpu' : 'cpu'
	const slot = 'current_' + processor + '_task'
	const running = state[slot]
	if (running) {
		if (! preempt) {
			return { type: 'error', error: 'task already running on ' + processor }
		}
		console.log('preempting currently running ' + processor.toUpperCase() + ' task')
		kill_task(running)
	}

	console.log('running task', activity.name)
	console.log(activity.interpreter, activity.script, params)
	const subprocess = spawn(activity.interpreter, params)
	state[slot] = subprocess

	// Free the slot only if it still belongs to this process: a preempted
	// process closes after its replacement has already been stored.
	const release = () => {
		if (state[slot] === subprocess) {
			state[slot] = null
		}
	}

	subprocess.on('error', (err) => {
		console.log('process error', subprocess.pid, err)
		// no pid means the process never started, so 'close' cannot be relied on
		if (subprocess.pid === undefined) release()
		remote.emit('task_res', { type: 'task_error', task, err })
	})
	subprocess.on('close', () => {
		console.log('process ended', subprocess.pid)
		release()
		remote.emit('task_res', { type: 'task_finish', task })
	})
	if (watch) {
		subprocess.stdout.on('data', data => {
			remote.emit('task_res', { type: 'stdout', data })
		})
		subprocess.stderr.on('data', data => {
			remote.emit('task_res', { type: 'stderr', data })
		})
	}
}

/**
 * Kill a running task by passing its pid to `kill`.
 *
 * Does nothing when there is no process to kill (no subprocess given, or one
 * without a pid), so it can be called with an empty task slot.
 *
 * NOTE(review): `kill` is a named import from 'tree-kill'; that package
 * documents a default export, so confirm the named import resolves in this build.
 *
 * @param {object|null|undefined} subprocess - object exposing a `pid`, e.g. a spawned child process
 */
export function kill_task(subprocess){
	if (! subprocess) return
	const { pid } = subprocess
	if (pid === undefined || pid === null) return
	kill(pid)
}