Skip to content

Commit

Permalink
Merge branch 'feature/action-server' of github.com:NASA-AMMOS/aerie i…
Browse files Browse the repository at this point in the history
…nto feature/action-server
  • Loading branch information
cohansen committed Mar 4, 2025
2 parents 08b7877 + 3d9a9aa commit 0d93314
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 16 deletions.
117 changes: 101 additions & 16 deletions action-server/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import express from "express";
import {configuration} from "./config";
import {jsExecute} from "./utils/codeRunner";
import {extractSchemas, jsExecute} from "./utils/codeRunner";
import {isActionRunRequest, validateActionRunRequest} from "./utils/validators";
import {ActionResponse} from "./type/types";
import {ActionsDbManager} from "./db";
import {Pool, PoolClient} from "pg";
import {Pool, PoolClient, Notification} from "pg";

import {readFile} from "fs/promises";
import {corsMiddleware, jsonErrorMiddleware} from "./middleware";
import * as path from "node:path";

const app = express();

Expand All @@ -20,6 +21,7 @@ app.use(corsMiddleware);

// Route for running a JS action
app.post("/run-action", async (req, res) => {
// TODO: old - deprecate?
if (!isActionRunRequest(req.body)) {
const msg = validateActionRunRequest(req.body);
throw new Error(msg || "Unknown");
Expand Down Expand Up @@ -48,6 +50,88 @@ const server = app.listen(port, () => {

app.use(jsonErrorMiddleware);

// -- begin PG event handling

async function readFileFromStore(fileName: string): Promise<string> {
// read file from aerie file store and return [resolve] it as a string
const fileStoreBasePath = `/usr/src/app/action_file_store`; // todo get from env
const filePath = path.join(fileStoreBasePath, fileName);
console.log(`path is ${filePath}`);
return await readFile(filePath, 'utf-8');
}

type ActionDefinitionInsertedPayload = {
action_definition_id: number,
action_file_path: string
}
async function handleActionDefinition(payload: ActionDefinitionInsertedPayload) {
console.log("action definition inserted");
// pre-process and extract schemas
const actionJS = await readFileFromStore(payload.action_file_path);
console.log(actionJS);

const schemas = await extractSchemas(actionJS);

console.log(`schemas ${JSON.stringify(schemas, null, 2)}`);

// todo: set schemas on the DB row?
const pool = ActionsDbManager.getDb();
const query = `
UPDATE actions.action_definition
SET
parameter_schema = parameter_schema || $1::jsonb,
settings_schema = settings_schema || $2::jsonb
WHERE id = $3
RETURNING *;
`;

try {
const res = await pool.query(query, [
JSON.stringify(schemas.paramDefs),
JSON.stringify(schemas.settingDefs),
payload.action_definition_id,
]);
console.log("Updated action_definition:", res.rows[0]);
} catch (err) {
console.error("Error updating action_definition:", err);
}
}

type ActionRunInsertedPayload = {
settings: Record<string, any>,
parameters: Record<string, any>,
action_definition_id: number,
workspace_id: number,
action_file_path: string
}

async function handleActionRun(payload: ActionRunInsertedPayload) {
console.log("action run inserted");
// event payload contains a file path for the action file which should be run
const actionJS = await readFileFromStore(payload.action_file_path);
console.log(actionJS);

const parameters = payload.parameters;
const settings = payload.settings;

// TODO: how to handle auth tokens??
// const authToken = req.header("authorization");
// if (!authToken) console.warn("No valid `authorization` header in action-run request");

// todo: maintain a queue and enqueue run requests
// todo: use piscina worker pool to run in separate thread
// todo: run the action file, put results in the same DB row and mark status as successful
// todo: try/catch - need to handle errors manually since not in express handler?
const jsRun = await jsExecute(actionJS, parameters, settings, "");

const response = {
results: jsRun.results,
console: jsRun.console,
errors: jsRun.errors,
} as ActionResponse;
console.log('finished run');
console.log(response);
}

let pool: Pool | undefined;
let listenClient: PoolClient | undefined;
Expand All @@ -56,27 +140,28 @@ async function initDb() {
ActionsDbManager.init();
pool = ActionsDbManager.getDb();

// todo:
// todo: check for definitions/runs that may have been inserted while action-server was down (ie. missed notifs) & process them?

// listen for `action_run_inserted` events from postgres
// which occur when a user inserts a row in the `action_run` table, signifying a run request
listenClient = await pool.connect();
// these occur when user inserts row in `action_definition`, need to pre-process to extract the schemas
listenClient.query('LISTEN action_definition_inserted');
// these occur when a user inserts a row in the `action_run` table, signifying a run request
listenClient.query('LISTEN action_run_inserted');

listenClient.on('notification', async (msg) => {
console.log("action_run_inserted");
console.log(JSON.stringify(msg));

// event payload contains a file path for the action file which should be run
if(!msg || !msg.payload) return;
console.info(`PG notify event: ${JSON.stringify(msg, null, 2)}`);
if(!msg || !msg.payload) {
console.warn(`warning: PG event with no message or payload: ${JSON.stringify(msg, null, 2)}`);
return;
}
const payload = JSON.parse(msg.payload);
const filePath = payload.action_file_path;

console.log(`path is ${filePath}`);
const actionFile = await readFile(filePath, 'utf-8');
console.log(actionFile);
// todo: maintain a queue and enqueue run requests
// todo: run the action file, put results in the same DB row and mark status as successful
if(msg.channel === "action_definition_inserted") {
// todo should these be awaited?
await handleActionDefinition(payload);
} else if(msg.channel === "action_run_inserted") {
await handleActionRun(payload);
}
});
}
initDb();
Expand Down
58 changes: 58 additions & 0 deletions action-server/src/utils/codeRunner.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
import * as vm from "node:vm";
import { ActionResponse, ActionResults, ConsoleOutput } from "../type/types";

// function getConsoleHandlers(oldConsole: any) {
// return {
// ...oldConsole,
// log: (...args: any[]) => {
// consoleOutput.log.push(args.join(" "));
// },
// debug: (...args: any[]) => {
// consoleOutput.debug.push(args.join(" "));
// },
// info: (...args: any[]) => {
// consoleOutput.info.push(args.join(" "));
// },
// warn: (...args: any[]) => {
// consoleOutput.warn.push(args.join(" "));
// },
// error: (...args: any[]) => {
// consoleOutput.error.push(args.join(" "));
// },
// }
// }

export const jsExecute = async (
code: string,
parameters: Record<string, any>,
Expand All @@ -9,6 +30,9 @@ export const jsExecute = async (
): Promise<ActionResponse> => {
/** Array to store console output. */
const consoleOutput: ConsoleOutput = { log: [], debug: [], info: [], error: [], warn: [] };

// create a clone of the global object (including getters/setters/non-enumerable properties)
// to be passed to the context so it has access to eg. node built-ins
let aerieGlobal = Object.defineProperties({ ...global }, Object.getOwnPropertyDescriptors(global));

aerieGlobal.console = {
Expand All @@ -30,6 +54,9 @@ export const jsExecute = async (
},
};

// need to initialize exports for the module to work correctly
aerieGlobal.exports = {};

const context = vm.createContext(aerieGlobal);

try {
Expand All @@ -55,3 +82,34 @@ export const jsExecute = async (
});
}
};

// todo correct return type for schemas?
export const extractSchemas = async (code: string): Promise<any> => {
// todo: do we need to pass globals/console for this part?

// need to initialize exports for the cjs module to work correctly
const context = vm.createContext({exports: {}});

try {
vm.runInContext(code, context);
const {paramDefs, settingDefs} = context.exports;
return {paramDefs, settingDefs};
} catch (error: any) {
// wrap `throw 10` into a `new throw(10)`
let errorResponse: Error;
if ((error !== null && typeof error !== "object") || !("message" in error && "stack" in error)) {
errorResponse = new Error(String(error));
} else {
errorResponse = error;
}
return Promise.resolve({
results: null,
errors: {
stack: errorResponse.stack,
message: errorResponse.message,
cause: errorResponse.cause,
},
});
}
};

0 comments on commit 0d93314

Please sign in to comment.