Skip to content

Commit

Permalink
Support Job Engine status and 2 way coms
Browse files Browse the repository at this point in the history
This implements the changes made by Fry to support a more complete interface between the Job Engine and a users browser. 

It retains the proxy between the raw socket on DexRun and the browser on web socket port 3000, but moves the Job Engine communication to port 3001 to support separate running of a job and direct connection to DexRun at the same time. 

Job Engine job status and data is communicated. show window commands in the job should cause a dialog to appear in the browser. Actions in that dialog will be communicated back to the job just as they would be if the job is running in DDE. 

Note that this needs the new "jobs.html" file and that needs the je_and_browser_code.js file from the dde/core folder. You can copy that file from dde/core to the share/www folder, or use the following command from share/www to create a symlink: 
`ln -s /root/Documents/dde/core/je_and_browser_code.js je_and_browser_code.js`
  • Loading branch information
JamesNewton authored Mar 17, 2020
1 parent 081cd08 commit 27ca559
Showing 1 changed file with 206 additions and 95 deletions.
301 changes: 206 additions & 95 deletions Firmware/www/httpd.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

var http = require('http');
var url = require('url'); //url parsing
var fs = require('fs'); //file system
Expand All @@ -16,58 +17,193 @@ const ws = require('ws'); //websocket
//var mime = require('mime'); //translate extensions into mime types
//skip that,it's stupidly big
var mimeTypes = {
"css": "text/css",
"html": "text/html",
"mp3":"audio/mpeg",
"mp4":"video/mp4",
"gif": "image/gif",
"jpeg": "image/jpeg",
"jpg": "image/jpeg",
"js": "text/javascript",
"mp3": "audio/mpeg",
"mp4": "video/mp4",
"png": "image/png",
"svg": "image/svg+xml",
"gif": "image/gif",
"js": "text/javascript",
"css": "text/css",
"txt": "text/plain"
};

const { spawn } = require('child_process');
var job

