Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Status Update: Implement RabbitMQ Action #1284

Merged
merged 6 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/config/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
import { LogJobAction } from "../jobs/actions/logaction";
import { EmailJobAction } from "../jobs/actions/emailaction";
import { URLAction } from "src/jobs/actions/urlaction";
import { RabbitMQJobAction } from "src/jobs/actions/rabbitmqaction";
import * as fs from "fs";
import { merge } from "lodash";
import localconfiguration from "./localconfiguration";
Expand Down Expand Up @@ -237,9 +238,11 @@ export function registerDefaultActions() {
registerCreateAction(LogJobAction);
registerCreateAction(EmailJobAction);
registerCreateAction(URLAction);
registerCreateAction(RabbitMQJobAction);
// Status Update
registerStatusUpdateAction(LogJobAction);
registerStatusUpdateAction(EmailJobAction);
registerStatusUpdateAction(RabbitMQJobAction);
}

export type OidcConfig = ReturnType<typeof configuration>["oidc"];
Expand Down
4 changes: 2 additions & 2 deletions src/jobs/actions/logaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ export class LogJobAction<T> implements JobAction<T> {
}

async validate(dto: T) {
Logger.log("Validating CREATE job: " + JSON.stringify(dto), "LogJobAction");
Logger.log("Validating job: " + JSON.stringify(dto), "LogJobAction");
}

async performJob(job: JobClass) {
Logger.log("Performing CREATE job: " + JSON.stringify(job), "LogJobAction");
Logger.log("Performing job: " + JSON.stringify(job), "LogJobAction");
}

