Dynamic send and batch send splits for milestone 1.5 #367

Merged: 26 commits (merged Jan 8, 2025)

Commits
b8ad4c0
work in progress
Gsantomaggio Nov 29, 2024
f8153b5
work in progress
Gsantomaggio Nov 29, 2024
d744d95
bump to 1.22 golang
Gsantomaggio Dec 1, 2024
88ce242
Merge branch 'main' into dynamic_send
Gsantomaggio Dec 3, 2024
4dc144d
change the batchSend
Gsantomaggio Dec 3, 2024
cee445a
Merge branch 'dynamic_send' of github.com:rabbitmq/rabbitmq-stream-go…
Gsantomaggio Dec 3, 2024
b1001e1
change the batchSend
Gsantomaggio Dec 3, 2024
fb0a6c9
adding tests
Gsantomaggio Dec 3, 2024
3930f60
send
Gsantomaggio Dec 9, 2024
16018dd
code refactor
Gsantomaggio Dec 10, 2024
31f86c5
unconfirmed
Gsantomaggio Dec 10, 2024
1642636
documentation
Gsantomaggio Dec 11, 2024
5aa81ed
Update pkg/stream/producer.go
Gsantomaggio Dec 18, 2024
2163a7f
Update pkg/stream/producer_unconfirmed.go
Gsantomaggio Dec 18, 2024
2d39c3c
Update pkg/stream/producer_unconfirmed.go
Gsantomaggio Dec 18, 2024
967fe44
Update README.md
Gsantomaggio Dec 18, 2024
3de11e0
replace the channel with a blocking queue
Gsantomaggio Dec 20, 2024
fc5fe5e
make the unconfirmed operations atomics
Gsantomaggio Dec 27, 2024
57b8473
rename variable
Gsantomaggio Dec 27, 2024
37ff466
Improve producer edge cases
Gsantomaggio Jan 7, 2025
5218fc3
Improve producer edge cases
Gsantomaggio Jan 7, 2025
978a07e
bump windows version to rabbitmq 4.0.5 and erlang 27.2
Gsantomaggio Jan 7, 2025
4f3075f
update dependencies
Gsantomaggio Jan 7, 2025
5293c48
more logs for windows CI
Gsantomaggio Jan 7, 2025
3bbc832
restore old timeout
Gsantomaggio Jan 8, 2025
d1db816
temporarily remove windows test due to https://github.com/actions/chec…
Gsantomaggio Jan 8, 2025
114 changes: 38 additions & 76 deletions README.md
@@ -16,6 +16,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
- [Run server with Docker](#run-server-with-docker)
- [Getting started for impatient](#getting-started-for-impatient)
- [Examples](#examples)
- [Client best practices](#client-best-practices)
- [Usage](#usage)
* [Connect](#connect)
* [Multi hosts](#multi-hosts)
@@ -67,7 +68,7 @@ You may need a server to test locally. Let's start the broker:
```shell
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
rabbitmq:3-management
rabbitmq:4-management
```
The broker should start in a few seconds. When it’s ready, enable the `stream` plugin and `stream_management`:
```shell
@@ -85,6 +86,11 @@ See [getting started](./examples/getting_started.go) example.

See [examples](./examples/) directory for more use cases.

### Client best practices

This client provides a set of best practices for using it effectively. </br>
See [best practices](./best_practices/README.md) for more details.

# Usage

### Connect
@@ -251,15 +257,8 @@ To publish a message you need a `*stream.Producer` instance:
producer, err := env.NewProducer("my-stream", nil)
```

With `ProducerOptions` it is possible to customize the Producer behaviour:
```golang
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
```
With `ProducerOptions` it is possible to customize the Producer behaviour.
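For example, a minimal sketch of a customized producer, assuming an existing `env`; the setter names shown here (such as `SetBatchSize`) are illustrative, check `ProducerOptions` for the full list:

```golang
// Sketch: customize the producer behaviour.
producer, err := env.NewProducer("my-stream",
	stream.NewProducerOptions().
		SetProducerName("my_producer"). // only needed for deduplication
		SetBatchSize(100))              // batch-size aggregation hint
if err != nil {
	log.Fatal(err)
}
```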


The client provides two interfaces to send messages.
`send`:
@@ -277,30 +276,35 @@ for z := 0; z < 10; z++ {
err = producer.BatchSend(messages)
```

### `Send` vs `BatchSend`

`producer.Send`:
- accepts one message as parameter
- automatically aggregates the messages
- automatically splits the messages in case the size is bigger than `requestedMaxFrameSize`
- automatically splits the messages based on batch-size
- sends the messages in case nothing happens in `producer-send-timeout`
- is asynchronous
`BatchSend` is the primitive to send the messages; it is up to the user to manage the aggregation.
`Send` introduces a smart layer to publish messages and internally uses `BatchSend`.

`producer.BatchSend`:
- accepts an array messages as parameter
- is synchronous
Starting from version 1.5.0, `Send` uses dynamic batching.
The client sends the message buffer without relying on any timeout.</br>

Close the producer:
`producer.Close()` removes the producer from the server. The TCP connection is closed if there aren't </br>
other producers
What should you use? </br>
The `Send` method is the best choice for most cases:</br>
- It is asynchronous
- It automatically aggregates messages into batches, keeping latency low
- It automatically splits messages when the size is bigger than `requestedMaxFrameSize`
- You can tune the `BatchSize` parameter to increase the throughput

### `Send` vs `BatchSend`
`BatchSend` is useful when you need to manage the aggregation yourself. </br>
It gives you more control over the aggregation process (see the sketch below): </br>
- It is synchronous
- It is up to the user to manage the aggregation
- It is up to the user to split the messages when the size is bigger than `requestedMaxFrameSize`
- It can be faster than `Send` when the aggregation is managed by the user.
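As an illustration, a minimal sketch of the two paths, assuming the usual client imports (`amqp`, `message`) and an existing `producer`; the loop sizes are arbitrary:

```golang
// Send: asynchronous, the client aggregates and splits the messages internally.
for i := 0; i < 1000; i++ {
	if err := producer.Send(amqp.NewMessage([]byte("hello"))); err != nil {
		log.Fatal(err)
	}
}

// BatchSend: synchronous, the caller builds and owns the batch.
var batch []message.StreamMessage
for i := 0; i < 100; i++ {
	batch = append(batch, amqp.NewMessage([]byte("hello")))
}
if err := producer.BatchSend(batch); err != nil {
	log.Fatal(err)
}
```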

`BatchSend` is the primitive to send the messages; `Send` introduces a smart layer to publish messages and internally uses `BatchSend`.
#### Throughput vs Latency</br>
With both methods you can achieve low latency and/or high throughput. </br>
`Send` is the best choice for low latency when you don't want to deal with aggregation.
With `BatchSend` you have more control.</br>

The `Send` interface works in most cases; in some conditions it is about 15/20% slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ).

See also "Client performances" example in the [examples](./examples/performances/) directory
The performance test tool can help you compare `Send` and `BatchSend`. </br>
See also the [Performance test tool](#performance-test-tool) section.

### Publish Confirmation

@@ -350,10 +354,13 @@ the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.Get

See also "Getting started" example in the [examples](./examples/) directory



### Deduplication

Deduplication is a feature that avoids the duplication of messages. </br>
It is enabled by setting the producer name in the options: </br>
```golang
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetName("my_producer"))
```
The stream plugin can handle deduplication data; see this blog post for more details:
https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/ </br>
You can find a "Deduplication" example in the [examples](./examples/) directory. </br>
@@ -596,55 +603,10 @@ Like the standard stream, you should avoid to store the offset for each single m
### Performance test tool

The performance test tool is useful to execute tests.
The performance test tool is in the [perfTest](./perfTest) directory. </br>
See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool


To install you can download the version from github:

Mac:
```
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
```

Linux:
```
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
```

Windows
```
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
```

execute `stream-perf-test --help` to see the parameters. By default it executes a test with one producer, one consumer.

Here is an example:
```shell
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
```

### Performance test tool Docker
A docker image is available: `pivotalrabbitmq/go-stream-perf-test`, to test it:

Run the server in host mode:
```shell
docker run -it --rm --name rabbitmq --network host \
rabbitmq:3-management
```
enable the plugin:
```
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
```
then run the docker image:
```shell
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
```

To see all the parameters:
```shell
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
```

### Build from source

```shell
106 changes: 106 additions & 0 deletions best_practices/README.md
@@ -0,0 +1,106 @@
Client best practices
=====================

The scope of this document is to provide a set of best practices for client applications that use the Go client library.</br>


#### General recommendations
- Messages are not thread-safe: do not share the same message between different go-routines or between different `Send`/`BatchSend` calls.
- Use the producer name only if you need deduplication.
- Avoid storing the consumer offset to the server too often.
- `Send` works well in most cases; use `BatchSend` when you need more control.
- Connections, producers, and consumers are designed to be long-lived. Avoid creating and closing them too often.
- The library is generally thread-safe; even so, it is better to use one producer/consumer per go-routine.

#### Default configuration

The default configuration of the client library is designed to work in most cases.
No particular tuning is required. Just follow the [Getting started](../examples/getting_started.go) example.

#### Multiple producers and consumers

Each connection can support multiple producers and consumers, so you can reduce the number of connections by sharing one connection across several producers and consumers.</br>
With:
```golang
SetMaxProducersPerClient(10).
SetMaxConsumersPerClient(10)
```
The TCP connection will be shared between the producers and consumers.
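A minimal sketch of the idea, assuming a broker on localhost and the usual `stream` import:

```golang
// One environment: producers and consumers created from it share TCP connections,
// up to the configured per-client limits.
env, err := stream.NewEnvironment(
	stream.NewEnvironmentOptions().
		SetHost("localhost").
		SetPort(5552).
		SetMaxProducersPerClient(10).
		SetMaxConsumersPerClient(10))
if err != nil {
	log.Fatal(err)
}
```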
Note about consumers: one slow consumer can block the others, so it is important:
- To have a good balance between the number of consumers and the speed of the consumers.
- To work on the application side to avoid slow consumers, for example by using go-routines/buffers.

#### High throughput

To achieve high throughput, you should use one producer per connection and one consumer per connection.
This avoids lock contention between producers when sending messages and between consumers when receiving messages.

The `Send` method is usually enough to achieve high throughput.
In some cases you can use the `BatchSend` method. See the `Send` vs `BatchSend` documentation for more details.

#### Low latency

To achieve low latency, you should use one producer per connection and one consumer per connection.

The `Send` method is the best choice to achieve low latency. Default values are tuned for low latency.
You can change the `BatchSize` parameter to increase or reduce the maximum number of messages sent in one batch, as in the sketch below.
Note: since the client uses dynamic send, the `BatchSize` parameter is a hint to the client; the client can send fewer messages than `BatchSize`.
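A minimal sketch, assuming an existing `env`; the value 100 is only an example:

```golang
// Sketch: BatchSize is a hint; with dynamic send the client may send smaller batches.
producer, err := env.NewProducer("my-stream",
	stream.NewProducerOptions().SetBatchSize(100))
if err != nil {
	log.Fatal(err)
}
```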

#### Store several text-based messages

In case you want to store logs, text-based, or big messages, you can use `Sub Entries Batching`,
where multiple messages are stored in one entry and the entry can be compressed with different algorithms.
It is useful to reduce disk space and network bandwidth.
See the `Sub Entries Batching` documentation for more details.</br>
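A minimal sketch, assuming an existing `env`; the sub-entry size and the Gzip compression are illustrative choices:

```golang
// Sketch: up to 100 messages are packed into one sub-entry and compressed with Gzip.
producer, err := env.NewProducer("my-stream",
	stream.NewProducerOptions().
		SetSubEntrySize(100).
		SetCompression(stream.Compression{}.Gzip()))
if err != nil {
	log.Fatal(err)
}
```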

#### Store several small messages

In case you want to store a lot of small messages, you can use the `BatchSend` method,
where multiple messages are stored in one entry. This avoids creating small chunks on the server side.</br>


#### Avoid duplications

In case you want to store messages with deduplication, you need to set the producer name and the deduplication id.
See the `Deduplication` documentation for more details.</br>


#### Consumer fail over

In case you want consumer fail-over, you can use the `Single Active Consumer` feature,
where only one consumer is active at a time and the other consumers are in standby mode.

#### Reliable producer and consumer

The client library provides a reliable producer and consumer, where the producer and consumer can recover from a connection failure.
See the `Reliable` documentation for more details.</br>


#### Scaling the streams

In case you want to scale the streams, you can use the `Super Stream` feature,
where a stream is partitioned into multiple partition streams.
See the `Super Stream` documentation for more details.</br>


#### Filtering the data when consuming

In case you want to filter the data when consuming, you can use the `Stream Filtering` feature,
where you can filter the data based on message metadata.
See the `Stream Filtering` documentation for more details.</br>


#### Using a load balancer

In case you want to connect through a load balancer, the client supports it via an address resolver.
In Kubernetes, you can use the service name as the load-balancer DNS.
See the `Using a load balancer` documentation for more details.</br>
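A minimal sketch, assuming the load balancer is reachable at `my-load-balancer:5552` (hypothetical address); the address resolver makes the client always dial the balancer instead of the broker-advertised addresses:

```golang
// Sketch: route all connections through the load balancer.
addressResolver := stream.AddressResolver{
	Host: "my-load-balancer",
	Port: 5552,
}
env, err := stream.NewEnvironment(
	stream.NewEnvironmentOptions().
		SetHost(addressResolver.Host).
		SetPort(addressResolver.Port).
		SetAddressResolver(addressResolver))
if err != nil {
	log.Fatal(err)
}
```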









2 changes: 0 additions & 2 deletions examples/README.md
@@ -16,5 +16,3 @@ Stream examples
- [Single Active Consumer](./single_active_consumer) - Single Active Consumer example
- [Reliable](./reliable) - Reliable Producer and Reliable Consumer example
- [Super Stream](./super_stream) - Super Stream example with Single Active Consumer
- [Client performances](./performances) - Client performances example

5 changes: 4 additions & 1 deletion examples/deduplication/deduplication.go
@@ -35,7 +35,10 @@ func main() {
}

producer, err := env.NewProducer(streamName,
stream.NewProducerOptions().SetProducerName("myProducer")) // producer name is mandatory to handle the deduplication
stream.NewProducerOptions().
// producer name is mandatory to handle the deduplication
// don't use the producer name if you don't need the deduplication
SetProducerName("myProducer"))

CheckErr(err)

38 changes: 0 additions & 38 deletions examples/performances/README.md

This file was deleted.
