Skip to content

Commit

Permalink
Add client ClientProvidedName (#270)
Browse files Browse the repository at this point in the history
* Add client ClientProvidedName to producer and consumer.
* Closes #269
* Add the HA test for the producer based on ClientProvidedName
* Set SetClientProvidedName on the tests

---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Feb 27, 2024
1 parent a503d8a commit 1b75cdb
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 33 deletions.
1 change: 1 addition & 0 deletions examples/getting_started.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func main() {
streamName,
handleMessages,
stream.NewConsumerOptions().
SetClientProvidedName("my_consumer"). // connection name
SetConsumerName("my_consumer"). // set a consumer name
SetOffset(stream.OffsetSpecification{}.First()). // start consuming from the beginning
SetCRCCheck(false)) // Disable crc control, increase the performances
Expand Down
4 changes: 3 additions & 1 deletion examples/haProducer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ func main() {
var unConfirmedMessages []message.StreamMessage
rProducer, err := ha.NewReliableProducer(env,
streamName,
stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second),
stream.NewProducerOptions().
SetConfirmationTimeOut(5*time.Second).
SetClientProvidedName(fmt.Sprintf("producer-%d", i)),
func(messageStatus []*stream.ConfirmationStatus) {
go func() {
for _, msgStatus := range messageStatus {
Expand Down
71 changes: 47 additions & 24 deletions pkg/ha/ha_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
. "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"sync/atomic"
"time"
)

var _ = Describe("Reliable Producer", func() {
Expand Down Expand Up @@ -59,29 +60,51 @@ var _ = Describe("Reliable Producer", func() {
// the client provider name is not exposed to the user.
// we need to expose it than kill the connection

//It("restart Reliable Producer in case of killing connection", func() {
// signal := make(chan struct{})
// var confirmed int32
// producer, err := NewReliableProducer(envForRProducer,
// streamForRProducer, &ProducerOptions{}, func(messageConfirm []*ConfirmationStatus) {
// for _, confirm := range messageConfirm {
// Expect(confirm.IsConfirmed()).To(BeTrue())
// }
// if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 {
// signal <- struct{}{}
// }
// })
// Expect(err).NotTo(HaveOccurred())
//
// // kill the connection
//
// for i := 0; i < 10; i++ {
// msg := amqp.NewMessage([]byte("ha"))
// err := producer.Send(msg)
// Expect(err).NotTo(HaveOccurred())
// }
// <-signal
// Expect(producer.Close()).NotTo(HaveOccurred())
//})
It("restart Reliable Producer in case of killing connection", func() {
signal := make(chan struct{})
var confirmed int32
clientProvidedName := uuid.New().String()
producer, err := NewReliableProducer(envForRProducer,
streamForRProducer, NewProducerOptions().SetClientProvidedName(clientProvidedName), func(messageConfirm []*ConfirmationStatus) {
for _, confirm := range messageConfirm {
Expect(confirm.IsConfirmed()).To(BeTrue())
}
if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 {
signal <- struct{}{}
}
})
Expect(err).NotTo(HaveOccurred())

time.Sleep(1 * time.Second)
connectionToDrop := ""
Eventually(func() bool {
connections, err := Connections("15672")
if err != nil {
return false
}
for _, connection := range connections {
if connection.ClientProperties.Connection_name == clientProvidedName {
connectionToDrop = connection.Name
return true
}
}
return false
}, time.Second*5).
Should(BeTrue())

Expect(connectionToDrop).NotTo(BeEmpty())
// kill the connection
errDrop := DropConnection(connectionToDrop, "15672")
Expect(errDrop).NotTo(HaveOccurred())

time.Sleep(2 * time.Second) // we give some time to the client to reconnect
for i := 0; i < 10; i++ {
msg := amqp.NewMessage([]byte("ha"))
err := producer.Send(msg)
Expect(err).NotTo(HaveOccurred())
}
<-signal
Expect(producer.Close()).NotTo(HaveOccurred())
})

})
11 changes: 10 additions & 1 deletion pkg/ha/http_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ type queue struct {
Messages int `json:"messages"`
}

type client_properties struct {
Connection_name string `json:"connection_name"`
}

type connection struct {
Name string `json:"name"`
Name string `json:"name"`
ClientProperties client_properties `json:"client_properties"`
}

func messagesReady(queueName string, port string) (int, error) {
Expand Down Expand Up @@ -82,8 +87,12 @@ func baseCall(url, username, password string, method string) (string, error) {
return "", err2
}
return string(bodyBytes), nil
}

if resp.StatusCode == 204 { // No Content
return "", nil
}

return "", errors.New(strconv.Itoa(resp.StatusCode))

}
7 changes: 7 additions & 0 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type ConsumerOptions struct {
Offset OffsetSpecification
CRCCheck bool
initialCredits int16
ClientProvidedName string
}

func NewConsumerOptions() *ConsumerOptions {
Expand All @@ -145,6 +146,7 @@ func NewConsumerOptions() *ConsumerOptions {
autoCommitStrategy: NewAutoCommitStrategy(),
CRCCheck: false,
initialCredits: 10,
ClientProvidedName: "go-stream-consumer",
}
}

Expand Down Expand Up @@ -182,6 +184,11 @@ func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
return c
}

func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions {
c.ClientProvidedName = clientProvidedName
return c
}

func (c *Client) credit(subscriptionId byte, credit int16) {
length := 2 + 2 + 1 + 2
var b = bytes.NewBuffer(make([]byte, 0, length+4))
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ var _ = Describe("Streaming Consumers", func() {
atomic.AddInt32(&messagesReceived, 1)
}, NewConsumerOptions().
SetOffset(OffsetSpecification{}.Offset(50)).
SetClientProvidedName("consumer_test").
SetCRCCheck(true))
Expect(err).NotTo(HaveOccurred())

Expand Down
24 changes: 17 additions & 7 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,13 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
break
}
}
clientProvidedName := "go-stream-producer"
if options != nil && options.ClientProvidedName != "" {
clientProvidedName = options.ClientProvidedName
}

if clientResult == nil {
clientResult = cc.newClientForProducer(leader, tcpParameters, saslConfiguration)
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration)
}

err := clientResult.connect()
Expand All @@ -561,7 +565,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
if err != nil {
return nil, err
}
clientResult = cc.newClientForProducer(leader, tcpParameters, saslConfiguration)
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration)
err = clientResult.connect()
if err != nil {
return nil, err
Expand All @@ -578,8 +582,8 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
return producer, nil
}

func (cc *environmentCoordinator) newClientForProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
clientResult := newClient("go-stream-producer", leader, tcpParameters, saslConfiguration)
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration)
chMeta := make(chan metaDataUpdateEvent, 1)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
Expand All @@ -598,7 +602,7 @@ func (cc *environmentCoordinator) newClientForProducer(leader *Broker, tcpParame
return clientResult
}