constructor(data: Record<string, any>) {
Expand Down
91 changes: 91 additions & 0 deletions src/jobs/actions/rabbitmqaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { Logger, NotFoundException } from "@nestjs/common";
import amqp, { Connection } from "amqplib/callback_api";
import { JobAction } from "../config/jobconfig";
import { JobClass } from "../schemas/job.schema";


/**
* Publish a message in a RabbitMQ queue
*/
export class RabbitMQJobAction<T> implements JobAction<T> {
public static readonly actionType = "rabbitmq";
private connection;
private binding;

constructor(data: Record<string, any>) {
Logger.log(
"Initializing RabbitMQJobAction. Params: " + JSON.stringify(data),
"RabbitMQJobAction",
);

this.connection = {
protocol: "amqp",
hostname: data.hostname,
port: data.port,
username: data.username,
password: data.password,
};
this.binding = {
exchange: data.exchange,
queue: data.queue,
key: data.key
};
}

getActionType(): string {
return RabbitMQJobAction.actionType;
}

async validate(dto: T) {
Logger.log(
"Validating RabbitMQJobAction: " + JSON.stringify(dto),
"RabbitMQJobAction",
);

const connectionDetailsMissing = [undefined, ""].some(el => Object.values(this.connection).includes(el));
if (connectionDetailsMissing) {
throw new NotFoundException("RabbitMQ configuration is missing connection details.");
}

const bindingDetailsMissing = [undefined, ""].some(el => Object.values(this.binding).includes(el));
if (bindingDetailsMissing) {
throw new NotFoundException("RabbitMQ binding is missing exchange/queue/key details.");
}
}

async performJob(job: JobClass) {
Logger.log(
"Performing RabbitMQJobAction: " + JSON.stringify(job),
"RabbitMQJobAction",
);

amqp.connect(this.connection, (connectionError: Error, connection: Connection) => {
if (connectionError) {
Logger.error(
"Connection error in RabbitMQJobAction: " + JSON.stringify(connectionError.message),
"RabbitMQJobAction",
);
return;
}

connection.createChannel((channelError: Error, channel) => {
if (channelError) {
Logger.error(
"Channel error in RabbitMQJobAction: " + JSON.stringify(channelError.message),
"RabbitMQJobAction",
);
return;
}

channel.assertQueue(this.binding.queue, { durable: true });
channel.assertExchange(this.binding.exchange, "topic", { durable: true });
channel.bindQueue(this.binding.queue, this.binding.exchange, this.binding.key);
channel.sendToQueue(this.binding.queue, Buffer.from(JSON.stringify(job)));

channel.close(() => {
connection.close();
});
});
});
}
}
24 changes: 23 additions & 1 deletion src/jobs/config/jobConfig.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,33 @@
"headers": {
"accept": "application/json"
}
},
{
"actionType": "rabbitmq",
"hostname": "rabbitmq",
"port": 5672,
"username": "guest",
"password": "guest",
"exchange": "jobs.write",
"queue": "client.jobs.write",
"key": "jobqueue"
}
]
},
"statusUpdate": {
"auth": "archivemanager"
"auth": "archivemanager",
"actions": [
{
"actionType": "rabbitmq",
"hostname": "rabbitmq",
"port": 5672,
"username": "guest",
"password": "guest",
"exchange": "jobs.write",
"queue": "client.jobs.write",
"key": "jobqueue"
}
]
}
},
{
Expand Down
56 changes: 33 additions & 23 deletions src/jobs/config/jobconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import { CreateJobAuth, JobsAuth } from "../types/jobs-auth.enum";
import Ajv from "ajv";
import { JobConfigSchema } from "./jobConfig.schema";


/**
* Encapsulates all responses to a particular job type (eg "archive")
*/
export class JobConfig {
jobType: string;
configVersion: string;
create: JobOperation<CreateJobDto>;
// read: JobReadAction[];
// read: JobOperation<ReadJobDto>;
statusUpdate: JobOperation<StatusUpdateJobDto>;

constructor(
Expand All @@ -54,23 +55,27 @@ export class JobConfig {
* @returns
*/
static parse(
data: Record<string, any>,
jobData: Record<string, any>,
configVersion: string
): JobConfig {
const type = data[JobsConfigSchema.JobType];
const type = jobData[JobsConfigSchema.JobType];
const create = JobOperation.parse<CreateJobDto>(
createActions,
data[AuthOp.Create],
jobData[AuthOp.Create],
);
const read = undefined; // "read" in data ? oneOrMore(data["read"]).map((json) => parseReadAction(json["action"])) : [];
const read = undefined;
const statusUpdate = JobOperation.parse<StatusUpdateJobDto>(
statusUpdateActions,
data[AuthOp.StatusUpdate],
jobData[AuthOp.StatusUpdate],
);
return new JobConfig(type, configVersion, create, read, statusUpdate);
}
}


/**
* Encapsulates all information for a particular job operation (eg "create", "statusUpdate")
*/
export class JobOperation<DtoType> {
auth: JobsAuth | undefined;
actions: JobAction<DtoType>[];
Expand Down Expand Up @@ -98,6 +103,7 @@ export class JobOperation<DtoType> {
}
}


/**
* Given a JSON object configuring a JobConfigAction.
*
Expand All @@ -122,6 +128,7 @@ function parseAction<DtoType>(
return new actionClass(data);
}


/**
* Superclass for all responses to Job changes
*/
Expand All @@ -130,16 +137,20 @@ export interface JobAction<DtoType> {
* Validate the DTO, throwing an HttpException for problems
*/
validate: (dto: DtoType) => Promise<void>;

/**
* Respond to the action
*/
performJob: (job: JobClass) => Promise<void>;

/**
* Return the actionType for this action. This should match the class's
* static actionType (used for constructing the class from the configuration file)
*/
getActionType(): string;
}


/**
* Describes the constructor and static members for JobAction implementations
*/
Expand All @@ -155,49 +166,48 @@ export type JobCreateAction = JobAction<CreateJobDto>;
// export type JobReadAction = JobAction<ReadJobDto>;
export type JobStatusUpdateAction = JobAction<StatusUpdateJobDto>;

/// Action registration

// type JobActionCtor<T> = (json: Record<string,any>) => JobAction<T>;

/**
* Action registration
*/
const createActions: Record<string, JobActionClass<CreateJobDto>> = {};
// const readActions: Record<string, JobActionCtor<ReadJobDto>> = {};
// const readActions: Record<string, JobActionClass<ReadJobDto>> = {};
const statusUpdateActions: Record<string, JobActionClass<StatusUpdateJobDto>> = {};

/**
* Registers an action to handle jobs of a particular type
* @param action
*/
export function registerCreateAction(action: JobActionClass<CreateJobDto>) {
export function registerCreateAction(
action: JobActionClass<CreateJobDto>
) {
createActions[action.actionType] = action;
}
/**
* List of action types with a registered action
* @returns
*/
export function getRegisteredCreateActions(): string[] {
return Object.keys(createActions);
}

/**
* Registers an action to handle jobs of a particular type
* @param action
*/
export function registerStatusUpdateAction(
action: JobActionClass<StatusUpdateJobDto>,
) {
statusUpdateActions[action.actionType] = action;
}

/**
* List of action types with a registered action
* @returns
*/
export function getRegisteredCreateActions(): string[] {
return Object.keys(createActions);
}

export function getRegisteredStatusUpdateActions(): string[] {
return Object.keys(statusUpdateActions);
}

/// Parsing

/**
* Parsing
*/
let jobConfig: JobConfig[] | null = null; // singleton

/**
* Load jobconfig.json file.
* Expects one or more JobConfig configurations (see JobConfig.parse)
Expand Down
1 change: 1 addition & 0 deletions src/jobs/interceptors/job-create.interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export class JobCreateInterceptor implements NestInterceptor {
});
}),
);

return jc;
}
}
Loading