RabbitMQ provides the semantics needed to horizontally scale consumption from a Stream. They are built on top of two main concepts:
- Single-Active-Consumer: A way to inform the server that, for a specified stream, there can only be one active consumer with a specific name at a time. See Upgrade and Downgrade.
- Stream Partitions: A logical grouping of Streams.
As of RabbitMQ 3.13.x, a SuperStream can be declared by using the RabbitMQStream.Connection.create_super_stream/3 callback, but RabbitMQ Server 3.12.x and earlier require you to declare it manually using the CLI. You can find more information in the Stream Partitions section below.
You can declare a consumer as part of a Single Active Consumer group by passing the :single_active_consumer property, with the group's name, as follows:
defmodule Subs1 do
  use RabbitMQStream.Consumer,
    stream_name: "super-stream-01",
    connection: MyApp.MyConnection,
    properties: [single_active_consumer: "group-1"]

  @impl true
  def handle_update(_, :upgrade) do
    {:ok, :last}
  end

  @impl true
  def handle_message(_message) do
    # ...
    :ok
  end
end
The server will then start sending chunks to the consumer, unless it decides that this consumer should not be the active one. If the server later decides to promote a consumer to be the active one, that consumer goes through an Upgrade process.
Based on the semantics of the single_active_consumer property, this library implements RabbitMQStream.SuperProducer and RabbitMQStream.SuperConsumer, which manage a connection to a SuperStream. Each one spawns a process for every partition of the SuperStream and provides an API for consuming or producing messages.
You can declare SuperStreams with:
:ok = MyApp.MyConnection.create_super_stream("my_super_stream", "route-A": ["stream-01", "stream-02"], "route-B": ["stream-03"])
And you can consume from it with:
defmodule MyApp.MySuperConsumer do
  use RabbitMQStream.SuperConsumer,
    initial_offset: :next,
    connection: MyApp.MyConnection,
    super_stream: "my_super_stream"

  @impl true
  def handle_message(_message) do
    # ...
    :ok
  end
end
When using the single-active-consumer property, each consumer might go through an :upgrade or :downgrade cycle. This happens when a new consumer is chosen as the active one for a specific stream.
To handle upgrade and downgrade requests, a consumer must implement the RabbitMQStream.Consumer.handle_update/2 callback. It receives the current state of the consumer and a flag that is either :upgrade or :downgrade.
When a new consumer is chosen, the server first sends a :consumer_update request to the currently active consumer with the flag set to :downgrade. You could take this moment to persist the consumer's current offset, either in the stream itself by calling RabbitMQStream.Connection.store_offset/3, or in an external store. The server waits for a response with the current consumer's offset before going forward with the upgrade, even though, as of RabbitMQ 3.13, the server does not use this information.
After the server receives the response from the current consumer, it sends a :consumer_update request with the :upgrade flag to the newly selected consumer. The now active consumer must return the offset from which it wants to start consuming messages.
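The downgrade/upgrade handshake described above can be sketched without the library, as plain functions that mirror the shape of handle_update/2. This is only an illustration under stated assumptions: OffsetStore and UpdateSketch are hypothetical modules, the consumer state is modeled as a plain map rather than the library's own struct, and the {:offset, n} return form is assumed to be an accepted offset value.

```elixir
defmodule OffsetStore do
  # Hypothetical in-memory offset store (not part of the library).
  # A real system might use a database or the stream's own offset tracking.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Remember the last processed offset for a consumer group.
  def put(group, offset), do: Agent.update(__MODULE__, &Map.put(&1, group, offset))

  # Returns the stored offset, or nil when nothing was stored yet.
  def get(group), do: Agent.get(__MODULE__, &Map.get(&1, group))
end

defmodule UpdateSketch do
  # Assumption: the state is a plain map with :group and :last_offset keys.

  # On :downgrade, persist the last processed offset and report it back.
  def handle_update(%{group: group, last_offset: offset}, :downgrade) do
    OffsetStore.put(group, offset)
    {:ok, {:offset, offset}}
  end

  # On :upgrade, resume right after the stored offset, or fall back to
  # :last when no offset has been stored for the group.
  def handle_update(%{group: group}, :upgrade) do
    case OffsetStore.get(group) do
      nil -> {:ok, :last}
      offset -> {:ok, {:offset, offset + 1}}
    end
  end
end
```

Resuming at the stored offset plus one is a design choice of this sketch: it treats the stored value as the last message already processed, so the newly active consumer does not handle it again.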
A SuperStream partition is a Stream that is part of a SuperStream and has an associated routing key. When producing messages using RabbitMQStream.SuperProducer, you must define the RabbitMQStream.SuperProducer.partition/2 callback, which maps each message to its partition. The manager uses the result to forward the message to the correct partition.
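One common way to map a message to a partition is to hash it into a fixed range. The sketch below is a standalone illustration, not the library's implementation: PartitionSketch is a hypothetical module, and the second argument is assumed to be the number of partitions, which may differ from what the real partition/2 callback receives.

```elixir
defmodule PartitionSketch do
  # Hypothetical stand-in for a partition/2 implementation.
  # Assumption: `partition_count` is the number of partitions.
  def partition(message, partition_count)
      when is_integer(partition_count) and partition_count > 0 do
    # :erlang.phash2/2 returns an integer in 0..(partition_count - 1),
    # and the same term always hashes to the same value.
    :erlang.phash2(message, partition_count)
  end
end
```

Because the hash is deterministic, identical messages always land in the same partition. Hashing only a stable key taken from the message (an invoice id, for example) would keep all messages for that key in order within one partition.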
In RabbitMQ 3.12 and earlier, all partitions are generated automatically by the CLI, using the following command:
rabbitmq-streams add_super_stream invoices --partitions 3
The server then creates a stream for each partition, named following the format <super-stream-name>-<N>, where N is also the routing key used for messages sent to that partition.
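To make the naming format concrete, the following sketch lists the stream names that would result from it. PartitionNames is a hypothetical helper written for this illustration, and it assumes that N counts up from 0.

```elixir
defmodule PartitionNames do
  # Hypothetical helper: builds names in the <super-stream-name>-<N> format.
  # Assumption: N starts at 0 and ends at partitions - 1.
  def for_super_stream(super_stream, partitions)
      when is_binary(super_stream) and is_integer(partitions) and partitions > 0 do
    for n <- 0..(partitions - 1), do: "#{super_stream}-#{n}"
  end
end
```

Under that assumption, PartitionNames.for_super_stream("invoices", 3) returns ["invoices-0", "invoices-1", "invoices-2"], matching the three partitions requested in the CLI command above.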
As of RabbitMQ 3.13.x, you can declare a SuperStream, along with all of its partitions, using RabbitMQStream.Connection.create_super_stream/3, passing a custom name and routing key for each partition.