Skip to content

A very opinionated abstraction built on top of amqplib and amqp-connection-manager

Notifications You must be signed in to change notification settings

brunogaldino/nestjs-rabbitmq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

46 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

@bgaldino/nestjs-rabbitmq

Table of Contents

Description

This module features an opinionated way of configuring the RabbitMQ connection by using a configuration file that describes the behaviour of all publishers and consumers present in your project

Motivation

I wanted to have a central place where its easier to open the project and see the declared behaviour of the RabbitMQ instance for that project, without having to look around for NestJS annotations, microservices or whatever. Simply to open, go to the configuration file and know exactly what I'm looking at and what I should expect.

Requirements

Instalation

PNPM

pnpm add @nestjs-rabbitmq

YARN

yarn add @nestjs-rabbitmq

NPM

npm add @nestjs-rabbitmq

Getting Started

Importing the module

Import the RabbitMQModule in your app.module.ts and call the method register({})

import { RabbitMQModule } from '@bgaldino/nestjs-rabbitmq';
import { RabbitOptions } from '@bgaldino/nestjs-rabbitmq';

@Module({
  imports: [
    ...
    RabbitMQModule.register({ useClass: RabbitOptions, injects: [...] }),
    ...
  ]
})
export class AppModule {}

The RabbitMQModule is marked as @Global, therefore, calling it once is enough to allow the injection of the RabbitMQService

The configuration file

Create a rabbitmq.config.ts or whatever the name you prefer containing the minimum configuration:

import { Injectable } from "@nestjs/common";
import {
  RabbitMQModuleOptions,
  RabbitOptionsFactory,
} from "@bgaldino/nestjs-rabbitmq";

@Injectable()
export class RabbitOptions implements RabbitOptionsFactory {
  createRabbitOptions(): RabbitMQModuleOptions {
    return {
      connectionString: "amqp://{user}:{password}@{url}/{vhost}",
      delayExchangeName: "MyDelayExchange",
      assertExchanges: [],
      consumerChannels: [],
    };
  }
}

Publishers

Example config file:

assertExchanges: [
  {
    name: 'webhooks', type: 'topic',
    options: { durable: true, autoDelete: false }
  },
  { name: 'test-fanout', type: 'fanout' },
  { name: 'example-direct', type: 'direct'},
],

The assertExchanges property expects an array of RabbitMQAssertExchange and each entry will asserted against the RabbitMQ connected server.

If any entry does not match a current configuration, or cannot be created/attached. A terminal error 406 - PRECONDITION_FAILED will be thrown with the reason and the server will not initialize

Publishing messages

Example:

import { Injectable } from "@nestjs/common";
import { RabbitMQService } from "@bgaldino/nestjs-rabbitmq";

@Injectable()
export class MyService {
  constructor(private readonly rabbitMQService: RabbitMQService) {}
}

async publishMe(){
  const isPublished = await this.rabbitMQService.publish('exchange_name', 'routing_key', {});
}

//or

async publishMeTyped() {
  const isPublished = await this.rabbitMQService.publish<CustomType>('exchange_name', 'routing_key', {});
  //This will return an error if the object is not properly typed
}

The publish() method uses Publish Confirms to make sure that the message is delivered to the broker before returning the promise.

Consumers

Inside the configuration file you can declare your consumers on the section consumerChannels. This list of RabbitMQConsumerChannel will be evaluated and each entry will try to create the queue and bind it to the declared exchange.

Example:

createRabbitOptions(): RabbitMQModuleOptions {
    return {
      ...,
      consumerChannels: [
        {
          options: {
            queue: 'myqueue',
            exchangeName: 'foobar.exchange',
            routingKey: 'myqueue',
            prefetch: Number(process.env.RABBIT_PREFETCH ?? 10),
            retryStrategy: {
              enabled: true,
              maxAttempts: 5,
              delay: (attempt: number) => {
                return attempt * 5000;
              },
            },
          },
          messageHandler: this.consumerService.messageHandler.bind(
            this.consumerService,
          ),
        },
      ]
  }
}

The consumer DOES NOT create exchanges and only bind to ones that already exists. This is to avoid creating exchanges with typos and misconfigurations.

You can also declare an array of routingKeys: string[] if you want to attach multiple keys to the same queue/callback

The messageHandler callback

As declared in the example above, the messageHandler property expects a callback of the type IRabbitHandler. Because of the nature of the library, we will need to call the .bind(this.yourService) in order to bind the this context of the origin service to the callback.

The RabbitMQModule.register() accepts an array of NestJS Modules with the any module that contains an consumer callback function.

The callback has the following signature:

async messageHandler(content: any, params?: RabbitConsumerParams): Promise<void>;

where RabbitConsumerParams is optional and contains the following info:

export type RabbitConsumerParameters = {
  message: ConsumeMessage;
  channel: ConfirmChannel;
  queue: string;
};

Strongly typed consumer

You can use the IRabbitConsumer<T> to type the consumer first parameter content.

export interface MyInterface {
  type: string;
  id: number;
}

@Injectable()
export class MyClass implements IRabbitConsumer<MyInterface> {
  public async messageHandler(content: MyInterface): Promise<void> {
    console.log(content.type);
  }
}

Declaration example

Service example:

//consumer.module.ts
@Module({
  provides: [ConsumerService],
  exports: [ConsumerService],
})
export class ConsumerModule {}

//consumer.service.ts
@Injectable()
export class ConsumerService {
  async messageHandler(content: any) {
    return null;
  }
}

Config Example:

//rabbit.config.ts
@Injectable()
export class RabbitOptions implements RabbitOptionsFactory {
  constructor(
    readonly consumerService: ConsumerService ,
  ) {}

createRabbitOptions(): RabbitMQModuleOptions {
  return {
        ...
        consumerchannels: [
          {
            options: {
              queue: "myqueue",
              exchangename: "test.exchange",
              routingkey: "myqueue",
              prefetch: number(process.env.rabbit_prefetch ?? 10),
            },
            messagehandler: this.MyService.messageHandler.bind(this.MyService),
          },
        ];
    }
}

//app.module.ts
@Module({
  imports: [
  ...,
  RabbitMQModule.register({
    useClass: RabbitConfig,
    injects: [ConsumerModule]
  })
  ]
})
export class AppModule {}

Retrying strategy

On each consumer declaration you can pass the optional parameter: retryStrategy with following the contract:

retryStrategy: {
  enabled?: true,
  maxAttempts?: 5,
  delay?: (attempt) => {return attempt * 5000};
}

By default, the retryStrategy is enabled. When consuming a new message and the callback throws an error.

When enabled, the library will create a {options.queue}.dlq queue and will use the delayExchangeName exchange as the retrying orchestrator where the x-delay value is the return of the anonymous callback delay.

When the maximum attempts is reached, the library issues a nack, sending the message to the .dlq queue.

Deadletter strategy

Each consumer can have an optional parameter deadletterStrategy with the following contract:

retryStrategy: {
  suffix: string
  callback?: (content: T): boolean | Promise<boolean>;
}

By default the suffix for all DLQs will be .dlq.

When giving a callback function, the library will execute it after finishing executing all retry attempts. This is useful if, in case of failure, you want to update your database, send an alert or anything else.

The function expects you to return a boolean, where depending on the result it will behave differently, such as:

  • When returning TRUE, the callback will be executed and the message will be forwarded to DLQ
  • When returning FALSE, the message will not be sent to the DLQ, skipping it entirely, allowing you to drop messages if the callback execution is successful

Finally, if the callback throws any errors or is unable to be executed, an error message will be thrown with the reason and the message will be forwarded to the DLQ normally.

Disabling the automatic ack

By default, the consumer will automatically send an ack at the end of the callback execution. If you need to disable this behaviour, you can pass:

consumerChannels: [
  options: {
    ...,
    autoAck: false
  }
]

When disabled, it is necessary to manually acknowledge the message as follows:

async messageHandler(
  content: ChangeEventStatusUseCaseInput,
  params: RabbitConsumerParameters,
): Promise<void> {
 params.channel.ack(params.message);
}

Extra options

Consumer manual loading

This library attaches the consumers during the OnApplicationBootstrap lifecycle of the NestJS Application, meaning that the application will begin to receive messages as soon as the lifecycle is done.

If your application needs some time to initiate the consumers for some reason, (pods with limited resource for example), you can set the flag extraOptions.consumerManualLoad: true on the configuration file and manually call the consumer instantiation.

Example:

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);

  const rabbit: RabbitMQService = app.get(RabbitMQService);
  await rabbit.createConsumers();
}
bootstrap();

Message inspection and logging

You can inspect the consumer/publisher messages by setting the parameter extraOptions.logType or setting the environment variable RABBITMQ_LOG_TYPE to either: all | consumer | publisher | none.

The default value is none

You can also use the extraOptions.loggerInstance to pass your custom Logger as long as it follows the Logger/Console interfaces. The SDK will use the given instance to log any messages

How to build this library locally?

Just pull the project and run:

pnpm install
pnpm build

And should be good to go

Planned features

  • Add tests
  • Improve semantics of the config file
  • Offer a retry mechanism without the x-delay
  • Make the publisher method strongly typed based on the assertExchanges exchangeName and routingKeys configurations

Contribute

TBD

License

MIT License

About

A very opinionated abstraction built on top of amqplib and amqp-connection-manager

Resources

Stars

Watchers

Forks

Packages

No packages published