func (cc *environmentCoordinator) newConsumer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
streamName string, messagesHandler MessagesHandler,
options *ConsumerOptions) (*Consumer, error) {
cc.mutex.Lock()
Expand All @@ -614,7 +618,7 @@ func (cc *environmentCoordinator) newConsumer(leader *Broker, tcpParameters *TCP
}

if clientResult == nil {
clientResult = newClient("go-stream-consumer", leader, tcpParameters, saslConfiguration)
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration)
chMeta := make(chan metaDataUpdateEvent)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
Expand Down Expand Up @@ -762,8 +766,14 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
}
}
consumerBroker.cloneFrom(clientLocator.broker, resolver)
clientProvidedName := "go-stream-consumer"
if consumerOptions != nil && consumerOptions.ClientProvidedName != "" {
clientProvidedName = consumerOptions.ClientProvidedName
}
consumer, err := ps.consumersCoordinator[coordinatorKey].
newConsumer(consumerBroker, clientLocator.tcpParameters, clientLocator.saslConfiguration, streamName, messagesHandler, consumerOptions)
newConsumer(clientProvidedName, consumerBroker, clientLocator.tcpParameters,
clientLocator.saslConfiguration,
streamName, messagesHandler, consumerOptions)
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ProducerOptions struct {
SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id
Compression Compression // Compression type, it is valid only if SubEntrySize > 1
ConfirmationTimeOut time.Duration // Time to wait for the confirmation
ClientProvidedName string // Client provider name that will be shown in the management UI
}

func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions {
Expand Down Expand Up @@ -125,6 +126,11 @@ func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *Produ
return po
}

func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions {
po.ClientProvidedName = name
return po
}

func NewProducerOptions() *ProducerOptions {
return &ProducerOptions{
QueueSize: defaultQueuePublisherSize,
Expand All @@ -133,6 +139,7 @@ func NewProducerOptions() *ProducerOptions {
SubEntrySize: 1,
Compression: Compression{},
ConfirmationTimeOut: defaultConfirmationTimeOut,
ClientProvidedName: "go-stream-producer",
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ var _ = Describe("Streaming Producers", func() {
NewProducerOptions().
SetBatchSize(BatchSize).
SetSubEntrySize(SubEntrySize).
SetClientProvidedName("batch-go-stream-producer").
SetConfirmationTimeOut(1*time.Second).
SetCompression(Compression{}.None()),
&messagesReceived,
Expand Down

0 comments on commit 1b75cdb

Please sign in to comment.