Skip to content

Commit

Permalink
Implement the reliable consumer (#271)
Browse files Browse the repository at this point in the history
* Implement the reliable consumer
* Add tests for the reliable consumer
* Add the documentation for the `ha` package
---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Mar 1, 2024
1 parent 1b75cdb commit 80cdcd5
Show file tree
Hide file tree
Showing 14 changed files with 664 additions and 308 deletions.
26 changes: 12 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
* [Publish Confirmation](#publish-confirmation)
* [Deduplication](#deduplication)
* [Sub Entries Batching](#sub-entries-batching)
* [HA producer - Experimental](#ha-producer-experimental)
* [Consume messages](#consume-messages)
* [Manual Track Offset](#manual-track-offset)
* [Automatic Track Offset](#automatic-track-offset)
* [Get consumer Offset](#get-consumer-offset)
* [Handle Close](#handle-close)
* [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer)
- [Performance test tool](#performance-test-tool)
* [Performance test tool Docker](#performance-test-tool-docker)
- [Build form source](#build-form-source)
Expand Down Expand Up @@ -379,22 +379,20 @@ producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
SetCompression(stream.Compression{}.Gzip()))
```

### Ha Producer Experimental
The ha producer is built up the standard producer. </br>
### Reliable Producer and Reliable Consumer

The `ReliableProducer` and `ReliableConsumer` are built up the standard producer/consumer. </br>
Both use the standard events to handle the close. So you can write your own code to handle the fail-over. </br>

Features:
- auto-reconnect in case of disconnection
- handle the unconfirmed messages automatically in case of fail.
- [`Both`] auto-reconnect in case of disconnection.
- [`Both`] check if stream exists, if not they close the `ReliableProducer` and `ReliableConsumer`.
- [`Both`] check if the stream has a valid leader and replicas, if not they retry until the stream is ready.
- [`ReliableProducer`] handle the unconfirmed messages automatically in case of fail.
- [`ReliableConsumer`] restart from the last offset in case of restart.

You can find a "HA producer" example in the [examples](./examples/) directory. </br>
You can find a "Reliable" example in the [examples](./examples/) directory. </br>

```golang
haproducer := NewHAProducer(
env *stream.Environment, // mandatory
streamName string, // mandatory
producerOptions *stream.ProducerOptions, //optional
confirmMessageHandler ConfirmMessageHandler // mandatory
)
```

### Consume messages

Expand Down
20 changes: 11 additions & 9 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
Stream examples
===

- [Getting started](./getting_started.go). A good point to start.
- [Offset Start](./offsetStart/offset.go). How to set different points to start consuming
- [Offset Tracking](./offsetTracking/offsetTracking.go). Manually store the consumer offset
- [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go). Automatic store the consumer offset
- [Getting started TLS](./tls/getting_started_tls.go). A TLS example. ( you can run `make rabbitmq-server-tls` to create a tls single rabbitmq node )
- [HA Producer](./haProducer/producer.go). HA producer example (Still experimental)
- [Deduplication](./deduplication/deduplication.go). deduplication example, run it more than one time, and the records <br />
- [Getting started](./getting_started.go) - A good point to start.
- [Offset Start](./offsetStart/offset.go) - How to set different points to start consuming
- [Offset Tracking](./offsetTracking/offsetTracking.go) - Manually store the consumer offset
- [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go) - Automatic store the consumer offset
- [Getting started TLS](./tls/getting_started_tls.go) - A TLS example. ( you can run `make rabbitmq-server-tls` to create a tls single rabbitmq node )
- [Deduplication](./deduplication/deduplication.go) - Deduplication example, run it more than one time, and the records <br />
won't change, since the server will handle the deduplication.
- [Using a load balancer](./proxy/proxy.go). An example how to use the client with a TLS load balancer.<br />
- [Using a load balancer](./proxy/proxy.go) - An example how to use the client with a TLS load balancer.<br />
Use the [RabbitMQ TLS cluster](../compose) to run a TLS and no TLS cluster. <br />
For more details: https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/
- [Sub Entries Batching](./sub-entries-batching/sub_entries_batching.go). Sub Entries Batching example
- [Sub Entries Batching](./sub-entries-batching/sub_entries_batching.go) - Sub Entries Batching example

- [Reliable](./reliable) - Reliable Producer and Reliable Consumer example

89 changes: 0 additions & 89 deletions examples/haProducer/http/http.go

This file was deleted.

125 changes: 0 additions & 125 deletions examples/haProducer/producer.go

This file was deleted.

20 changes: 20 additions & 0 deletions examples/reliable/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
### Reliable Producer/Consumer example

This example demonstrates how to use reliable producers and consumers to send and receive messages.
The `ReliableProducer` and `ReliableConsumer` are in the `ha` package and use the disconnection event to reconnect to the broker.
You can write your own `ReliableProducer` and `ReliableConsumer` by using the `Close` channel.

The `ReliableProducer` blocks the sending of messages when the broker is disconnected and resumes sending when the broker is reconnected.

In this example, we use `unConfirmedMessages` to re-send the messages that were not confirmed by the broker, for instance, in case of a disconnection.
Then, the `unConfirmedMessages` are sent to the broker again.
Note:
- The `unConfirmedMessages` are not persisted, so if the application is restarted, the `unConfirmedMessages` will be lost.
- The `unConfirmedMessages` order is not guaranteed
- The `unConfirmedMessages` can grow indefinitely if the broker is unavailable for a long time.


The `reliable_common.go/retry` function does different checks because during the restart broker can happen different events, please check:
- [this presentation](https://docs.google.com/presentation/d/111PccBLRGb-RNpYEKeIm2MQvdky-fXrQ/edit?usp=sharing&ouid=106772747306273309885&rtpof=true&sd=true) for more details.
- [the code](../../pkg/ha/reliable_common.go) for the implementation details.

Loading

0 comments on commit 80cdcd5

Please sign in to comment.