Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the ReliableProducer during the broker restart #268

Merged
merged 2 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions compose/ha_tls/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
networks:
- back
hostname: node0
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
pull_policy: always
ports:
- "5561:5551"
Expand All @@ -24,7 +24,7 @@ services:
networks:
- back
hostname: node1
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
pull_policy: always
ports:
- "5571:5551"
Expand All @@ -41,7 +41,7 @@ services:
networks:
- back
hostname: node2
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
pull_policy: always
ports:
- "5581:5551"
Expand Down
94 changes: 53 additions & 41 deletions examples/haProducer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,23 @@ func CheckErr(err error) {

var confirmed int32 = 0
var fail int32 = 0
var mutex = sync.Mutex{}
var unConfirmedMessages []message.StreamMessage

func handlePublishConfirm(messageStatus []*stream.ConfirmationStatus) {
go func() {
for _, msgStatus := range messageStatus {
if msgStatus.IsConfirmed() {
atomic.AddInt32(&confirmed, 1)
} else {
atomic.AddInt32(&fail, 1)
mutex.Lock()
unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
mutex.Unlock()
}

}
}()
}

func main() {
reader := bufio.NewReader(os.Stdin)

fmt.Println("HA producer example")
fmt.Println("Connecting to RabbitMQ streaming ...")
const messagesToSend = 20_000_000
const messagesToSend = 500_000
const numberOfProducers = 7

addresses := []string{
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
//"rabbitmq-stream://guest:guest@node1:5572/%2f",
//"rabbitmq-stream://guest:guest@node1:5572/%2f",
"rabbitmq-stream://guest:guest@localhost:5552/%2f"}

env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetMaxProducersPerClient(4).
SetUris(addresses))
CheckErr(err)

Expand All @@ -70,42 +54,70 @@ func main() {
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)

rProducer, err := ha.NewReliableProducer(env,
streamName,
stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second), handlePublishConfirm)
CheckErr(err)
var sent int32
isRunning := true
go func() {
for isRunning {
totalHandled := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail)
fmt.Printf("%s - ToSend: %d - Sent:%d - Confirmed:%d - Not confirmed:%d - Total :%d \n",
time.Now().Format(time.RFC822), messagesToSend, sent, confirmed, fail, totalHandled)
time.Now().Format(time.RFC822), messagesToSend*numberOfProducers, sent, confirmed, fail, totalHandled)
time.Sleep(5 * time.Second)
}
}()
var producers []*ha.ReliableProducer
for i := 0; i < numberOfProducers; i++ {
var mutex = sync.Mutex{}
var unConfirmedMessages []message.StreamMessage
rProducer, err := ha.NewReliableProducer(env,
streamName,
stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second),
func(messageStatus []*stream.ConfirmationStatus) {
go func() {
for _, msgStatus := range messageStatus {
if msgStatus.IsConfirmed() {
atomic.AddInt32(&confirmed, 1)
} else {
atomic.AddInt32(&fail, 1)
mutex.Lock()
unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
mutex.Unlock()
}

for i := 0; i < messagesToSend; i++ {
msg := amqp.NewMessage([]byte("ha"))
mutex.Lock()
for _, confirmedMessage := range unConfirmedMessages {
err := rProducer.Send(confirmedMessage)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}
unConfirmedMessages = []message.StreamMessage{}
mutex.Unlock()
err := rProducer.Send(msg)
atomic.AddInt32(&sent, 1)
}
}()
})
CheckErr(err)
producers = append(producers, rProducer)

go func() {
for i := 0; i < messagesToSend; i++ {
msg := amqp.NewMessage([]byte("ha"))
mutex.Lock()
for _, confirmedMessage := range unConfirmedMessages {
err := rProducer.Send(confirmedMessage)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}
unConfirmedMessages = []message.StreamMessage{}
mutex.Unlock()
err := rProducer.Send(msg)
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}
}()

}

fmt.Println("Terminated. Press enter to close the connections.")
_, _ = reader.ReadString('\n')
for _, producer := range producers {
err := producer.Close()
if err != nil {
CheckErr(err)
}
}
isRunning = false
err = rProducer.Close()
CheckErr(err)
err = env.Close()
CheckErr(err)
}
48 changes: 34 additions & 14 deletions pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package ha

import (
"errors"
"fmt"
"github.com/pkg/errors"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
Expand Down Expand Up @@ -34,7 +34,7 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
// TODO: Convert the string to a constant
if event.Reason == "socket client closed" {
logs.LogError("[RProducer] - producer closed unexpectedly.. Reconnecting..")
err, reconnected := p.retry()
err, reconnected := p.retry(1)
if err != nil {
// TODO: Handle stream is not available
return
Expand Down Expand Up @@ -111,9 +111,9 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
}

if p.getStatus() == StatusReconnecting {
logs.LogDebug("[RProducer] - producer is reconnecting")
logs.LogDebug("[RProducer] - send producer is reconnecting")
<-p.reconnectionSignal
logs.LogDebug("[RProducer] - producer reconnected")
logs.LogDebug("[RProducer] - send producer reconnected")
}

p.mutex.Lock()
Expand All @@ -128,6 +128,7 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
return stream.FrameTooLarge
}
default:
time.Sleep(500 * time.Millisecond)
logs.LogError("[RProducer] - error during send %s", errW.Error())
}

Expand All @@ -136,22 +137,41 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
return nil
}

func (p *ReliableProducer) retry() (error, bool) {
func (p *ReliableProducer) retry(backoff int) (error, bool) {
p.setStatus(StatusReconnecting)
sleepValue := rand.Intn(int(p.producerOptions.ConfirmationTimeOut.Seconds()-2+1) + 2)
time.Sleep(time.Duration(sleepValue) * time.Second)
exists, errS := p.env.StreamExists(p.streamName)
if errS != nil {
sleepValue := rand.Intn(int((p.producerOptions.ConfirmationTimeOut.Seconds()-2+1)+2)*1000) + backoff*1000
logs.LogInfo("[RProducer] - The producer for the stream %s is in reconnection in %d milliseconds", p.streamName, sleepValue)
time.Sleep(time.Duration(sleepValue) * time.Millisecond)
streamMetaData, errS := p.env.StreamMetaData(p.streamName)
if errors.Is(errS, stream.StreamDoesNotExist) {
return errS, true

}
if exists {
logs.LogDebug("[RProducer] - stream %s exists. Reconnecting the producer.", p.streamName)
return p.newProducer(), true
if errors.Is(errS, stream.StreamNotAvailable) {
logs.LogInfo("[RProducer] - stream %s is not available. Trying to reconnect", p.streamName)
return p.retry(backoff + 1)
}
if streamMetaData.Leader == nil {
logs.LogInfo("[RProducer] - The leader for the stream %s is not ready. Trying to reconnect")
return p.retry(backoff + 1)
}

var result error
if streamMetaData != nil {
logs.LogInfo("[RProducer] - stream %s exists. Reconnecting the producer.", p.streamName)
result = p.newProducer()
if result == nil {
logs.LogInfo("[RProducer] - stream %s exists. Producer reconnected.", p.streamName)
} else {
logs.LogInfo("[RProducer] - error creating producer for the stream %s exists. Trying to reconnect", p.streamName)
return p.retry(backoff + 1)
}
} else {
logs.LogError("[RProducer] - stream %s does not exist. Closing..", p.streamName)
return stream.StreamDoesNotExist, true
result = stream.StreamDoesNotExist
}

return result, true

}

func (p *ReliableProducer) IsOpen() bool {
Expand Down
87 changes: 87 additions & 0 deletions pkg/ha/ha_publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package ha

import (
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
. "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"sync/atomic"
)

var _ = Describe("Reliable Producer", func() {

var (
envForRProducer *Environment
streamForRProducer string
)
BeforeEach(func() {
testEnv, err := NewEnvironment(nil)
envForRProducer = testEnv
Expect(err).NotTo(HaveOccurred())
streamForRProducer = uuid.New().String()
err = envForRProducer.DeclareStream(streamForRProducer, nil)
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred())
})

It("Validate confirm handler", func() {
_, err := NewReliableProducer(envForRProducer,
streamForRProducer, &ProducerOptions{}, nil)
Expect(err).To(HaveOccurred())
})

It("Create/Confirm and close a Reliable Producer", func() {
signal := make(chan struct{})
var confirmed int32
producer, err := NewReliableProducer(envForRProducer,
streamForRProducer, NewProducerOptions(), func(messageConfirm []*ConfirmationStatus) {
for _, confirm := range messageConfirm {
Expect(confirm.IsConfirmed()).To(BeTrue())
}
if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 {
signal <- struct{}{}
}
})
Expect(err).NotTo(HaveOccurred())
for i := 0; i < 10; i++ {
msg := amqp.NewMessage([]byte("ha"))
err := producer.Send(msg)
Expect(err).NotTo(HaveOccurred())
}
<-signal
Expect(producer.Close()).NotTo(HaveOccurred())
})

//TODO: The test is commented out because it is not possible to kill the connection from the client side
// the client provider name is not exposed to the user.
// we need to expose it than kill the connection

//It("restart Reliable Producer in case of killing connection", func() {
// signal := make(chan struct{})
// var confirmed int32
// producer, err := NewReliableProducer(envForRProducer,
// streamForRProducer, &ProducerOptions{}, func(messageConfirm []*ConfirmationStatus) {
// for _, confirm := range messageConfirm {
// Expect(confirm.IsConfirmed()).To(BeTrue())
// }
// if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 {
// signal <- struct{}{}
// }
// })
// Expect(err).NotTo(HaveOccurred())
//
// // kill the connection
//
// for i := 0; i < 10; i++ {
// msg := amqp.NewMessage([]byte("ha"))
// err := producer.Send(msg)
// Expect(err).NotTo(HaveOccurred())
// }
// <-signal
// Expect(producer.Close()).NotTo(HaveOccurred())
//})

})
13 changes: 13 additions & 0 deletions pkg/ha/ha_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ha_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestHa(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Ha Suite")
}
Loading
Loading