From 80cdcd5feb7ca9f7268c722009bcd53d2900bf81 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 1 Mar 2024 16:37:15 +0100 Subject: [PATCH] Implement the reliable consumer (#271) * Implement the reliable consumer * Add tests for the reliable consumer * Add the documentation for the `ha` package --------- Signed-off-by: Gabriele Santomaggio --- README.md | 26 ++-- examples/README.md | 20 +-- examples/haProducer/http/http.go | 89 -------------- examples/haProducer/producer.go | 125 ------------------- examples/reliable/README.md | 20 +++ examples/reliable/reliable_client.go | 177 +++++++++++++++++++++++++++ pkg/ha/ha_consumer.go | 147 ++++++++++++++++++++++ pkg/ha/ha_consumer_test.go | 131 ++++++++++++++++++++ pkg/ha/ha_publisher.go | 116 +++++++++--------- pkg/ha/ha_publisher_test.go | 29 ++++- pkg/ha/reliable_common.go | 78 ++++++++++++ pkg/stream/client.go | 4 +- pkg/stream/constants.go | 3 + pkg/stream/environment.go | 7 +- 14 files changed, 664 insertions(+), 308 deletions(-) delete mode 100644 examples/haProducer/http/http.go delete mode 100644 examples/haProducer/producer.go create mode 100644 examples/reliable/README.md create mode 100644 examples/reliable/reliable_client.go create mode 100644 pkg/ha/ha_consumer.go create mode 100644 pkg/ha/ha_consumer_test.go create mode 100644 pkg/ha/reliable_common.go diff --git a/README.md b/README.md index e8c96c03..0f0ea0c0 100644 --- a/README.md +++ b/README.md @@ -29,12 +29,12 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv * [Publish Confirmation](#publish-confirmation) * [Deduplication](#deduplication) * [Sub Entries Batching](#sub-entries-batching) - * [HA producer - Experimental](#ha-producer-experimental) * [Consume messages](#consume-messages) * [Manual Track Offset](#manual-track-offset) * [Automatic Track Offset](#automatic-track-offset) * [Get consumer Offset](#get-consumer-offset) * [Handle Close](#handle-close) + * [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) - [Performance test tool](#performance-test-tool) * [Performance test tool Docker](#performance-test-tool-docker) - [Build form source](#build-form-source) @@ -379,22 +379,20 @@ producer, err := env.NewProducer(streamName, stream.NewProducerOptions(). SetCompression(stream.Compression{}.Gzip())) ``` -### Ha Producer Experimental -The ha producer is built up the standard producer.
+### Reliable Producer and Reliable Consumer + +The `ReliableProducer` and `ReliableConsumer` are built up the standard producer/consumer.
+Both use the standard events to handle the close. So you can write your own code to handle the fail-over.
+ Features: - - auto-reconnect in case of disconnection - - handle the unconfirmed messages automatically in case of fail. + - [`Both`] auto-reconnect in case of disconnection. + - [`Both`] check if stream exists, if not they close the `ReliableProducer` and `ReliableConsumer`. + - [`Both`] check if the stream has a valid leader and replicas, if not they retry until the stream is ready. + - [`ReliableProducer`] handle the unconfirmed messages automatically in case of fail. + - [`ReliableConsumer`] restart from the last offset in case of restart. -You can find a "HA producer" example in the [examples](./examples/) directory.
+You can find a "Reliable" example in the [examples](./examples/) directory.
-```golang -haproducer := NewHAProducer( - env *stream.Environment, // mandatory - streamName string, // mandatory - producerOptions *stream.ProducerOptions, //optional - confirmMessageHandler ConfirmMessageHandler // mandatory - ) -``` ### Consume messages diff --git a/examples/README.md b/examples/README.md index d0259492..6d482849 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,15 +1,17 @@ Stream examples === - - [Getting started](./getting_started.go). A good point to start. - - [Offset Start](./offsetStart/offset.go). How to set different points to start consuming - - [Offset Tracking](./offsetTracking/offsetTracking.go). Manually store the consumer offset - - [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go). Automatic store the consumer offset - - [Getting started TLS](./tls/getting_started_tls.go). A TLS example. ( you can run `make rabbitmq-server-tls` to create a tls single rabbitmq node ) - - [HA Producer](./haProducer/producer.go). HA producer example (Still experimental) - - [Deduplication](./deduplication/deduplication.go). deduplication example, run it more than one time, and the records
+ - [Getting started](./getting_started.go) - A good point to start. + - [Offset Start](./offsetStart/offset.go) - How to set different points to start consuming + - [Offset Tracking](./offsetTracking/offsetTracking.go) - Manually store the consumer offset + - [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go) - Automatic store the consumer offset + - [Getting started TLS](./tls/getting_started_tls.go) - A TLS example. ( you can run `make rabbitmq-server-tls` to create a tls single rabbitmq node ) + - [Deduplication](./deduplication/deduplication.go) - Deduplication example, run it more than one time, and the records
won't change, since the server will handle the deduplication. - - [Using a load balancer](./proxy/proxy.go). An example how to use the client with a TLS load balancer.
+ - [Using a load balancer](./proxy/proxy.go) - An example how to use the client with a TLS load balancer.
Use the [RabbitMQ TLS cluster](../compose) to run a TLS and no TLS cluster.
For more details: https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/ - - [Sub Entries Batching](./sub-entries-batching/sub_entries_batching.go). Sub Entries Batching example + - [Sub Entries Batching](./sub-entries-batching/sub_entries_batching.go) - Sub Entries Batching example + + - [Reliable](./reliable) - Reliable Producer and Reliable Consumer example + diff --git a/examples/haProducer/http/http.go b/examples/haProducer/http/http.go deleted file mode 100644 index 6ce73556..00000000 --- a/examples/haProducer/http/http.go +++ /dev/null @@ -1,89 +0,0 @@ -package http - -import ( - "encoding/json" - "github.com/pkg/errors" - "io/ioutil" - "net/http" - "strconv" -) - -type queue struct { - Messages int `json:"messages"` -} - -type connection struct { - Name string `json:"name"` -} - -func messagesReady(queueName string, port string) (int, error) { - bodyString, err := httpGet("http://localhost:"+port+"/api/queues/%2F/"+queueName, "guest", "guest") - if err != nil { - return 0, err - } - - var data queue - err = json.Unmarshal([]byte(bodyString), &data) - if err != nil { - return 0, err - } - return data.Messages, nil -} - -func Connections(port string) ([]connection, error) { - bodyString, err := httpGet("http://localhost:"+port+"/api/connections/", "guest", "guest") - if err != nil { - return nil, err - } - - var data []connection - err = json.Unmarshal([]byte(bodyString), &data) - if err != nil { - return nil, err - } - return data, nil -} - -func DropConnection(name string, port string) error { - _, err := httpDelete("http://localhost:"+port+"/api/connections/"+name, "guest", "guest") - if err != nil { - return err - } - - return nil -} -func httpGet(url, username, password string) (string, error) { - return baseCall(url, username, password, "GET") -} - -func httpDelete(url, username, password string) (string, error) { - return baseCall(url, username, password, "DELETE") -} - -func baseCall(url, username, password string, method string) (string, error) { - var client http.Client - req, err := http.NewRequest(method, url, nil) - if err != nil { - return "", err - } - req.SetBasicAuth(username, password) - - resp, err3 := client.Do(req) - - if err3 != nil { - return "", err3 - } - - defer resp.Body.Close() - - if resp.StatusCode == 200 { // OK - bodyBytes, err2 := ioutil.ReadAll(resp.Body) - if err2 != nil { - return "", err2 - } - return string(bodyBytes), nil - - } - return "", errors.New(strconv.Itoa(resp.StatusCode)) - -} diff --git a/examples/haProducer/producer.go b/examples/haProducer/producer.go deleted file mode 100644 index e7fa646e..00000000 --- a/examples/haProducer/producer.go +++ /dev/null @@ -1,125 +0,0 @@ -package main - -// The ha producer provides a way to auto-reconnect in case of connection problems -// the function handlePublishConfirm is mandatory -// in case of problems the messages have the message.Confirmed == false - -import ( - "bufio" - "fmt" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" - "os" - "sync" - "sync/atomic" - "time" -) - -func CheckErr(err error) { - if err != nil { - fmt.Printf("%s ", err) - os.Exit(1) - } -} - -var confirmed int32 = 0 -var fail int32 = 0 - -func main() { - reader := bufio.NewReader(os.Stdin) - - fmt.Println("HA producer example") - fmt.Println("Connecting to RabbitMQ streaming ...") - const messagesToSend = 500_000 - const numberOfProducers = 7 - - addresses := []string{ - //"rabbitmq-stream://guest:guest@node1:5572/%2f", - //"rabbitmq-stream://guest:guest@node1:5572/%2f", - "rabbitmq-stream://guest:guest@localhost:5552/%2f"} - - env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions(). - SetMaxProducersPerClient(4). - SetUris(addresses)) - CheckErr(err) - - streamName := "golang-reliable-producer-Test" - env.DeleteStream(streamName) - - err = env.DeclareStream(streamName, - &stream.StreamOptions{ - MaxLengthBytes: stream.ByteCapacity{}.GB(2), - }, - ) - var sent int32 - isRunning := true - go func() { - for isRunning { - totalHandled := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail) - fmt.Printf("%s - ToSend: %d - Sent:%d - Confirmed:%d - Not confirmed:%d - Total :%d \n", - time.Now().Format(time.RFC822), messagesToSend*numberOfProducers, sent, confirmed, fail, totalHandled) - time.Sleep(5 * time.Second) - } - }() - var producers []*ha.ReliableProducer - for i := 0; i < numberOfProducers; i++ { - var mutex = sync.Mutex{} - var unConfirmedMessages []message.StreamMessage - rProducer, err := ha.NewReliableProducer(env, - streamName, - stream.NewProducerOptions(). - SetConfirmationTimeOut(5*time.Second). - SetClientProvidedName(fmt.Sprintf("producer-%d", i)), - func(messageStatus []*stream.ConfirmationStatus) { - go func() { - for _, msgStatus := range messageStatus { - if msgStatus.IsConfirmed() { - atomic.AddInt32(&confirmed, 1) - } else { - atomic.AddInt32(&fail, 1) - mutex.Lock() - unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage()) - mutex.Unlock() - } - - } - }() - }) - CheckErr(err) - producers = append(producers, rProducer) - - go func() { - for i := 0; i < messagesToSend; i++ { - msg := amqp.NewMessage([]byte("ha")) - mutex.Lock() - for _, confirmedMessage := range unConfirmedMessages { - err := rProducer.Send(confirmedMessage) - atomic.AddInt32(&sent, 1) - CheckErr(err) - } - unConfirmedMessages = []message.StreamMessage{} - mutex.Unlock() - err := rProducer.Send(msg) - time.Sleep(1 * time.Millisecond) - atomic.AddInt32(&sent, 1) - CheckErr(err) - } - }() - - } - - fmt.Println("Terminated. Press enter to close the connections.") - _, _ = reader.ReadString('\n') - for _, producer := range producers { - err := producer.Close() - if err != nil { - CheckErr(err) - } - } - isRunning = false - err = env.Close() - CheckErr(err) -} diff --git a/examples/reliable/README.md b/examples/reliable/README.md new file mode 100644 index 00000000..4f70387b --- /dev/null +++ b/examples/reliable/README.md @@ -0,0 +1,20 @@ +### Reliable Producer/Consumer example + +This example demonstrates how to use reliable producers and consumers to send and receive messages. +The `ReliableProducer` and `ReliableConsumer` are in the `ha` package and use the disconnection event to reconnect to the broker. +You can write your own `ReliableProducer` and `ReliableConsumer` by using the `Close` channel. + +The `ReliableProducer` blocks the sending of messages when the broker is disconnected and resumes sending when the broker is reconnected. + +In this example, we use `unConfirmedMessages` to re-send the messages that were not confirmed by the broker, for instance, in case of a disconnection. +Then, the `unConfirmedMessages` are sent to the broker again. +Note: +- The `unConfirmedMessages` are not persisted, so if the application is restarted, the `unConfirmedMessages` will be lost. +- The `unConfirmedMessages` order is not guaranteed +- The `unConfirmedMessages` can grow indefinitely if the broker is unavailable for a long time. + + +The `reliable_common.go/retry` function does different checks because during the restart broker can happen different events, please check: +- [this presentation](https://docs.google.com/presentation/d/111PccBLRGb-RNpYEKeIm2MQvdky-fXrQ/edit?usp=sharing&ouid=106772747306273309885&rtpof=true&sd=true) for more details. +- [the code](../../pkg/ha/reliable_common.go) for the implementation details. + diff --git a/examples/reliable/reliable_client.go b/examples/reliable/reliable_client.go new file mode 100644 index 00000000..d4a8db94 --- /dev/null +++ b/examples/reliable/reliable_client.go @@ -0,0 +1,177 @@ +package main + +import ( + "bufio" + "errors" + "fmt" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "os" + "sync" + "sync/atomic" + "time" +) + +// The ha producer and consumer provide a way to auto-reconnect in case of connection problems + +import ( + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" +) + +func CheckErr(err error) { + if err != nil { + fmt.Printf("%s ", err) + os.Exit(1) + } +} + +var confirmed int32 = 0 +var fail int32 = 0 +var consumed int32 = 0 +var sent int32 +var reSent int32 + +func main() { + // Tune the parameters to test the reliability + const messagesToSend = 5_000_000 + const numberOfProducers = 2 + const numberOfConsumers = 2 + const sendDelay = 1 * time.Millisecond + const delayEachMessages = 200 + const maxProducersPerClient = 4 + const maxConsumersPerClient = 2 + // + + reader := bufio.NewReader(os.Stdin) + stream.SetLevelInfo(logs.DEBUG) + fmt.Println("Reliable Producer/Consumer example") + fmt.Println("Connecting to RabbitMQ streaming ...") + + addresses := []string{ + //"rabbitmq-stream://guest:guest@node1:5572/%2f", + //"rabbitmq-stream://guest:guest@node1:5572/%2f", + "rabbitmq-stream://guest:guest@localhost:5552/%2f"} + + env, err := stream.NewEnvironment( + stream.NewEnvironmentOptions(). + SetMaxProducersPerClient(maxProducersPerClient). + SetMaxConsumersPerClient(maxConsumersPerClient). + SetUris(addresses)) + CheckErr(err) + fmt.Printf("Environment created with %d producers and %d consumers\n\n", maxProducersPerClient, maxConsumersPerClient) + + streamName := "golang-reliable-Test" + + err = env.DeleteStream(streamName) + // If the stream does not exist, + // we don't care here as we are going to create it anyway + if !errors.Is(err, stream.StreamDoesNotExist) { + CheckErr(err) + } + err = env.DeclareStream(streamName, + &stream.StreamOptions{ + MaxLengthBytes: stream.ByteCapacity{}.GB(2), + }, + ) + CheckErr(err) + + isRunning := true + go func() { + for isRunning { + totalConfirmed := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail) + fmt.Printf("%s - ToSend: %d - nProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC822), + messagesToSend*numberOfProducers, numberOfProducers, numberOfConsumers) + fmt.Printf("Sent:%d - ReSent %d - Confirmed:%d - Not confirmed:%d - Fail+Confirmed :%d \n", + sent, atomic.LoadInt32(&reSent), atomic.LoadInt32(&confirmed), atomic.LoadInt32(&fail), totalConfirmed) + fmt.Printf("Total Consumed: %d - Per consumer: %d \n", atomic.LoadInt32(&consumed), + atomic.LoadInt32(&consumed)/numberOfConsumers) + fmt.Printf("********************************************\n") + time.Sleep(5 * time.Second) + } + }() + var producers []*ha.ReliableProducer + for i := 0; i < numberOfProducers; i++ { + var mutex = sync.Mutex{} + // Here we store the messages that have not been confirmed + // then we resend them. + // Note: This is only for test. The list can grow indefinitely + var unConfirmedMessages []message.StreamMessage + rProducer, err := ha.NewReliableProducer(env, + streamName, + stream.NewProducerOptions(). + SetConfirmationTimeOut(5*time.Second). + SetClientProvidedName(fmt.Sprintf("producer-%d", i)), + func(messageStatus []*stream.ConfirmationStatus) { + go func() { + for _, msgStatus := range messageStatus { + if msgStatus.IsConfirmed() { + atomic.AddInt32(&confirmed, 1) + } else { + atomic.AddInt32(&fail, 1) + mutex.Lock() + unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage()) + mutex.Unlock() + } + } + }() + }) + CheckErr(err) + producers = append(producers, rProducer) + go func() { + for i := 0; i < messagesToSend; i++ { + msg := amqp.NewMessage([]byte("ha")) + mutex.Lock() + for _, confirmedMessage := range unConfirmedMessages { + err := rProducer.Send(confirmedMessage) + atomic.AddInt32(&reSent, 1) + CheckErr(err) + } + unConfirmedMessages = []message.StreamMessage{} + mutex.Unlock() + err := rProducer.Send(msg) + if i%delayEachMessages == 0 { + time.Sleep(sendDelay) + } + atomic.AddInt32(&sent, 1) + CheckErr(err) + } + }() + } + var consumers []*ha.ReliableConsumer + + for i := 0; i < numberOfConsumers; i++ { + go func(name string) { + consumer, err := ha.NewReliableConsumer(env, + streamName, + stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()), + func(consumerContext stream.ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&consumed, 1) + }) + CheckErr(err) + consumers = append(consumers, consumer) + }(streamName) + } + + fmt.Println("Press enter to close the connections.") + _, _ = reader.ReadString('\n') + for _, producer := range producers { + err := producer.Close() + if err != nil { + CheckErr(err) + } + } + for _, consumer := range consumers { + err := consumer.Close() + if err != nil { + CheckErr(err) + } + } + isRunning = false + fmt.Println("Connections Closed. Press enter to close the environment.") + _, _ = reader.ReadString('\n') + + err = env.Close() + CheckErr(err) +} diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go new file mode 100644 index 00000000..27f88ac5 --- /dev/null +++ b/pkg/ha/ha_consumer.go @@ -0,0 +1,147 @@ +package ha + +import ( + "fmt" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "strings" + "sync" + "time" +) + +// ReliableConsumer is a consumer that can reconnect in case of connection problems +// the function messagesHandler is mandatory +type ReliableConsumer struct { + env *stream.Environment + consumer *stream.Consumer + streamName string + consumerOptions *stream.ConsumerOptions + mutexStatus *sync.Mutex + mutexConnection *sync.Mutex + status int + messagesHandler stream.MessagesHandler + currentPosition int64 // the last offset consumed. It is needed in case of restart + + //bootstrap: if true the consumer will start from the user offset. + // If false it will start from the last offset consumed (currentPosition) + bootstrap bool +} + +func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) { + go func() { + for event := range channelClose { + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo()) + c.bootstrap = false + err, reconnected := retry(0, c) + if err != nil { + logs.LogInfo(""+ + "[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err) + } + if reconnected { + c.setStatus(StatusOpen) + } else { + c.setStatus(StatusClosed) + } + + } else { + logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", c.getInfo(), event.Reason) + c.setStatus(StatusClosed) + } + } + }() +} + +func NewReliableConsumer(env *stream.Environment, streamName string, + consumerOptions *stream.ConsumerOptions, messagesHandler stream.MessagesHandler) (*ReliableConsumer, error) { + res := &ReliableConsumer{ + env: env, + streamName: streamName, + consumerOptions: consumerOptions, + mutexStatus: &sync.Mutex{}, + mutexConnection: &sync.Mutex{}, + messagesHandler: messagesHandler, + currentPosition: 0, + bootstrap: true, + } + if messagesHandler == nil { + return nil, fmt.Errorf("the messages handler is mandatory") + } + if consumerOptions == nil { + return nil, fmt.Errorf("the consumer options is mandatory") + } + + err := res.newConsumer() + if err == nil { + res.setStatus(StatusOpen) + } + return res, err +} + +func (c *ReliableConsumer) setStatus(value int) { + c.mutexStatus.Lock() + defer c.mutexStatus.Unlock() + c.status = value +} + +func (c *ReliableConsumer) GetStatus() int { + c.mutexStatus.Lock() + defer c.mutexStatus.Unlock() + return c.status +} + +func (c *ReliableConsumer) getEnv() *stream.Environment { + return c.env +} + +func (c *ReliableConsumer) getStreamName() string { + return c.streamName +} + +func (c *ReliableConsumer) getNewInstance() newEntityInstance { + return c.newConsumer +} + +func (c *ReliableConsumer) getInfo() string { + return fmt.Sprintf("consumer %s for stream %s", + c.consumerOptions.ClientProvidedName, c.streamName) +} + +func (c *ReliableConsumer) getTimeOut() time.Duration { + return time.Duration(3) +} + +func (c *ReliableConsumer) newConsumer() error { + c.mutexConnection.Lock() + defer c.mutexConnection.Unlock() + offset := stream.OffsetSpecification{}.Offset(c.currentPosition + 1) + if c.bootstrap { + offset = c.consumerOptions.Offset + } + logs.LogDebug("[Reliable] - creating %s. Boot: %s. StartOffset: %s", c.getInfo(), + c.bootstrap, offset) + consumer, err := c.env.NewConsumer(c.streamName, func(consumerContext stream.ConsumerContext, message *amqp.Message) { + c.mutexConnection.Lock() + + c.currentPosition = consumerContext.Consumer.GetOffset() + c.mutexConnection.Unlock() + c.messagesHandler(consumerContext, message) + }, c.consumerOptions.SetOffset(offset)) + if err != nil { + return err + } + channelNotifyClose := consumer.NotifyClose() + c.handleNotifyClose(channelNotifyClose) + c.consumer = consumer + return err +} + +func (c *ReliableConsumer) Close() error { + c.setStatus(StatusClosed) + err := c.consumer.Close() + if err != nil { + return err + } + return nil +} diff --git a/pkg/ha/ha_consumer_test.go b/pkg/ha/ha_consumer_test.go new file mode 100644 index 00000000..c09c995d --- /dev/null +++ b/pkg/ha/ha_consumer_test.go @@ -0,0 +1,131 @@ +package ha + +import ( + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + . "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "sync/atomic" + "time" +) + +var _ = Describe("Reliable Consumer", func() { + + var ( + envForRConsumer *Environment + streamForRConsumer string + ) + BeforeEach(func() { + testEnv, err := NewEnvironment(nil) + envForRConsumer = testEnv + Expect(err).NotTo(HaveOccurred()) + streamForRConsumer = uuid.New().String() + err = envForRConsumer.DeclareStream(streamForRConsumer, nil) + Expect(err).NotTo(HaveOccurred()) + }) + AfterEach(func() { + exists, err := envForRConsumer.StreamExists(streamForRConsumer) + Expect(err).NotTo(HaveOccurred()) + if exists { + Expect(envForRConsumer.DeleteStream(streamForRConsumer)).NotTo(HaveOccurred()) + } + }) + + It("Validate mandatory fields", func() { + _, err := NewReliableConsumer(envForRConsumer, + streamForRConsumer, &ConsumerOptions{}, nil) + Expect(err).To(HaveOccurred()) + _, err = NewReliableConsumer(envForRConsumer, streamForRConsumer, nil, func(consumerContext ConsumerContext, message *amqp.Message) { + }) + Expect(err).To(HaveOccurred()) + }) + + It("Create/Confirm and close a Reliable Producer / Consumer", func() { + signal := make(chan struct{}) + var confirmed int32 + producer, err := NewReliableProducer(envForRConsumer, + streamForRConsumer, NewProducerOptions(), func(messageConfirm []*ConfirmationStatus) { + for _, confirm := range messageConfirm { + Expect(confirm.IsConfirmed()).To(BeTrue()) + } + if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 { + signal <- struct{}{} + } + }) + Expect(err).NotTo(HaveOccurred()) + for i := 0; i < 10; i++ { + msg := amqp.NewMessage([]byte("ha")) + err := producer.Send(msg) + Expect(err).NotTo(HaveOccurred()) + } + <-signal + Expect(producer.Close()).NotTo(HaveOccurred()) + + signal = make(chan struct{}) + var consumed int32 + consumer, err := NewReliableConsumer(envForRConsumer, streamForRConsumer, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()), func(consumerContext ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&consumed, 1) + if atomic.LoadInt32(&consumed) == 10 { + signal <- struct{}{} + } + }) + + Expect(err).NotTo(HaveOccurred()) + <-signal + Expect(consumed).To(Equal(int32(10))) + Expect(consumer.Close()).NotTo(HaveOccurred()) + }) + + It("restart Reliable Consumer in case of killing connection", func() { + + clientProvidedName := uuid.New().String() + consumer, err := NewReliableConsumer(envForRConsumer, streamForRConsumer, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).SetClientProvidedName(clientProvidedName), + func(consumerContext ConsumerContext, message *amqp.Message) { + }) + Expect(err).NotTo(HaveOccurred()) + Expect(consumer).NotTo(BeNil()) + time.Sleep(1 * time.Second) + Expect(consumer.GetStatus()).To(Equal(StatusOpen)) + connectionToDrop := "" + Eventually(func() bool { + connections, err := Connections("15672") + if err != nil { + return false + } + for _, connection := range connections { + if connection.ClientProperties.Connection_name == clientProvidedName { + connectionToDrop = connection.Name + return true + } + } + return false + }, time.Second*5). + Should(BeTrue()) + + Expect(connectionToDrop).NotTo(BeEmpty()) + // kill the connection + errDrop := DropConnection(connectionToDrop, "15672") + Expect(errDrop).NotTo(HaveOccurred()) + + time.Sleep(2 * time.Second) // we give some time to the client to reconnect + Expect(consumer.GetStatus()).To(Equal(StatusOpen)) + Expect(consumer.Close()).NotTo(HaveOccurred()) + Expect(consumer.GetStatus()).To(Equal(StatusClosed)) + }) + + It("Delete the stream should close the consumer", func() { + consumer, err := NewReliableConsumer(envForRConsumer, streamForRConsumer, + NewConsumerOptions(), + func(consumerContext ConsumerContext, message *amqp.Message) { + }) + Expect(err).NotTo(HaveOccurred()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer.GetStatus()).To(Equal(StatusOpen)) + Expect(envForRConsumer.DeleteStream(streamForRConsumer)).NotTo(HaveOccurred()) + Eventually(func() int { + return consumer.GetStatus() + }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusClosed)) + + }) +}) diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index bc3b27d8..6f20e7e0 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -6,19 +6,12 @@ import ( "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" - "math/rand" + "strings" "sync" "sync/atomic" "time" ) -const ( - StatusOpen = 1 - StatusClosed = 2 - StatusStreamDoesNotExist = 3 - StatusReconnecting = 4 -) - func (p *ReliableProducer) handlePublishConfirm(confirms stream.ChannelPublishConfirm) { go func() { for messagesIds := range confirms { @@ -31,23 +24,35 @@ func (p *ReliableProducer) handlePublishConfirm(confirms stream.ChannelPublishCo func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { go func() { for event := range channelClose { - // TODO: Convert the string to a constant - if event.Reason == "socket client closed" { - logs.LogError("[RProducer] - producer closed unexpectedly.. Reconnecting..") - err, reconnected := p.retry(1) + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", p.getInfo()) + err, reconnected := retry(0, p) if err != nil { - // TODO: Handle stream is not available - return + logs.LogInfo(""+ + "[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err) } if reconnected { p.setStatus(StatusOpen) + } else { + p.setStatus(StatusClosed) } - p.reconnectionSignal <- struct{}{} + } else { + logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", p.getInfo(), event.Reason) + p.setStatus(StatusClosed) + } + + select { + case p.reconnectionSignal <- struct{}{}: + case <-time.After(2 * time.Second): } } }() } +// ReliableProducer is a producer that can reconnect in case of connection problems +// the function handlePublishConfirm is mandatory +// in case of problems the messages have the message.Confirmed == false +// The function `send` is blocked during the reconnection type ReliableProducer struct { env *stream.Environment producer *stream.Producer @@ -80,6 +85,9 @@ func NewReliableProducer(env *stream.Environment, streamName string, if confirmMessageHandler == nil { return nil, fmt.Errorf("the confirmation message handler is mandatory") } + if producerOptions == nil { + return nil, fmt.Errorf("the producer options is mandatory") + } err := res.newProducer() if err == nil { @@ -89,7 +97,6 @@ func NewReliableProducer(env *stream.Environment, streamName string, } func (p *ReliableProducer) newProducer() error { - producer, err := p.env.NewProducer(p.streamName, p.producerOptions) if err != nil { return err @@ -103,17 +110,17 @@ func (p *ReliableProducer) newProducer() error { } func (p *ReliableProducer) Send(message message.StreamMessage) error { - if p.getStatus() == StatusStreamDoesNotExist { + if p.GetStatus() == StatusStreamDoesNotExist { return stream.StreamDoesNotExist } - if p.getStatus() == StatusClosed { - return errors.New("producer is closed") + if p.GetStatus() == StatusClosed { + return errors.New(fmt.Sprintf("%s is closed", p.getInfo())) } - if p.getStatus() == StatusReconnecting { - logs.LogDebug("[RProducer] - send producer is reconnecting") + if p.GetStatus() == StatusReconnecting { + logs.LogDebug("[Reliable] %s is reconnecting. The send is blocked", p.getInfo()) <-p.reconnectionSignal - logs.LogDebug("[RProducer] - send producer reconnected") + logs.LogDebug("[Reliable] %s reconnected. The send is unlocked", p.getInfo()) } p.mutex.Lock() @@ -129,7 +136,7 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { } default: time.Sleep(500 * time.Millisecond) - logs.LogError("[RProducer] - error during send %s", errW.Error()) + logs.LogError("[Reliable] %s - error during send %s", p.getInfo(), errW.Error()) } } @@ -137,61 +144,48 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { return nil } -func (p *ReliableProducer) retry(backoff int) (error, bool) { - p.setStatus(StatusReconnecting) - sleepValue := rand.Intn(int((p.producerOptions.ConfirmationTimeOut.Seconds()-2+1)+2)*1000) + backoff*1000 - logs.LogInfo("[RProducer] - The producer for the stream %s is in reconnection in %d milliseconds", p.streamName, sleepValue) - time.Sleep(time.Duration(sleepValue) * time.Millisecond) - streamMetaData, errS := p.env.StreamMetaData(p.streamName) - if errors.Is(errS, stream.StreamDoesNotExist) { - return errS, true - } - if errors.Is(errS, stream.StreamNotAvailable) { - logs.LogInfo("[RProducer] - stream %s is not available. Trying to reconnect", p.streamName) - return p.retry(backoff + 1) - } - if streamMetaData.Leader == nil { - logs.LogInfo("[RProducer] - The leader for the stream %s is not ready. Trying to reconnect") - return p.retry(backoff + 1) - } - - var result error - if streamMetaData != nil { - logs.LogInfo("[RProducer] - stream %s exists. Reconnecting the producer.", p.streamName) - result = p.newProducer() - if result == nil { - logs.LogInfo("[RProducer] - stream %s exists. Producer reconnected.", p.streamName) - } else { - logs.LogInfo("[RProducer] - error creating producer for the stream %s exists. Trying to reconnect", p.streamName) - return p.retry(backoff + 1) - } - } else { - logs.LogError("[RProducer] - stream %s does not exist. Closing..", p.streamName) - result = stream.StreamDoesNotExist - } - - return result, true - -} - func (p *ReliableProducer) IsOpen() bool { p.mutexStatus.Lock() defer p.mutexStatus.Unlock() return p.status == StatusOpen } -func (p *ReliableProducer) getStatus() int { +func (p *ReliableProducer) GetStatus() int { p.mutexStatus.Lock() defer p.mutexStatus.Unlock() return p.status } +// IReliable interface func (p *ReliableProducer) setStatus(value int) { p.mutexStatus.Lock() defer p.mutexStatus.Unlock() p.status = value } +func (p *ReliableProducer) getInfo() string { + return fmt.Sprintf("producer %s for stream %s", + p.producerOptions.ClientProvidedName, p.streamName) +} + +func (p *ReliableProducer) getEnv() *stream.Environment { + return p.env +} + +func (p *ReliableProducer) getNewInstance() newEntityInstance { + return p.newProducer +} + +func (p *ReliableProducer) getTimeOut() time.Duration { + return p.producerOptions.ConfirmationTimeOut +} + +func (p *ReliableProducer) getStreamName() string { + return p.streamName +} + +// End of IReliable interface + func (p *ReliableProducer) GetBroker() *stream.Broker { p.mutex.Lock() defer p.mutex.Unlock() diff --git a/pkg/ha/ha_publisher_test.go b/pkg/ha/ha_publisher_test.go index f2eadc2b..94a1efae 100644 --- a/pkg/ha/ha_publisher_test.go +++ b/pkg/ha/ha_publisher_test.go @@ -25,13 +25,21 @@ var _ = Describe("Reliable Producer", func() { Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { - Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred()) + exists, err := envForRProducer.StreamExists(streamForRProducer) + Expect(err).NotTo(HaveOccurred()) + if exists { + Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred()) + } }) - It("Validate confirm handler", func() { + It("Validate mandatory fields", func() { _, err := NewReliableProducer(envForRProducer, streamForRProducer, &ProducerOptions{}, nil) Expect(err).To(HaveOccurred()) + _, err = NewReliableProducer(envForRProducer, streamForRProducer, nil, func(messageConfirm []*ConfirmationStatus) { + + }) + Expect(err).To(HaveOccurred()) }) It("Create/Confirm and close a Reliable Producer", func() { @@ -56,10 +64,6 @@ var _ = Describe("Reliable Producer", func() { Expect(producer.Close()).NotTo(HaveOccurred()) }) - //TODO: The test is commented out because it is not possible to kill the connection from the client side - // the client provider name is not exposed to the user. - // we need to expose it than kill the connection - It("restart Reliable Producer in case of killing connection", func() { signal := make(chan struct{}) var confirmed int32 @@ -107,4 +111,17 @@ var _ = Describe("Reliable Producer", func() { Expect(producer.Close()).NotTo(HaveOccurred()) }) + It("Delete the stream should close the producer", func() { + producer, err := NewReliableProducer(envForRProducer, + streamForRProducer, NewProducerOptions(), func(messageConfirm []*ConfirmationStatus) { + }) + Expect(err).NotTo(HaveOccurred()) + Expect(producer).NotTo(BeNil()) + Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred()) + Eventually(func() int { + return producer.GetStatus() + }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusClosed)) + + }) + }) diff --git a/pkg/ha/reliable_common.go b/pkg/ha/reliable_common.go new file mode 100644 index 00000000..66e1cc9c --- /dev/null +++ b/pkg/ha/reliable_common.go @@ -0,0 +1,78 @@ +package ha + +import ( + "errors" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "math/rand" + "time" +) + +const ( + StatusOpen = 1 + StatusClosed = 2 + StatusStreamDoesNotExist = 3 + StatusReconnecting = 4 +) + +type newEntityInstance func() error + +type IReliable interface { + setStatus(value int) + GetStatus() int + getInfo() string + getEnv() *stream.Environment + getNewInstance() newEntityInstance + getTimeOut() time.Duration + getStreamName() string +} + +// Retry is a function that retries the IReliable to the stream +// The first step is to set the status to reconnecting +// Then it sleeps for a random time between 2 and the timeout to avoid overlapping with other reconnecting +// Then it checks if the stream exists. During the restart the stream could be deleted +// If the stream does not exist it returns a StreamDoesNotExist error +// If the stream exists it tries to create a new instance of the IReliable + +// +// The stream could be in a `StreamNotAvailable` status or the `LeaderNotReady` +// `StreamNotAvailable` is a server side error: Stream exists but is not available for the producer and consumer +// `LeaderNotReady` is a client side error: Stream exists it is Ready but the leader is not elected yet. It is mandatory for the Producer +// In both cases it retries the reconnection + +func retry(backoff int, reliable IReliable) (error, bool) { + reliable.setStatus(StatusReconnecting) + sleepValue := rand.Intn(int((reliable.getTimeOut().Seconds()-2+1)+2)*1000) + backoff*1000 + logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), sleepValue) + time.Sleep(time.Duration(sleepValue) * time.Millisecond) + streamMetaData, errS := reliable.getEnv().StreamMetaData(reliable.getStreamName()) + if errors.Is(errS, stream.StreamDoesNotExist) { + return errS, false + } + if errors.Is(errS, stream.StreamNotAvailable) { + logs.LogInfo("[Reliable] - The stream %s is not available for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo()) + return retry(backoff+1, reliable) + } + if errors.Is(errS, stream.LeaderNotReady) { + logs.LogInfo("[Reliable] - The leader for the stream %s is not ready for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo()) + return retry(backoff+1, reliable) + } + + var result error + if streamMetaData != nil { + logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", reliable.getStreamName(), reliable.getInfo()) + result = reliable.getNewInstance()() + if result == nil { + logs.LogInfo("[Reliable] - The stream %s exists. %s reconnected.", reliable.getInfo(), reliable.getStreamName()) + } else { + logs.LogInfo("[Reliable] - error %s creating %s for the stream %s. Trying to reconnect", result, reliable.getInfo(), reliable.getStreamName()) + return retry(backoff+1, reliable) + } + } else { + logs.LogError("[Reliable] - The stream %s does not exist for %s. Closing..", reliable.getStreamName(), reliable.getInfo()) + return stream.StreamDoesNotExist, false + } + + return result, true + +} diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 4a24af65..e9e1802e 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -412,7 +412,7 @@ func (c *Client) Close() error { Command: CommandClose, StreamName: p.(*Producer).GetStreamName(), Name: p.(*Producer).GetName(), - Reason: "socket client closed", + Reason: SocketClosed, Err: nil, }) @@ -426,7 +426,7 @@ func (c *Client) Close() error { Command: CommandClose, StreamName: cs.(*Consumer).GetStreamName(), Name: cs.(*Consumer).GetName(), - Reason: "socket client closed", + Reason: SocketClosed, Err: nil, }) if err != nil { diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index b46724ec..99fd11ac 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -104,6 +104,9 @@ const ( defaultConfirmationTimeOut = 10 * time.Second // + SocketClosed = "socket client closed" + MetaDataUpdate = "metadata Data update" + StreamTcpPort = "5552" ) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index f3bd8a59..c89038e3 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -226,6 +226,9 @@ func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, erro } streamsMetadata := client.metaData(streamName) streamMetadata := streamsMetadata.Get(streamName) + if streamMetadata.responseCode != responseCodeOk { + return nil, lookErrorCode(streamMetadata.responseCode) + } tentatives := 0 for streamMetadata == nil || streamMetadata.Leader == nil && tentatives < 3 { @@ -487,7 +490,7 @@ func (c *Client) maybeCleanProducers(streamName string) { Command: CommandMetadataUpdate, StreamName: streamName, Name: producer.(*Producer).GetName(), - Reason: "Meta data update", + Reason: MetaDataUpdate, Err: nil, }) if err != nil { @@ -512,7 +515,7 @@ func (c *Client) maybeCleanConsumers(streamName string) { Command: CommandMetadataUpdate, StreamName: streamName, Name: consumer.(*Consumer).GetName(), - Reason: "Meta data update", + Reason: MetaDataUpdate, Err: nil, }) if err != nil {