function spawn_job_proc(jobfile, out, end) {
//https://nodejs.org/api/child_process.html
job = spawn('node'
, ["core define_and_start_job "+jobfile]
, {cwd: '/root/Documents/dde', shell: true}
)
job.stdout.on('data', function(data) {
console.log(">"+data)
out(data.toString().replace('\n',"<BR/>").replace(" ","&nbsp;&nbsp"))
})
job.stderr.on('data', function(data) {
console.log("!"+data)
out("ERROR: "+data)
})
job.on('close', function(code) {
console.log("closed:"+code)
out("Finished "+jobfile+". Exit code:"+code)
return end()

var job_name_to_process = {}
function get_job_name_to_process(job_name) {
if(job_name_to_process.keep_alive) { //if there is such a process, then keep_alive is true
return job_name_to_process.keep_alive
}
else {
return job_name_to_process[job_name]
}
}
function set_job_name_to_process(job_name, process) { job_name_to_process[job_name] = process }
function remove_job_name_to_process(job_name) { delete job_name_to_process[job_name] }

//arg looks like "myjob.js", "myjob.dde", "myjob"
function extract_job_name(job_name_with_extension){
let dot_pos = job_name_with_extension.indexOf(".")
let job_name
if(dot_pos === -1) { job_name = job_name_with_extension }
else { job_name = job_name_with_extension.substring(0, dot_pos) }
return job_name
}

function serve_init_jobs(q, req, res){
//console.log("top of serve_init_jobs in server")
fs.readdir("/srv/samba/share/dde_apps/",
function(err, items){
let items_str = JSON.stringify(items)
//console.log("serve_init_jobs writing: " + items_str)
res.write(items_str)
res.end()
})
out("Started "+jobfile+" PID:"+job.pid+"<BR/>")
return job
}

//https://www.npmjs.com/package/ws
console.log("now making wss")
const wss = new ws.Server({port: 3001}) //server: http_server });
console.log("done making wss: " + wss)


function serve_job_button_click(browser_socket, mess_obj){
let job_name_with_extension = mess_obj.job_name_with_extension //includes ".js" suffix
console.log("\n\nserver top of serve_job_button_click with job_name_with_extension: " + job_name_with_extension)
let jobfile = "/srv/samba/share/dde_apps/" + job_name_with_extension //q.search.substr(1)
//console.log("serve_job_button_click with jobfile: " + jobfile)
let job_name = extract_job_name(job_name_with_extension)
//console.log("top of serve_job_button_click with job_name: " + job_name)
//console.log("serve_job_button_click with existing status_code: " + status_code)
let job_process = get_job_name_to_process(job_name) //warning: might be undefined.
//let server_response = res //to help close over
if(!job_process){
//https://nodejs.org/api/child_process.html
//https://blog.cloudboost.io/node-js-child-process-spawn-178eaaf8e1f9
job_process = spawn('node',
["core define_and_start_job " + jobfile], //a jobfile than ends in "/keep_alive" is handled specially in core/index.js
{cwd: '/root/Documents/dde', shell: true}
)
set_job_name_to_process(job_name, job_process)
console.log("just set job_name: " + job_name + " to new process: " + job_process)
job_process.stdout.on('data', function(data) {
console.log("\n\nserver: stdout.on data got data: " + data + "\n")
let data_str = data.toString()
//server_response.write(data_str) //pipe straight through to calling browser's handle_stdout
//https://github.com/expressjs/compression/issues/56 sez call flush even though it isn't documented.
//server_response.flushHeaders() //flush is deprecated.
browser_socket.send(data_str)
})

job_process.stderr.on('data', function(data) {
console.log("\n\njob: " + job_name + " got stderr with data: " + data)
remove_job_name_to_process(job_name)
//server_response.write("Job." + job_name + " errored with: " + data)
let lit_obj = {job_name: job_name,
kind: "show_job_button",
button_tooltip: "Server errored with: " + data,
button_color: "red"}
browser_socket.send("<for_server>" + JSON.stringify(lit_obj) + "</for_server>")
//server_response.end()
})
job_process.on('close', function(code) {
console.log("\n\nServer closed the process of Job: " + job_name + " with code: " + code)
if(code !== 0){
let lit_obj = {job_name: job_name,
kind: "show_job_button",
button_tooltip: "Errored with server close error code: " + code,
button_color: "red"}
browser_socket.send("<for_server>" + JSON.stringify(lit_obj) + "</for_server>")
}
remove_job_name_to_process(job_name)
//server_response.end()
})
}
else {
let code
if(job_name === "keep_alive") { //happens when transition from keep_alive box checked to unchecked
code = "set_keep_alive_value(" + mess_obj.keep_alive_value + ")"
}
else {
//code = "Job." + job_name + ".server_job_button_click()"
code = 'Job.maybe_define_and_server_job_button_click("' + jobfile + '")'
}
console.log("serve_job_button_click writing to job: " + job_name + " stdin: " + code)
//https://stackoverflow.com/questions/13230370/nodejs-child-process-write-to-stdin-from-an-already-initialised-process
job_process.stdin.setEncoding('utf-8');
job_process.stdin.write(code + "\n")
//job_process.stdin.end();
}
//serve_get_refresh(q, req, res)
//return serve_jobs(q, req, res) //res.end()
}

//see bottom of je_and_browser_code.js: submit_window for
//the properties of mess_obj
function serve_show_window_call_callback(browser_socket, mess_obj){
let callback_arg = mess_obj.callback_arg
let job_name = callback_arg.job_name
let job_process = get_job_name_to_process(job_name)
console.log("\n\nserve_show_window_call_callback got job_name: " + job_name + " and its process: " + job_process)
let code = mess_obj.callback_fn_name + "(" +
JSON.stringify(callback_arg) + ")"
//code = mess_obj.callback_fn_name + '({"is_submit": false})' //out('short str')" //just for testing
console.log("serve_show_window_call_callback made code: " + code)
job_process.stdin.setEncoding('utf-8');
job_process.stdin.write(code + "\n") //need the newline so that stdio.js readline will be called
}

function serve_file(q, req, res){
var filename = "/srv/samba/share/www/" + q.pathname
console.log("serving" + q.pathname)
fs.readFile(filename, function(err, data) {
if (err) {
res.writeHead(404, {'Content-Type': 'text/html'})
return res.end("404 Not Found")
}
res.setHeader('Access-Control-Allow-Origin', '*');
let mimeType = mimeTypes[ q.pathname.split(".").pop() ] || "application/octet-stream"
console.log("Content-Type:", mimeType)
res.setHeader("Content-Type", mimeType);
res.writeHead(200)
res.write(data)
return res.end()
})
}

//standard web server on port 80 to serve files
const httpd = http.createServer(function (req, res) {
var http_server = http.createServer(function (req, res) {
//see https://nodejs.org/api/http.html#http_class_http_incomingmessage
//for the format of q.
var q = url.parse(req.url, true)
if ("/"==q.pathname)
q.pathname="index.html"
if ("/runjob"==q.pathname) {
var jobfile = "/srv/samba/share/dde_apps/"+q.search.substr(1)
console.log("Running job "+jobfile+".")
res.writeHead(200, {'Content-Type': 'text/html'})
spawn_job_proc(jobfile, function(msg){res.write(msg)}, function(){res.end()})
return
}
if ("/jobs"==q.pathname) {
console.log("serving job list")
console.log("web server passed pathname: " + q.pathname)
if (q.pathname === "/") {
q.pathname = "index.html"
}
if (q.pathname === "/init_jobs") {
serve_init_jobs(q, req, res)
}
//else if (q.pathname === "/get_refresh") { serve_get_refresh(q, req, res) }
//else if(q.pathname === "/job_button_click") {
// serve_job_button_click(q, req, res)
//}
//else if(q.pathname === "/show_window_button_click") {
// serve_show_window_button_click(q, req, res)
//}
else {
serve_file(q, req, res)
}
})

http_server.listen(80)
console.log("listening on port 80")

/* orig james N code
function jobs(q, res){
console.log("serving job list")
fs.readdir("/srv/samba/share/dde_apps/", function(err, items) {
if (err) {
console.log("ERROR:"+err)
Expand All @@ -80,65 +216,39 @@ const httpd = http.createServer(function (req, res) {
}
return res.end()
})
return //res.end()
return
}
*/


wss.on('connection', function(the_ws, req) {
console.log("\n\nwss got connection: " + the_ws)
console.log("\nwss SAME AS the_ws : " + (wss === the_ws))
let browser_socket = the_ws //the_socket used when stdout from job engine comes to the web server process
the_ws.on('message', function(message) {
console.log('\n\nwss server on message received: %s', message);
//the_ws.send("server sent this to browser in response to: " + message)
let mess_obj = JSON.parse(message)
console.log("\nwss server on message receieved kind: " + mess_obj.kind)
if(mess_obj.kind === "keep_alive_click") {
serve_job_button_click(browser_socket, mess_obj)
}
var filename = "/srv/samba/share/www/" + q.pathname
console.log("serving"+q.pathname)
fs.readFile(filename, function(err, data) {
if (err) {
res.writeHead(404, {'Content-Type': 'text/html'})
return res.end("404 Not Found")
}
res.setHeader('Access-Control-Allow-Origin', '*');
let mimeType = mimeTypes[ q.pathname.split(".").pop() ] || "application/octet-stream"
console.log("Content-Type:",mimeType)
res.setHeader("Content-Type", mimeType);
res.writeHead(200)
res.write(data)
return res.end()
});
else if(mess_obj.kind === "job_button_click") {
serve_job_button_click(browser_socket, mess_obj)
}
else if(mess_obj.kind === "show_window_call_callback"){
serve_show_window_call_callback(browser_socket, mess_obj)
}
else {
console.log("\n\nwss server received invalid message kind: " + mess_obj.kind)
}
})
the_ws.send('websocket connected.\n')
})
httpd.listen(80)
console.log("listing on port 80")

//handle websocket upgrade request.
let job_wss = new ws.Server({ noServer: true })
job_wss.child_process = null //reserve a spot for the job process
httpd.on('upgrade', function upgrade(request, socket, head) {
const pathname = url.parse(request.url).pathname
if (pathname === '/runjob') { //request to run a job via web socket
job_wss.handleUpgrade( request, socket, head, function done(ws) {
job_wss.emit('connection', ws, request)
}) //when connected, trigger ws Server's .on('connection') callback
} else { //what the heck is this for?
socket.destroy() //reject it.
}
})
job_wss.on('connection', function connection(wsocket, req) {
let jobname = url.parse(req.url, true).search.substr(1)
let jobfile = "/srv/samba/share/dde_apps/"+jobname
console.log("job_wss connection "+jobname)
job_wss.child_process = spawn_job_proc(jobfile
, function(msg){ //data from job
console.log("job says:"+msg)
wsocket.send(msg)
}
, function(){ //job ended
console.log("job ended")
job_wss.child_process = null
wsocket.close()
}
)
wsocket.on('message', function incoming(msg) {
// console.log("\n\n\nbrowser says:"+msg+".\n\n\n")
let job = job_wss.child_process
if (job) {
console.log("\n\n\nbrowser says:"+msg+".\n\n\n")
job.stdin.write(msg+"\n")
}
});
})



//websocket server that connects to Dexter
//socket server to accept websockets from the browser on port 3000
//and forward them out to DexRun as a raw socket
var browser = new ws.Server({ port:3000 })
Expand Down Expand Up @@ -204,3 +314,4 @@ browser.on('connection', function connection(socket, req) {

//test to see if we can get a status update from DexRun
//dexter.write("1 1 1 undefined g ;")

0 comments on commit 27ca559

Please sign in to comment.