Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query sequence #145

Merged
merged 2 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ tls-gen/
dist/

perfTest/perfTest
go.dev/
Empty file added Docker/Dockerfile
Empty file.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
### Installing

```shell
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v1.0.0-rc.13
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
```

imports:
Expand Down Expand Up @@ -296,6 +296,11 @@ https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
You can find a "Deduplication" example in the [examples](./examples/) directory. </br>
Run it more than time, the messages count will be always 10.

To retrieve the last sequence id for producer you can use:
```
publishingId, err := producer.GetLastPublishingId()
```

### Sub Entries Batching

The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame,
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0-rc.13
1.0.1-rc.1
7 changes: 7 additions & 0 deletions examples/deduplication/deduplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func main() {
}
}(chConfirm, producer)

// In case you need to know which is the last ID for the producer: GetLastPublishingId
lastPublishingId, err := producer.GetLastPublishingId()
CheckErr(err)
fmt.Printf("lastPublishingId: %d\n",
lastPublishingId,
)

data := make(map[int]string)
data[0] = "Piaggio"
data[1] = "Ferrari"
Expand Down
12 changes: 8 additions & 4 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ func (c *Client) internalDeclarePublisher(streamName string, producer *Producer)
res := c.handleWrite(b.Bytes(), resp)

if publisherReferenceSize > 0 {
producer.sequence = c.queryPublisherSequence(producer.options.Name, streamName)
v, _ := c.queryPublisherSequence(producer.options.Name, streamName)
producer.sequence = v
}

return res
Expand Down Expand Up @@ -562,7 +563,7 @@ func (c *Client) metaData(streams ...string) *StreamsMetadata {
return data.(*StreamsMetadata)
}

func (c *Client) queryPublisherSequence(publisherReference string, stream string) int64 {
func (c *Client) queryPublisherSequence(publisherReference string, stream string) (int64, error) {

length := 2 + 2 + 4 + 2 + len(publisherReference) + 2 + len(stream)
resp := c.coordinator.NewResponse(commandQueryPublisherSequence)
Expand All @@ -572,10 +573,13 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string

writeString(b, publisherReference)
writeString(b, stream)
c.handleWriteWithResponse(b.Bytes(), resp, false)
err := c.handleWriteWithResponse(b.Bytes(), resp, false)
sequence := <-resp.data
_ = c.coordinator.RemoveResponseById(resp.correlationid)
return sequence.(int64)
if err.Err != nil {
return 0, err.Err
}
return sequence.(int64), nil

}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
const initBufferPublishSize = 2 + 2 + 1 + 4

const (
ClientVersion = "1.0.0-rc.13"
ClientVersion = "1.0.1-rc.1"

commandDeclarePublisher = 1
commandPublish = 2
Expand Down
18 changes: 16 additions & 2 deletions pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,13 @@ var _ = Describe("Streaming Consumers", func() {
})

It("Deduplication", func() {
producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName("producer-ded"))
producerName := "producer-ded"
producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName(producerName))
Expect(err).NotTo(HaveOccurred())
var arr []message.StreamMessage
for z := 0; z < 10; z++ {
m := amqp.NewMessage([]byte("test_" + strconv.Itoa(z)))
m.SetPublishingId(int64(z * 10))
m.SetPublishingId(int64(z * 10)) // id stored: the last one should be the same on QuerySequence
arr = append(arr, m)
}

Expand Down Expand Up @@ -300,6 +301,19 @@ var _ = Describe("Streaming Consumers", func() {
return atomic.LoadInt32(&messagesReceived)
}, 5*time.Second).Should(Equal(int32(10)),
"consumer should receive only 10 messages")

Eventually(func() int64 {
v, _ := env.QuerySequence(producerName, streamName)
return v
}, 5*time.Second).Should(Equal(int64(90)),
"QuerySequence should give the last id: 90")

Eventually(func() int64 {
v, _ := producer.GetLastPublishingId()
return v
}, 5*time.Second).Should(Equal(int64(90)),
"GetLastPublishingId should give the last id: 90")

Expect(producer.Close()).NotTo(HaveOccurred())
Expect(consumer.Close()).NotTo(HaveOccurred())
})
Expand Down
16 changes: 16 additions & 0 deletions pkg/stream/enviroment.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ func (env *Environment) QueryOffset(consumerName string, streamName string) (int
return client.queryOffset(consumerName, streamName)
}

// QuerySequence gets the last id stored for a producer
// you can also see producer.GetLastPublishingId() that is the easier way to get the last-id
func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error) {
client, err := env.newReconnectClient()
defer func(client *Client) {
err := client.Close()
if err != nil {
return
}
}(client)
if err != nil {
return 0, err
}
return client.queryPublisherSequence(publisherReference, streamName)
}

func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error) {
client, err := env.newReconnectClient()
defer func(client *Client) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/stream/enviroment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,20 @@ var _ = Describe("Environment test", func() {

})

Describe("Validation Query Offset/Sequence", func() {

env, err := NewEnvironment(NewEnvironmentOptions())
Expect(err).NotTo(HaveOccurred())
_, err = env.QuerySequence("my_prod",
"Stream_Doesnt_exist")
Expect(err).To(HaveOccurred())

_, err = env.QueryOffset("my_cons",
"Stream_Doesnt_exist")
Expect(err).To(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
})

Describe("Stream Existing/Meta data", func() {

env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552).
Expand Down
3 changes: 3 additions & 0 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,9 @@ func (producer *Producer) FlushUnConfirmedMessages() {
producer.mutex.Unlock()
}

func (producer *Producer) GetLastPublishingId() (int64, error) {
return producer.options.client.queryPublisherSequence(producer.GetName(), producer.GetStreamName())
}
func (producer *Producer) Close() error {
if producer.getStatus() == closed {
return AlreadyClosed
Expand Down