Skip to content

Commit

Permalink
Job Status Update: Implement RabbitMQ Action (#1284)
Browse files Browse the repository at this point in the history
* add RabbitMQJobAction, fix how job config is stored

* rabbitmq queue fixes

* fix comment

* fixes based on PR comments, register action for job create, send json

* finalize rammitmq
  • Loading branch information
despadam authored Jul 22, 2024
1 parent c840a2a commit a1dfaf6
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 41 deletions.
3 changes: 3 additions & 0 deletions src/config/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
import { LogJobAction } from "../jobs/actions/logaction";
import { EmailJobAction } from "../jobs/actions/emailaction";
import { URLAction } from "src/jobs/actions/urlaction";
import { RabbitMQJobAction } from "src/jobs/actions/rabbitmqaction";
import * as fs from "fs";
import { merge } from "lodash";
import localconfiguration from "./localconfiguration";
Expand Down Expand Up @@ -237,9 +238,11 @@ export function registerDefaultActions() {
registerCreateAction(LogJobAction);
registerCreateAction(EmailJobAction);
registerCreateAction(URLAction);
registerCreateAction(RabbitMQJobAction);
// Status Update
registerStatusUpdateAction(LogJobAction);
registerStatusUpdateAction(EmailJobAction);
registerStatusUpdateAction(RabbitMQJobAction);
}

export type OidcConfig = ReturnType<typeof configuration>["oidc"];
Expand Down
4 changes: 2 additions & 2 deletions src/jobs/actions/logaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ export class LogJobAction<T> implements JobAction<T> {
}

async validate(dto: T) {
Logger.log("Validating CREATE job: " + JSON.stringify(dto), "LogJobAction");
Logger.log("Validating job: " + JSON.stringify(dto), "LogJobAction");
}

async performJob(job: JobClass) {
Logger.log("Performing CREATE job: " + JSON.stringify(job), "LogJobAction");
Logger.log("Performing job: " + JSON.stringify(job), "LogJobAction");
}

constructor(data: Record<string, any>) {
Expand Down
91 changes: 91 additions & 0 deletions src/jobs/actions/rabbitmqaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { Logger, NotFoundException } from "@nestjs/common";
import amqp, { Connection } from "amqplib/callback_api";
import { JobAction } from "../config/jobconfig";
import { JobClass } from "../schemas/job.schema";


/**
* Publish a message in a RabbitMQ queue
*/
export class RabbitMQJobAction<T> implements JobAction<T> {
public static readonly actionType = "rabbitmq";
private connection;
private binding;

constructor(data: Record<string, any>) {
Logger.log(
"Initializing RabbitMQJobAction. Params: " + JSON.stringify(data),
"RabbitMQJobAction",
);

this.connection = {
protocol: "amqp",
hostname: data.hostname,
port: data.port,
username: data.username,
password: data.password,
};
this.binding = {
exchange: data.exchange,
queue: data.queue,
key: data.key
};
}

getActionType(): string {
return RabbitMQJobAction.actionType;
}

async validate(dto: T) {
Logger.log(
"Validating RabbitMQJobAction: " + JSON.stringify(dto),
"RabbitMQJobAction",
);

const connectionDetailsMissing = [undefined, ""].some(el => Object.values(this.connection).includes(el));
if (connectionDetailsMissing) {
throw new NotFoundException("RabbitMQ configuration is missing connection details.");
}

const bindingDetailsMissing = [undefined, ""].some(el => Object.values(this.binding).includes(el));
if (bindingDetailsMissing) {
throw new NotFoundException("RabbitMQ binding is missing exchange/queue/key details.");
}
}

async performJob(job: JobClass) {
Logger.log(
"Performing RabbitMQJobAction: " + JSON.stringify(job),
"RabbitMQJobAction",
);

amqp.connect(this.connection, (connectionError: Error, connection: Connection) => {
if (connectionError) {
Logger.error(
"Connection error in RabbitMQJobAction: " + JSON.stringify(connectionError.message),
"RabbitMQJobAction",
);
return;
}

connection.createChannel((channelError: Error, channel) => {
if (channelError) {
Logger.error(
"Channel error in RabbitMQJobAction: " + JSON.stringify(channelError.message),
"RabbitMQJobAction",
);
return;
}

channel.assertQueue(this.binding.queue, { durable: true });
channel.assertExchange(this.binding.exchange, "topic", { durable: true });
channel.bindQueue(this.binding.queue, this.binding.exchange, this.binding.key);
channel.sendToQueue(this.binding.queue, Buffer.from(JSON.stringify(job)));

channel.close(() => {
connection.close();
});
});
});
}
}
24 changes: 23 additions & 1 deletion src/jobs/config/jobConfig.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,33 @@
"headers": {
"accept": "application/json"
}
},
{
"actionType": "rabbitmq",
"hostname": "rabbitmq",
"port": 5672,
"username": "guest",
"password": "guest",
"exchange": "jobs.write",
"queue": "client.jobs.write",
"key": "jobqueue"
}
]
},
"statusUpdate": {
"auth": "archivemanager"
"auth": "archivemanager",
"actions": [
{
"actionType": "rabbitmq",
"hostname": "rabbitmq",
"port": 5672,
"username": "guest",
"password": "guest",
"exchange": "jobs.write",
"queue": "client.jobs.write",
"key": "jobqueue"
}
]
}
},
{
Expand Down
56 changes: 33 additions & 23 deletions src/jobs/config/jobconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import { CreateJobAuth, JobsAuth } from "../types/jobs-auth.enum";
import Ajv from "ajv";
import { JobConfigSchema } from "./jobConfig.schema";


/**
* Encapsulates all responses to a particular job type (eg "archive")
*/
export class JobConfig {
jobType: string;
configVersion: string;
create: JobOperation<CreateJobDto>;
// read: JobReadAction[];
// read: JobOperation<ReadJobDto>;
statusUpdate: JobOperation<StatusUpdateJobDto>;

constructor(
Expand All @@ -54,23 +55,27 @@ export class JobConfig {
* @returns
*/
static parse(
data: Record<string, any>,
jobData: Record<string, any>,
configVersion: string
): JobConfig {
const type = data[JobsConfigSchema.JobType];
const type = jobData[JobsConfigSchema.JobType];
const create = JobOperation.parse<CreateJobDto>(
createActions,
data[AuthOp.Create],
jobData[AuthOp.Create],
);
const read = undefined; // "read" in data ? oneOrMore(data["read"]).map((json) => parseReadAction(json["action"])) : [];
const read = undefined;
const statusUpdate = JobOperation.parse<StatusUpdateJobDto>(
statusUpdateActions,
data[AuthOp.StatusUpdate],
jobData[AuthOp.StatusUpdate],
);
return new JobConfig(type, configVersion, create, read, statusUpdate);
}
}


/**
* Encapsulates all information for a particular job operation (eg "create", "statusUpdate")
*/
export class JobOperation<DtoType> {
auth: JobsAuth | undefined;
actions: JobAction<DtoType>[];
Expand Down Expand Up @@ -98,6 +103,7 @@ export class JobOperation<DtoType> {
}
}


/**
* Given a JSON object configuring a JobConfigAction.
*
Expand All @@ -122,6 +128,7 @@ function parseAction<DtoType>(
return new actionClass(data);
}


/**
* Superclass for all responses to Job changes
*/
Expand All @@ -130,16 +137,20 @@ export interface JobAction<DtoType> {
* Validate the DTO, throwing an HttpException for problems
*/
validate: (dto: DtoType) => Promise<void>;

/**
* Respond to the action
*/
performJob: (job: JobClass) => Promise<void>;

/**
* Return the actionType for this action. This should match the class's
* static actionType (used for constructing the class from the configuration file)
*/
getActionType(): string;
}


/**
* Describes the constructor and static members for JobAction implementations
*/
Expand All @@ -155,49 +166,48 @@ export type JobCreateAction = JobAction<CreateJobDto>;
// export type JobReadAction = JobAction<ReadJobDto>;
export type JobStatusUpdateAction = JobAction<StatusUpdateJobDto>;

/// Action registration

// type JobActionCtor<T> = (json: Record<string,any>) => JobAction<T>;

/**
* Action registration
*/
const createActions: Record<string, JobActionClass<CreateJobDto>> = {};
// const readActions: Record<string, JobActionCtor<ReadJobDto>> = {};
// const readActions: Record<string, JobActionClass<ReadJobDto>> = {};
const statusUpdateActions: Record<string, JobActionClass<StatusUpdateJobDto>> = {};

/**
* Registers an action to handle jobs of a particular type
* @param action
*/
export function registerCreateAction(action: JobActionClass<CreateJobDto>) {
export function registerCreateAction(
action: JobActionClass<CreateJobDto>
) {
createActions[action.actionType] = action;
}
/**
* List of action types with a registered action
* @returns
*/
export function getRegisteredCreateActions(): string[] {
return Object.keys(createActions);
}

/**
* Registers an action to handle jobs of a particular type
* @param action
*/
export function registerStatusUpdateAction(
action: JobActionClass<StatusUpdateJobDto>,
) {
statusUpdateActions[action.actionType] = action;
}

/**
* List of action types with a registered action
* @returns
*/
export function getRegisteredCreateActions(): string[] {
return Object.keys(createActions);
}

export function getRegisteredStatusUpdateActions(): string[] {
return Object.keys(statusUpdateActions);
}

/// Parsing

/**
* Parsing
*/
let jobConfig: JobConfig[] | null = null; // singleton

/**
* Load jobconfig.json file.
* Expects one or more JobConfig configurations (see JobConfig.parse)
Expand Down
1 change: 1 addition & 0 deletions src/jobs/interceptors/job-create.interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export class JobCreateInterceptor implements NestInterceptor {
});
}),
);

return jc;
}
}
Loading

0 comments on commit a1dfaf6

Please sign in to comment.