Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sub Entries Batching Implementation #86

Merged
merged 16 commits into from
Oct 26, 2021
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.16 as builder
FROM golang:1.17 as builder
ENV GOPATH=/go GOOS=linux CGO_ENABLED=0
WORKDIR /go/src/github.com/rabbitmq/rabbitmq-stream-go-client
COPY go.mod go.sum VERSION ./
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ check: $(STATICCHECK)
$(STATICCHECK) ./pkg/stream

test: vet fmt check
go test --tags=debug -v ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
go test --race --tags=debug -v ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v

build-all: vet fmt check build-darwin build-windows build-linux
go test --tags=debug -v -race ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
Expand Down
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
* [Publish Confirmation](#publish-confirmation)
* [Publish Errors](#publish-errors)
* [Deduplication](#deduplication)
* [Sub Entries Batching](#sub-entries-batching)
* [HA producer - Experimental](#ha-producer-experimental)
* [Consume messages](#consume-messages)
* [Track Offset](#track-offset)
Expand All @@ -43,7 +44,7 @@ Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rab
### Installing

```shell
go get -u github.com/rabbitmq/[email protected].0-beta
go get -u github.com/rabbitmq/[email protected].1-beta
```

imports:
Expand Down Expand Up @@ -274,6 +275,22 @@ https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
You can find a "Deduplication" example in the [examples](./examples/) directory. </br>
Run it more than once; the message count will always be 10.

### Sub Entries Batching

The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame,
meaning outbound messages are not only batched in publishing frames, but in sub-entries as well.
Use this feature to increase throughput at the cost of increased latency. </br>
You can find a "Sub Entries Batching" example in the [examples](./examples/) directory. </br>

Default compression is `None`; you can also enable `gzip` compression. </br>
Compression is only valid when `SubEntrySize > 1`

```golang
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
SetSubEntrySize(100).
SetCompression(stream.Compression{}.Gzip()))
```

### Ha Producer Experimental
The ha producer is built on top of the standard producer. </br>
Features:
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0-beta
0.1.1-beta
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ Stream examples
- [Using a load balancer](./proxy/proxy.go). An example how to use the client with a TLS load balancer.<br />
Use the [RabbitMQ TLS cluster](../compose) to run a TLS and no TLS cluster. <br />
For more details: https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/
- [Sub Entries Batching](./sub-entries-batching/sub_entries_batching.go). Sub Entries Batching example
13 changes: 9 additions & 4 deletions examples/haProducer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
for i := 0; i < 500000; i++ {
for i := 0; i < 1200000; i++ {
msg := amqp.NewMessage([]byte("ha"))
err := rProducer.Send(msg)
CheckErr(err)
err = rProducer1.Send(msg)

if atomic.AddInt32(&sent, 2)%20000 == 0 {
time.Sleep(100 * time.Millisecond)
fmt.Printf("Sent..%d messages\n", atomic.LoadInt32(&sent))
Expand Down Expand Up @@ -122,8 +121,14 @@ func main() {
sent, counter, fail, totalHandled)
if sent == totalHandled {
fmt.Printf(" - Messages sent %d match with handled: %d! yea! \n\n", sent, totalHandled)
} else {
fmt.Printf(" - Messages sent %d don't match with handled: %d! that's not good!\n\n", sent, totalHandled)
}

if totalHandled > sent {
fmt.Printf(" - Messages sent %d are lower than handled: %d! some duplication, can happens ! \n\n", sent, totalHandled)
}

if sent > totalHandled {
fmt.Printf(" - Messages handled %d are lower than send: %d! that's not good!\n\n", totalHandled, sent)
}

err = rProducer.Close()
Expand Down
120 changes: 120 additions & 0 deletions examples/sub-entries-batching/sub_entries_batching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"bufio"
"fmt"
"github.com/google/uuid"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"os"
"strconv"
"time"
)

// CheckErr is the example's fail-fast helper: on a non-nil error it prints
// the error and terminates the process with exit code 1; on nil it is a no-op.
func CheckErr(err error) {
	if err == nil {
		return
	}
	fmt.Printf("%s ", err)
	os.Exit(1)
}

// handlePublishConfirm drains the publish-confirmation channel in a
// background goroutine, logging each message as stored or failed.
// The goroutine ends when the broker/client closes the channel.
func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
	go func() {
		for batch := range confirms {
			for _, m := range batch {
				if !m.Confirmed {
					fmt.Printf("message %s failed \n ", m.Message.GetData())
					continue
				}
				fmt.Printf("message %s stored \n ", m.Message.GetData())
			}
		}
	}()
}

// consumerClose blocks until a single close event arrives on channelClose,
// then logs the consumer name, stream, and close reason.
func consumerClose(channelClose stream.ChannelClose) {
	ev := <-channelClose
	name, streamName, reason := ev.Name, ev.StreamName, ev.Reason
	fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", name, streamName, reason)
}

// main demonstrates sub-entry batching: it connects to a local broker,
// publishes 10000 messages through a producer configured with
// SubEntrySize(100) + gzip compression, then consumes the stream back
// from the beginning and cleans up.
func main() {
	reader := bufio.NewReader(os.Stdin)

	fmt.Println("Sub Entry batch example")
	fmt.Println("Connecting to RabbitMQ streaming ...")

	// Connect to the broker ( or brokers )
	// NOTE(review): credentials are the default guest/guest on localhost:5552 —
	// adjust for a non-local broker.
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest"))
	CheckErr(err)
	// Create a stream, you can create streams without any option like:
	// err = env.DeclareStream(streamName, nil)
	// it is a best practice to define a size, 1GB for example:

	// A random stream name keeps repeated runs of the example independent.
	streamName := uuid.New().String()
	err = env.DeclareStream(streamName,
		&stream.StreamOptions{
			MaxLengthBytes: stream.ByteCapacity{}.GB(2),
		},
	)
	CheckErr(err)

	// Get a new producer for a stream
	// define the sub entry batch using producer options
	// ex: stream.NewProducerOptions().SetSubEntrySize(100)
	// set compression: SetCompression(stream.Compression{}.Gzip())
	// or SetCompression(stream.Compression{}.None()) <<-- Default value
	// Compression only applies when SubEntrySize > 1.
	producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
		SetSubEntrySize(100).
		SetCompression(stream.Compression{}.Gzip()))
	CheckErr(err)

	//optional publish confirmation channel
	chPublishConfirm := producer.NotifyPublishConfirmation()
	handlePublishConfirm(chPublishConfirm)

	// the send method automatically aggregates the messages
	// based on batch size
	for i := 0; i < 10000; i++ {
		err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
		CheckErr(err)
	}

	// this sleep is not mandatory, just to show the confirmed messages
	time.Sleep(1 * time.Second)
	err = producer.Close()
	CheckErr(err)

	// Consumer side don't need to specify anything
	// (sub-entry batches are unpacked transparently by the client)
	handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)
	}

	consumer, err := env.NewConsumer(
		streamName,
		handleMessages,
		stream.NewConsumerOptions().
			SetConsumerName("my_consumer").                  // set a consumer name
			SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
	CheckErr(err)
	channelClose := consumer.NotifyClose()
	// channelClose receives all the closing events, here you can handle the
	// client reconnection or just log
	// deferred: blocks at exit until the close event (emitted by consumer.Close
	// below) is received and logged
	defer consumerClose(channelClose)

	fmt.Println("Press any key to stop ")
	_, _ = reader.ReadString('\n')
	err = consumer.Close()
	// brief pause before checking/cleanup; presumably lets in-flight delivery
	// callbacks drain — TODO confirm it is strictly necessary
	time.Sleep(200 * time.Millisecond)
	CheckErr(err)
	err = env.DeleteStream(streamName)
	CheckErr(err)
	err = env.Close()
	CheckErr(err)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/rabbitmq/rabbitmq-stream-go-client

go 1.15
go 1.16

require (
github.com/google/uuid v1.3.0
Expand Down
4 changes: 4 additions & 0 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
variableBody int
fixedBody int
batchSize int
subEntrySize int
compression string
exitOnError bool
debugLogs bool
runDuration int
Expand All @@ -52,6 +54,8 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().StringSliceVarP(&rabbitmqBrokerUrl, "uris", "", []string{stream.LocalhostUriConnection}, "Broker URLs")
baseCmd.PersistentFlags().IntVarP(&publishers, "publishers", "", 1, "Number of Publishers")
baseCmd.PersistentFlags().IntVarP(&batchSize, "batch-size", "", 100, "Batch Size, from 1 to 200")
baseCmd.PersistentFlags().IntVarP(&subEntrySize, "sub-entry-size", "", 1, "SubEntry size, default 1. > 1 Enable the subEntryBatch")
baseCmd.PersistentFlags().StringVarP(&compression, "compression", "", "", "Compression for sub batching")
baseCmd.PersistentFlags().IntVarP(&consumers, "consumers", "", 1, "Number of Consumers")
baseCmd.PersistentFlags().IntVarP(&publishersPerClient, "publishers-per-client", "", 3, "Publishers Per Client")
baseCmd.PersistentFlags().IntVarP(&consumersPerClient, "consumers-per-client", "", 3, "Consumers Per Client")
Expand Down
35 changes: 23 additions & 12 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ func printStats() {
for {
select {
case _ = <-tickerReset.C:
start = time.Now()

atomic.SwapInt32(&publisherMessageCount, 0)
atomic.SwapInt32(&consumerMessageCount, 0)
atomic.SwapInt32(&confirmedMessageCount, 0)
atomic.SwapInt32(&notConfirmedMessageCount, 0)
start = time.Now()
}
}

Expand All @@ -104,13 +104,14 @@ func printStats() {

func decodeBody() string {
if publishers > 0 {

if fixedBody > 0 {
return fmt.Sprintf("Fixed Body: %d", fixedBody)
return fmt.Sprintf("Fixed Body: %d", fixedBody+8)
}
if variableBody > 0 {
return fmt.Sprintf("Variable Body: %d", variableBody)
}
return fmt.Sprintf("Fixed Body: %d", len("simul_message"))
return fmt.Sprintf("Fixed Body: %d", len("simul_message")+8)
} else {
return "ND"
}
Expand Down Expand Up @@ -240,7 +241,21 @@ func handlePublishConfirms(messageConfirm []*stream.UnConfirmedMessage) {

func startPublisher(streamName string) error {

rPublisher, err := ha.NewHAProducer(simulEnvironment, streamName, nil,
producerOptions := stream.NewProducerOptions()

if subEntrySize > 1 {
cp := stream.Compression{}.None()
if compression == "gzip" {
cp = stream.Compression{}.Gzip()
}
producerOptions.SetSubEntrySize(subEntrySize).SetCompression(cp)

logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp)
}

rPublisher, err := ha.NewHAProducer(simulEnvironment,
streamName,
producerOptions,
handlePublishConfirms)
if err != nil {
logError("Error create publisher: %s", err)
Expand All @@ -262,6 +277,7 @@ func startPublisher(streamName string) error {
n := time.Now().UnixNano()
var buff = make([]byte, 8)
binary.BigEndian.PutUint64(buff, uint64(n))
/// added to calculate the latency
msg := amqp.NewMessage(append(buff, body...))
arr = append(arr, msg)
}
Expand All @@ -286,15 +302,10 @@ func startPublisher(streamName string) error {
}
time.Sleep(time.Duration(sleep) * time.Millisecond)
}
atomic.AddInt32(&publisherMessageCount, int32(len(arr)))

for _, streamMessage := range arr {
atomic.AddInt64(&messagesSent, 1)
err = prod.Send(streamMessage)
if err != nil {
logError("Error publishing: %s", err)
}
}
atomic.AddInt64(&messagesSent, int64(len(arr)))
err = prod.BatchSend(arr)
atomic.AddInt32(&publisherMessageCount, int32(len(arr)))
checkErr(err)

}
Expand Down
1 change: 1 addition & 0 deletions pkg/amqp/error_stdlib.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !pkgerrors
// +build !pkgerrors

package amqp
Expand Down
Loading