Skip to content

Commit

Permalink
fix: set TCP parameters only if they are greater than 0 (#375)
Browse files Browse the repository at this point in the history
  • Loading branch information
hiimjako authored Jan 20, 2025
1 parent fb01bca commit 2d3b96e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 23 deletions.
34 changes: 18 additions & 16 deletions best_practices/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
Client best practices
=====================
# Client best practices

The scope of this document is to provide a set of best practices for the client applications that use the Go client library.</br>


#### General recommendations

- Messages are not thread-safe, you should not share the same message between different go-routines or different Send/BatchSend calls.
- Use the producer name only if you need deduplication.
- Avoid to store the consumer offset to the server too often.
Expand All @@ -21,12 +20,15 @@ No particular tuning is required. Just follow the [Getting started](../examples/

Each connection can support multiple producers and consumers, you can reduce the number of connections by using the same connection for multiple producers and consumers.</br>
With:

```golang
SetMaxConsumersPerClient(10).
SetMaxConsumersPerClient(10)
```

The TCP connection will be shared between the producers and consumers.
Note about consumers: One slow consumer can block the others, so it is important:

- To have a good balance between the number of consumers and the speed of the consumers.
- work application side to avoid slow consumers, for example, by using a go-routines/buffers.

Expand All @@ -35,7 +37,7 @@ Note about consumers: One slow consumer can block the others, so it is important
To achieve high throughput, you should use one producer per connection, and one consumer per connection.
This will avoid lock contention between the producers when sending messages and between the consumers when receiving messages.

The method `Send` is usually enough to achieve high throughput.
The method `Send` is usually enough to achieve high throughput.
In some case you can use the `BatchSend` method. See the `Send` vs `BatchSend` documentation for more details.

#### Low latency
Expand All @@ -46,25 +48,23 @@ The method `Send` is the best choice to achieve low latency. Default values are
You can change the `BatchSize` parameter to increase or reduce the max number of messages sent in one batch.
Note: Since the client uses dynamic send, the `BatchSize` parameter is a hint to the client, the client can send less than the `BatchSize`.

#### Store several text based messages
#### Store several text based messages

In case you want to store logs, text-based or big messages, you can use the `Sub Entries Batching` method.
Where it is possible to store multiple messages in one entry and compress the entry with different algorithms.
It is useful to reduce the disk space and the network bandwidth.
It is useful to reduce the disk space and the network bandwidth.
See the `Sub Entries Batching` documentation for more details.</br>

#### Store several small messages

In case you want to store a lot of small messages, you can use the `BatchSend` method.
Where it is possible to store multiple messages in one entry. This will avoid creating small chunks on the server side.</br>


#### Avoid duplications

In case you want to store messages with deduplication, you need to set the producer name and the deduplication id.
See the `Deduplication` documentation for more details.</br>


#### Consumer fail over

In case you want to have a consumer fail over, you can use the `Single Active Consumer` method.
Expand All @@ -75,32 +75,34 @@ Where only one consumer is active at a time, and the other consumers are in stan
The client library provides a reliable producer and consumer, where the producer and consumer can recover from a connection failure.
See the `Reliable` documentation for more details.</br>


#### Scaling the streams

In case you want to scale the streams, you can use the `Super Stream` method.
Where you can have multiple streams and only one stream is active at a time.
See the `Super Stream` documentation for more details.</br>


#### Filtering the data when consuming

In case you want to filter the data when consuming, you can use the `Stream Filtering` method.
Where you can filter the data based on the metadata.
See the `Stream Filtering` documentation for more details.</br>


#### Using a load balancer

In case you want to use a load balancer, you can use the `Using a load balancer` method.
In Kubernetes, you can use the service name as load balancer dns.
See the `Using a load balancer` documentation for more details.</br>

#### Configure TCP parameters

By default, this client uses optimized TCP read and write buffer sizes to achieve the best performance.
In some environments, this optimization may cause latency issues. To restore the default OS parameters, you can call:

```go
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
SetWriteBuffer(-1).
SetReadBuffer(-1)
)
```






See these issues [Issue #293](https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/293) and [PR #374](https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/374), to get more insight.
21 changes: 14 additions & 7 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"math/rand"
"net"
"net/url"
Expand All @@ -15,6 +14,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
)

// SaslConfiguration see
Expand Down Expand Up @@ -161,14 +162,20 @@ func (c *Client) connect() error {
return errorConnection
}

if err = connection.SetWriteBuffer(c.tcpParameters.WriteBuffer); err != nil {
logs.LogError("Failed to SetWriteBuffer to %d due to %v", c.tcpParameters.WriteBuffer, err)
return err
if c.tcpParameters.WriteBuffer > 0 {
if err = connection.SetWriteBuffer(c.tcpParameters.WriteBuffer); err != nil {
logs.LogError("Failed to SetWriteBuffer to %d due to %v", c.tcpParameters.WriteBuffer, err)
return err
}
}
if err = connection.SetReadBuffer(c.tcpParameters.ReadBuffer); err != nil {
logs.LogError("Failed to SetReadBuffer to %d due to %v", c.tcpParameters.ReadBuffer, err)
return err

if c.tcpParameters.ReadBuffer > 0 {
if err = connection.SetReadBuffer(c.tcpParameters.ReadBuffer); err != nil {
logs.LogError("Failed to SetReadBuffer to %d due to %v", c.tcpParameters.ReadBuffer, err)
return err
}
}

if err = connection.SetNoDelay(c.tcpParameters.NoDelay); err != nil {
logs.LogError("Failed to SetNoDelay to %b due to %v", c.tcpParameters.NoDelay, err)
return err
Expand Down

0 comments on commit 2d3b96e

Please sign in to comment.