Enable seeking individual topic partitions #829

Open · wants to merge 4 commits into master
3 changes: 0 additions & 3 deletions pulsar/consumer.go
@@ -238,9 +238,6 @@ type Consumer interface {

// Seek resets the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(MessageID) error

// SeekByTime resets the subscription associated with this consumer to a specific message publish time.
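For reference, here is a minimal sketch of how Seek is used from application code. This is not part of the PR; the broker URL, topic, and subscription name are illustrative placeholders.

package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Illustrative broker URL; replace with your own cluster address.
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "persistent://public/default/my-topic", // illustrative topic
		SubscriptionName: "my-sub",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Receive a message and remember its ID so the subscription can be rewound to it later.
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	savedID := msg.ID()
	consumer.Ack(msg)

	// ... consume further messages ...

	// Reset the subscription to the saved position.
	if err := consumer.Seek(savedID); err != nil {
		log.Fatal(err)
	}
}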
14 changes: 10 additions & 4 deletions pulsar/consumer_impl.go
@@ -575,15 +575,21 @@ func (c *consumer) Seek(msgID MessageID) error {
c.Lock()
defer c.Unlock()

if len(c.consumers) > 1 {
return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
}

mid, ok := c.messageID(msgID)
if !ok {
return nil
}

if mid.partitionIdx < 0 {
return newError(SeekFailed, "partitionIdx is negative")
}
Comment on lines +583 to +585
Contributor

Will this break message acknowledgment for non-partitioned topics?
A partition index of -1 means the message comes from a non-partitioned topic.

@freeznet @wolfstudy Please help confirm

Author
@severinson severinson Aug 17, 2022

Is that really true? The pre-existing code would do the following:

return c.consumers[mid.partitionIdx].Seek(mid)

This suggests that the partition index is 0 for non-partitioned topics. Otherwise seek() on a non-partitioned topic would've always resulted in an index out of bounds panic.

Member

In the Go SDK, a partition index less than 0 is not allowed:

// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
	c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
		partition, len(c.consumers))
	return trackingMessageID{}, false
}

And why do we need to modify the current logic, where the number of consumers equals the number of partitions and each partition corresponds to a unique consumer?

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

Author
@severinson severinson Aug 23, 2022

Thanks for the comments. I have a few questions.

And why do we need to modify the current logic, where the number of consumers equals the number of partitions and each partition corresponds to a unique consumer?

I don't understand what you mean. I'm not changing the current logic. I'm just passing through the message id to the partitionConsumer responsible for the partition.

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

All right. Do you know what work still remains to have the Go client support seeking by partition?

if mid.partitionIdx >= int32(len(c.consumers)) {
return newError(
SeekFailed,
fmt.Sprintf("partitionIdx is %d, but there are %d partitions", mid.partitionIdx, len(c.consumers)),
)
}

return c.consumers[mid.partitionIdx].Seek(mid)
}
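With this change, Seek on a partitioned-topic consumer is forwarded to the internal consumer for the partition encoded in the message ID, so only that partition's position is reset. Below is a minimal sketch of the resulting usage, assuming a consumer already subscribed to a partitioned topic; the package and helper name rewindToLastReceived are hypothetical.

package example

import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
)

// rewindToLastReceived receives one message from a consumer on a partitioned topic
// and seeks back to it. The message ID carries the index of the partition it came
// from, and under this PR the seek is routed to the internal consumer for that
// partition only; the other partitions keep their positions.
func rewindToLastReceived(ctx context.Context, consumer pulsar.Consumer) error {
	msg, err := consumer.Receive(ctx)
	if err != nil {
		return err
	}
	// Seek only the partition that produced this message ID.
	return consumer.Seek(msg.ID())
}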

145 changes: 145 additions & 0 deletions pulsar/consumer_test.go
@@ -3053,6 +3053,151 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
consumer.Ack(msg)
}

// TestConsumerSeekOnPartitionedTopic tests seeking on a partitioned topic.
// It is based on the existing test case [TestConsumerSeek] but for a partitioned topic.
func TestConsumerSeekOnPartitionedTopic(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

// Create topic with 5 partitions
topicAdminURL := "admin/v2/persistent/public/default/TestSeekOnPartitionedTopic/partitions"
err = httpPut(topicAdminURL, 5)
defer httpDelete(topicAdminURL)
assert.Nil(t, err)

topicName := "persistent://public/default/TestSeekOnPartitionedTopic"

partitions, err := client.TopicPartitions(topicName)
assert.Nil(t, err)
assert.Equal(t, len(partitions), 5)
for i := 0; i < 5; i++ {
assert.Equal(t, partitions[i],
fmt.Sprintf("%s-partition-%d", topicName, i))
}

ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
defer consumer.Close()

// Use a value bigger than 1000 to fill the queue channel (size 1000) and the message channel (size 10)
const N = 1100
var seekID MessageID
for i := 0; i < N; i++ {
id, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "key", // Ensure all messages go to the same partition.
})
assert.Nil(t, err)

if i == N-50 {
seekID = id
}
}

// Don't consume all messages so some stay in queues
for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}

err = consumer.Seek(seekID)
assert.Nil(t, err)

msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload()))
}

// TestConsumerSeekOnPartitionedTopicKeyShared tests seeking on a partitioned topic with a KeyShared subscription.
// It is based on the existing test case [TestConsumerSeekOnPartitionedTopic] but with a KeyShared subscription.
func TestConsumerSeekOnPartitionedTopicKeyShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

// Create topic with 5 partitions
topicAdminURL := "admin/v2/persistent/public/default/TestSeekOnPartitionedTopicKeyShared/partitions"
err = httpPut(topicAdminURL, 5)
defer httpDelete(topicAdminURL)
assert.Nil(t, err)

topicName := "persistent://public/default/TestSeekOnPartitionedTopicKeyShared"

partitions, err := client.TopicPartitions(topicName)
assert.Nil(t, err)
assert.Equal(t, len(partitions), 5)
for i := 0; i < 5; i++ {
assert.Equal(t, partitions[i],
fmt.Sprintf("%s-partition-%d", topicName, i))
}

ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer.Close()

// Use a value bigger than 1000 to fill the queue channel (size 1000) and the message channel (size 10)
const N = 1100
var seekID MessageID
for i := 0; i < N; i++ {
id, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "key", // Ensure all messages go to the same partition.
})
assert.Nil(t, err)

if i == N-50 {
seekID = id
}
}

// Don't consume all messages so some stay in queues
for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}

err = consumer.Seek(seekID)
assert.Nil(t, err)

msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload()))
}

// TestConsumerSeekByTimeOnPartitionedTopic tests seeking by time on a partitioned topic.
// It is based on the existing test case [TestConsumerSeekByTime] but for a partitioned topic.
func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {