diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
new file mode 100644
index 0000000000..dbc70f5d70
--- /dev/null
+++ b/pulsar/ack_grouping_tracker.go
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"time"
+
+	"github.com/bits-and-blooms/bitset"
+)
+
+// ackGroupingTracker groups ACK requests and decides when they are actually sent.
+type ackGroupingTracker interface {
+	// add registers an individual ACK of id.
+	add(id MessageID)
+
+	// addCumulative registers a cumulative ACK of id.
+	addCumulative(id MessageID)
+
+	// isDuplicate reports whether id is covered by an ACK the tracker still remembers.
+	isDuplicate(id MessageID) bool
+
+	// flush sends the ACK requests that are cached.
+	flush()
+
+	// flushAndClean sends the cached ACK requests and then resets the tracker's state.
+	flushAndClean()
+
+	// close sends the cached ACK requests and releases the resources of the tracker.
+	close()
+}
+
+// ackFlushType tells the tracker goroutine what to do in addition to flushing.
+type ackFlushType int
+
+const (
+	flushOnly ackFlushType = iota
+	flushAndClean
+	flushAndClose
+)
+
+// newAckGroupingTracker creates the tracker that matches options.
+//
+// A nil options means MaxSize 1000 and MaxTime 100ms. If MaxSize is 0 or 1, every ACK is passed
+// to ackIndividual or ackCumulative right away. Otherwise the ACKs are cached by a background
+// goroutine, which only exits when close() is called.
+func newAckGroupingTracker(options *AckGroupingOptions,
+	ackIndividual func(id MessageID),
+	ackCumulative func(id MessageID)) ackGroupingTracker {
+	if options == nil {
+		options = &AckGroupingOptions{
+			MaxSize: 1000,
+			MaxTime: 100 * time.Millisecond,
+		}
+	}
+
+	if options.MaxSize <= 1 {
+		return &immediateAckGroupingTracker{
+			ackIndividual: ackIndividual,
+			ackCumulative: ackCumulative,
+		}
+	}
+
+	c := &cachedAcks{
+		singleAcks:        make([]MessageID, options.MaxSize),
+		pendingAcks:       make(map[int64]*bitset.BitSet),
+		lastCumulativeAck: EarliestMessageID(),
+		ackIndividual:     ackIndividual,
+		ackCumulative:     ackCumulative,
+		ackList: func(ids []MessageID) {
+			// TODO: support ack a list of MessageIDs
+			for _, id := range ids {
+				ackIndividual(id)
+			}
+		},
+	}
+
+	// A ticker needs a positive period, so a stopped placeholder (whose channel never fires) is
+	// used when time-based flushing is disabled. Exactly one ticker is created, so no running
+	// ticker is left behind.
+	timedFlush := options.MaxTime > 0
+	period := time.Hour
+	if timedFlush {
+		period = options.MaxTime
+	}
+	timeout := time.NewTicker(period)
+	if !timedFlush {
+		timeout.Stop()
+	}
+	t := &timedAckGroupingTracker{
+		ackIndividualCh:   make(chan MessageID),
+		ackCumulativeCh:   make(chan MessageID),
+		duplicateIDCh:     make(chan MessageID),
+		duplicateResultCh: make(chan bool),
+		flushCh:           make(chan ackFlushType),
+		waitFlushCh:       make(chan bool),
+	}
+	go func() {
+		for {
+			select {
+			case id := <-t.ackIndividualCh:
+				if c.addAndCheckIfFull(id) {
+					c.flushIndividualAcks()
+					if timedFlush {
+						timeout.Reset(period)
+					}
+				}
+			case id := <-t.ackCumulativeCh:
+				c.tryUpdateLastCumulativeAck(id)
+				if !timedFlush {
+					c.flushCumulativeAck()
+				}
+			case id := <-t.duplicateIDCh:
+				t.duplicateResultCh <- c.isDuplicate(id)
+			case <-timeout.C:
+				c.flush()
+			case flushType := <-t.flushCh:
+				c.flush()
+				if flushType == flushAndClean {
+					c.clean()
+				}
+				if flushType == flushAndClose {
+					// The goroutine is about to exit, so release the ticker.
+					timeout.Stop()
+				} else if timedFlush {
+					// Restart the period instead of stopping the ticker, otherwise the ACKs
+					// added after an explicit flush would no longer be sent after MaxTime.
+					timeout.Reset(period)
+				}
+				t.waitFlushCh <- true
+				if flushType == flushAndClose {
+					return
+				}
+			}
+		}
+	}()
+	return t
+}
+
+// immediateAckGroupingTracker sends every ACK immediately and caches nothing.
+type immediateAckGroupingTracker struct {
+	ackIndividual func(id MessageID)
+	ackCumulative func(id MessageID)
+}
+
+func (i *immediateAckGroupingTracker) add(id MessageID) {
+	i.ackIndividual(id)
+}
+
+func (i *immediateAckGroupingTracker) addCumulative(id MessageID) {
+	i.ackCumulative(id)
+}
+
+func (i *immediateAckGroupingTracker) isDuplicate(id MessageID) bool {
+	return false
+}
+
+func (i *immediateAckGroupingTracker) flush() {
+}
+
+func (i *immediateAckGroupingTracker) flushAndClean() {
+}
+
+func (i *immediateAckGroupingTracker) close() {
+}
+
+// cachedAcks stores the ACK requests that have not been sent yet. It has no locking of its
+// own: the goroutine started by newAckGroupingTracker is its only user.
+type cachedAcks struct {
+	// singleAcks[0:index] are the cached individual ACKs.
+	singleAcks []MessageID
+	index      int
+
+	// Key is the hash code of the ledger id and the entry id,
+	// Value is the bit set that represents which messages are acknowledged if the entry stores a batch.
+	// The bit 1 represents the message has not been acknowledged, i.e. the bits "111" represents all
+	// messages in the batch whose batch size is 3 are not acknowledged.
+	// After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011".
+	// Value is nil if the entry represents a single message.
+	pendingAcks map[int64]*bitset.BitSet
+
+	lastCumulativeAck     MessageID
+	cumulativeAckRequired bool
+
+	ackIndividual func(id MessageID)
+	ackCumulative func(id MessageID)
+	ackList       func(ids []MessageID)
+}
+
+// addAndCheckIfFull caches the individual ACK of id and reports whether the cache is full,
+// in which case the caller has to flush it before adding another ACK.
+func (t *cachedAcks) addAndCheckIfFull(id MessageID) bool {
+	t.singleAcks[t.index] = id
+	t.index++
+	key := messageIDHash(id)
+	ackSet, found := t.pendingAcks[key]
+	if !found {
+		if messageIDIsBatch(id) {
+			ackSet = bitset.New(uint(id.BatchSize()))
+			for i := 0; i < int(id.BatchSize()); i++ {
+				ackSet.Set(uint(i))
+			}
+			t.pendingAcks[key] = ackSet
+		} else {
+			t.pendingAcks[key] = nil
+		}
+	}
+	if ackSet != nil {
+		ackSet.Clear(uint(id.BatchIdx()))
+	}
+	return t.index == len(t.singleAcks)
+}
+
+// tryUpdateLastCumulativeAck records id as the pending cumulative ACK if it is newer than the last one.
+func (t *cachedAcks) tryUpdateLastCumulativeAck(id MessageID) {
+	if messageIDCompare(t.lastCumulativeAck, id) < 0 {
+		t.lastCumulativeAck = id
+		t.cumulativeAckRequired = true
+	}
+}
+
+// isDuplicate reports whether id is covered by the last cumulative ACK or by an individual
+// ACK that is still cached.
+func (t *cachedAcks) isDuplicate(id MessageID) bool {
+	if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
+		return true
+	}
+	ackSet, found := t.pendingAcks[messageIDHash(id)]
+	if !found {
+		return false
+	}
+	if ackSet == nil || !messageIDIsBatch(id) {
+		// NOTE: should we panic when ackSet != nil and messageIDIsBatch(id) is true?
+		return true
+	}
+	// 0 represents the message has been acknowledged
+	return !ackSet.Test(uint(id.BatchIdx()))
+}
+
+// flushIndividualAcks sends the cached individual ACKs and forgets them.
+//
+// The whole entry of each sent ACK is removed from pendingAcks, even if other messages of the
+// same batch are not acknowledged yet, so isDuplicate only reports ACKs that are still cached.
+func (t *cachedAcks) flushIndividualAcks() {
+	if t.index == 0 {
+		return
+	}
+	flushed := t.singleAcks[0:t.index]
+	t.ackList(flushed)
+	for _, id := range flushed {
+		delete(t.pendingAcks, messageIDHash(id))
+	}
+	t.index = 0
+}
+
+// flushCumulativeAck sends the last cumulative ACK if it has not been sent yet.
+func (t *cachedAcks) flushCumulativeAck() {
+	if t.cumulativeAckRequired {
+		t.ackCumulative(t.lastCumulativeAck)
+		t.cumulativeAckRequired = false
+	}
+}
+
+// flush sends the cached individual ACKs and then the pending cumulative ACK.
+func (t *cachedAcks) flush() {
+	t.flushIndividualAcks()
+	t.flushCumulativeAck()
+}
+
+// clean drops the cached state without sending anything. The maximum size is kept.
+func (t *cachedAcks) clean() {
+	maxSize := len(t.singleAcks)
+	t.singleAcks = make([]MessageID, maxSize)
+	t.index = 0
+	t.pendingAcks = make(map[int64]*bitset.BitSet)
+	t.lastCumulativeAck = EarliestMessageID()
+	t.cumulativeAckRequired = false
+}
+
+// timedAckGroupingTracker forwards every call to the goroutine started by
+// newAckGroupingTracker through unbuffered channels, so each call blocks until it is handled.
+type timedAckGroupingTracker struct {
+	ackIndividualCh   chan MessageID
+	ackCumulativeCh   chan MessageID
+	duplicateIDCh     chan MessageID
+	duplicateResultCh chan bool
+	flushCh           chan ackFlushType
+	waitFlushCh       chan bool
+}
+
+func (t *timedAckGroupingTracker) add(id MessageID) {
+	t.ackIndividualCh <- id
+}
+
+func (t *timedAckGroupingTracker) addCumulative(id MessageID) {
+	t.ackCumulativeCh <- id
+}
+
+func (t *timedAckGroupingTracker) isDuplicate(id MessageID) bool {
+	t.duplicateIDCh <- id
+	return <-t.duplicateResultCh
+}
+
+func (t *timedAckGroupingTracker) flush() {
+	t.flushCh <- flushOnly
+	<-t.waitFlushCh
+}
+
+func (t *timedAckGroupingTracker) flushAndClean() {
+	t.flushCh <- flushAndClean
+	<-t.waitFlushCh
+}
+
+// close flushes the cached ACKs and stops the goroutine. No method may be called afterwards
+// because nothing would receive from the channels anymore.
+func (t *timedAckGroupingTracker) close() {
+	t.flushCh <- flushAndClose
+	<-t.waitFlushCh
+}
diff --git a/pulsar/ack_grouping_tracker_test.go
b/pulsar/ack_grouping_tracker_test.go
new file mode 100644
index 0000000000..d7903e8f93
--- /dev/null
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNoCacheTracker(t *testing.T) {
+	tests := []AckGroupingOptions{
+		{
+			MaxSize: 0,
+			MaxTime: 10 * time.Hour,
+		},
+		{
+			MaxSize: 1,
+			MaxTime: 10 * time.Hour,
+		},
+	}
+	for _, option := range tests {
+		t.Run(fmt.Sprintf("TestAckImmediately_size_%v_time_%vs", option.MaxSize, option.MaxTime.Seconds()),
+			func(t *testing.T) {
+				ledgerID0 := int64(-1)
+				ledgerID1 := int64(-1)
+				tracker := newAckGroupingTracker(&option,
+					func(id MessageID) { ledgerID0 = id.LedgerID() },
+					func(id MessageID) { ledgerID1 = id.LedgerID() })
+
+				tracker.add(&messageID{ledgerID: 1})
+				assert.Equal(t, atomic.LoadInt64(&ledgerID0), int64(1))
+				tracker.addCumulative(&messageID{ledgerID: 2})
+				assert.Equal(t, atomic.LoadInt64(&ledgerID1), int64(2))
+			})
+	}
+}
+
+// mockAcker records the ACKs it receives. ledgerIDs is guarded by the embedded mutex and
+// cumulativeLedgerID is accessed atomically, because ACKs may arrive from the tracker goroutine.
+type mockAcker struct {
+	sync.Mutex
+	ledgerIDs          []int64
+	cumulativeLedgerID int64
+}
+
+func (a *mockAcker) ack(id MessageID) {
+	a.Lock()
+	defer a.Unlock()
+	a.ledgerIDs = append(a.ledgerIDs, id.LedgerID())
+}
+
+func (a *mockAcker) ackCumulative(id MessageID) {
+	atomic.StoreInt64(&a.cumulativeLedgerID, id.LedgerID())
+}
+
+func (a *mockAcker) getLedgerIDs() []int64 {
+	a.Lock()
+	defer a.Unlock()
+	return a.ledgerIDs
+}
+
+func (a *mockAcker) getCumulativeLedgerID() int64 {
+	return atomic.LoadInt64(&a.cumulativeLedgerID)
+}
+
+// reset clears the recorded ACKs using the same synchronization as the other methods.
+func (a *mockAcker) reset() {
+	a.Lock()
+	defer a.Unlock()
+	a.ledgerIDs = make([]int64, 0)
+	atomic.StoreInt64(&a.cumulativeLedgerID, 0)
+}
+
+func TestCachedTracker(t *testing.T) {
+	var acker mockAcker
+	tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
+		func(id MessageID) { acker.ack(id) }, func(id MessageID) { acker.ackCumulative(id) })
+
+	tracker.add(&messageID{ledgerID: 1})
+	tracker.add(&messageID{ledgerID: 2})
+	for i := 1; i <= 2; i++ {
+		assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(i)}))
+	}
+	assert.Equal(t, 0, len(acker.getLedgerIDs()))
+	tracker.add(&messageID{ledgerID: 3})
+	assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 0 },
+		10*time.Millisecond, 2*time.Millisecond)
+	assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
+	for i := 1; i <= 3; i++ {
+		assert.False(t, tracker.isDuplicate(&messageID{ledgerID: int64(i)}))
+	}
+
+	tracker.add(&messageID{ledgerID: 4})
+	// 4 won't be added because the cache is not full
+	assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
+
+	assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 5}))
+	tracker.addCumulative(&messageID{ledgerID: 5})
+	for i := 0; i <= 5; i++ {
+		assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(i)}))
+	}
+	assert.Equal(t, int64(5), acker.getCumulativeLedgerID())
+	assert.False(t, tracker.isDuplicate(&messageID{ledgerID: int64(6)}))
+
+	tracker.flush()
+	assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 3 },
+		10*time.Millisecond, 2*time.Millisecond)
+	assert.Equal(t, []int64{1, 2, 3, 4}, acker.getLedgerIDs())
+}
+
+func TestTimedTrackerIndividualAck(t *testing.T) {
+	var acker mockAcker
+	// MaxSize: 1000, MaxTime: 100ms
+	tracker := newAckGroupingTracker(nil, func(id MessageID) { acker.ack(id) }, nil)
+
+	expected := make([]int64, 0)
+	for i := 0; i < 999; i++ {
+		tracker.add(&messageID{ledgerID: int64(i)})
+		expected = append(expected, int64(i))
+	}
+	assert.Equal(t, 0, len(acker.getLedgerIDs()))
+
+	// case 1: flush because the tracker timed out
+	assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) == 999 },
+		150*time.Millisecond, 10*time.Millisecond)
+	assert.Equal(t, expected, acker.getLedgerIDs())
+
+	// case 2: flush because cache is full
+	time.Sleep(50) // NOTE: this is 50 nanoseconds; see case 3
+	acker.reset()
+	expected = append(expected, 999)
+	for i := 0; i < 1001; i++ {
+		tracker.add(&messageID{ledgerID: int64(i)})
+	}
+	assert.Equal(t, expected, acker.getLedgerIDs())
+
+	// case 3: flush will reset the timer
+	start := time.Now()
+	assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 1000 },
+		150*time.Millisecond, 10*time.Millisecond)
+	elapsed := time.Since(start)
+	assert.GreaterOrEqual(t, elapsed, int64(100), "elapsed", elapsed)
+	assert.Equal(t, append(expected, 1000), acker.getLedgerIDs())
+}
+
+func TestTimedTrackerCumulativeAck(t *testing.T) {
+	var acker mockAcker
+	// MaxTime is 100ms
+	tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { acker.ackCumulative(id) })
+
+	// case 1: flush because of the timeout
+	tracker.addCumulative(&messageID{ledgerID: 1})
+	assert.NotEqual(t, int64(1), acker.getCumulativeLedgerID())
+	assert.Eventually(t, func() bool { return acker.getCumulativeLedgerID() == int64(1) },
+		150*time.Millisecond, 10*time.Millisecond)
+	assert.Equal(t, int64(1), acker.getCumulativeLedgerID())
+
+	// case 2: flush manually
+	tracker.addCumulative(&messageID{ledgerID: 2})
+	tracker.flush()
+	assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
+
+	// case 3: older MessageID cannot be acknowledged
+	tracker.addCumulative(&messageID{ledgerID: 1})
+	tracker.flush()
+	assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
+}
+
+func TestTimedTrackerIsDuplicate(t *testing.T) {
+	tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {})
+
+	tracker.add(messageID{batchIdx: 0, batchSize: 3})
+	tracker.add(messageID{batchIdx: 2, batchSize: 3})
+	assert.True(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 3}))
+	assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 3}))
+	assert.True(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 3}))
+
+	tracker.flush()
+	assert.False(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 3}))
+	assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 3}))
+	assert.False(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 3}))
+}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 8bae57d950..9576d7aabc 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -80,6 +80,20 @@ type DLQPolicy struct {
 	RetryLetterTopic string
 }
 
+// AckGroupingOptions controls how to group ACK requests.
+// If `MaxSize` is 0 or 1, any ACK request will be sent immediately.
+// Otherwise, the ACK requests will be cached until one of the following conditions is met:
+// 1. There are `MaxSize` pending ACK requests.
+// 2. `MaxTime` is greater than 0 and ACK requests have been cached for `MaxTime`.
+// For cumulative acknowledgment, only the latest ACK is cached; it is sent after `MaxTime` (at once if it is <= 0).
+type AckGroupingOptions struct {
+	// The maximum number of ACK requests to cache
+	MaxSize uint32
+
+	// The maximum time to cache ACK requests
+	MaxTime time.Duration
+}
+
 // ConsumerOptions is used to configure and create instances of Consumer.
 type ConsumerOptions struct {
 	// Topic specifies the topic this consumer will subscribe on.
@@ -215,6 +229,13 @@ type ConsumerOptions struct {
 	// Enable or disable batch index acknowledgment.
To enable this feature, ensure batch index acknowledgment
 	// is enabled on the broker side. (default: false)
 	EnableBatchIndexAcknowledgment bool
+
+	// Controls how to group ACK requests, the default value is nil, which means:
+	// MaxSize: 1000
+	// MaxTime: 100*time.Millisecond
+	// NOTE: This option does not work if AckWithResponse is true
+	// because there are only synchronous APIs for acknowledgment
+	AckGroupingOptions *AckGroupingOptions
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index bf136c8e83..8ee1822c1b 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -398,6 +398,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				autoAckIncompleteChunk:     c.options.AutoAckIncompleteChunk,
 				consumerEventListener:      c.options.EventListener,
 				enableBatchIndexAck:        c.options.EnableBatchIndexAcknowledgment,
+				ackGroupingOptions:         c.options.AckGroupingOptions,
 			}
 			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
 			ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e723f8afa6..95a8d3240d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -116,6 +116,7 @@ type partitionConsumerOpts struct {
 	// in failover mode, this callback will be called when consumer change
 	consumerEventListener ConsumerEventListener
 	enableBatchIndexAck   bool
+	ackGroupingOptions    *AckGroupingOptions
 }
 
 type ConsumerEventListener interface {
@@ -167,6 +168,7 @@ type partitionConsumer struct {
 
 	chunkedMsgCtxMap   *chunkedMsgCtxMap
 	unAckChunksTracker *unAckChunksTracker
+	ackGroupingTracker ackGroupingTracker
 }
 
 func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -310,6 +312,9 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 	pc.availablePermits = &availablePermits{pc: pc}
 	pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
 	pc.unAckChunksTracker = newUnAckChunksTracker(pc)
+	pc.ackGroupingTracker = newAckGroupingTracker(options.ackGroupingOptions,
+		func(id MessageID) { pc.sendIndividualAck(id) },
+		func(id MessageID) { pc.sendCumulativeAck(id) })
 	pc.setConsumerState(consumerInit)
 	pc.log = client.log.SubLogger(log.Fields{
 		"name":         pc.name,
@@ -467,30 +472,35 @@ func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
 		return errors.New("failed to convert trackingMessageID")
 	}
 
-	ackReq := new(ackRequest)
-	ackReq.doneCh = make(chan struct{})
-	ackReq.ackType = individualAck
 	if !trackingID.Undefined() && trackingID.ack() {
 		pc.metrics.AcksCounter.Inc()
 		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
-		ackReq.msgID = trackingID
-		// send ack request to eventsCh
-		pc.eventsCh <- ackReq
-
-		if withResponse {
-			<-ackReq.doneCh
-		}
-
-		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
-	} else if pc.options.enableBatchIndexAck {
-		ackReq.msgID = trackingID
-		pc.eventsCh <- ackReq
+	} else if !pc.options.enableBatchIndexAck {
+		return nil
 	}
 
+	var ackReq *ackRequest
 	if withResponse {
-		return ackReq.err
+		ackReq = pc.sendIndividualAck(&trackingID) // "=", not ":=": a shadowed ackReq would hide the error
+		<-ackReq.doneCh
+	} else {
+		pc.ackGroupingTracker.add(&trackingID)
 	}
-	return nil
+	pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
+	if ackReq == nil {
+		return nil
+	}
+	return ackReq.err
+}
+
+func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
+	ackReq := &ackRequest{
+		doneCh:  make(chan struct{}),
+		ackType: individualAck,
+		msgID:   *msgID.(*trackingMessageID),
+	}
+	pc.eventsCh <- ackReq
+	return ackReq
 }
 
 func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
@@ -524,14 +534,12 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
 		return nil
 	}
 
-	ackReq := new(ackRequest)
-	ackReq.doneCh = make(chan struct{})
-	ackReq.ackType = cumulativeAck
+	var msgIDToAck trackingMessageID
 	if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
-		ackReq.msgID = trackingID
+		msgIDToAck = trackingID
 	} else if !trackingID.tracker.hasPrevBatchAcked() {
 		// get previous batch message id
-		ackReq.msgID = trackingID.prev()
+		msgIDToAck = trackingID.prev()
 		trackingID.tracker.setPrevBatchAcked()
 	} else {
 		// waiting for all the msgs are acked in this batch
@@ -540,12 +548,13 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
 
 	pc.metrics.AcksCounter.Inc()
 	pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
-	// send ack request to eventsCh
-	pc.eventsCh <- ackReq
 
+	var ackReq *ackRequest
 	if withResponse {
-		// wait for the request to complete if withResponse set true
+		ackReq = pc.sendCumulativeAck(&msgIDToAck) // "=", not ":=": a shadowed ackReq would hide the error
 		<-ackReq.doneCh
+	} else {
+		pc.ackGroupingTracker.addCumulative(&msgIDToAck)
 	}
 
 	pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
@@ -554,7 +563,20 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
 		pc.unAckChunksTracker.remove(cmid)
 	}
 
-	return nil
+	if ackReq == nil {
+		return nil
+	}
+	return ackReq.err
+}
+
+func (pc *partitionConsumer) sendCumulativeAck(msgID MessageID) *ackRequest {
+	ackReq := &ackRequest{
+		doneCh:  make(chan struct{}),
+		ackType: cumulativeAck,
+		msgID:   *msgID.(*trackingMessageID),
+	}
+	pc.eventsCh <- ackReq
+	return ackReq
 }
 
 func (pc *partitionConsumer) NackID(msgID MessageID) {
@@ -631,6 +653,9 @@ func (pc *partitionConsumer) Close() {
 		return
 	}
 
+	// flush all pending ACK requests and terminate the timer goroutine
+	pc.ackGroupingTracker.close()
+
 	// close chunkedMsgCtxMap
 	pc.chunkedMsgCtxMap.Close()
 
@@ -658,6 +683,7 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error {
 		return errors.New("unhandled messageID type")
 	}
 
+	pc.ackGroupingTracker.flushAndClean()
 	pc.eventsCh <- req
 
 	// wait for the request to complete
@@ -715,6 +741,7 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {
 		doneCh:      make(chan struct{}),
 		publishTime: time,
 	}
+	pc.ackGroupingTracker.flushAndClean()
 	pc.eventsCh <- req
 
 	// wait for the request to complete
@@ -957,6 +984,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			msgID = trackingMsgID
 		}
 
+		if pc.ackGroupingTracker.isDuplicate(msgID) {
+			continue
+		}
+
 		var messageIndex *uint64
 		var brokerPublishTime *time.Time
 		if brokerMetadata != nil {
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index fd50b703fb..b9a9a02c3f 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -37,6 +37,8 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
 		metrics:              newTestMetrics(),
 		decryptor:            crypto.NewNoopDecryptor(),
 	}
+	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+		func(id MessageID) { pc.sendIndividualAck(id) }, nil)
 
 	headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -73,6 +75,8 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 		metrics:              newTestMetrics(),
 		decryptor:            crypto.NewNoopDecryptor(),
 	}
+	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+		func(id MessageID) { pc.sendIndividualAck(id) }, nil)
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -105,6 +109,8 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
 		metrics:              newTestMetrics(),
 		decryptor:            crypto.NewNoopDecryptor(),
 	}
+	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+		func(id MessageID) { pc.sendIndividualAck(id) }, nil)
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
diff --git a/pulsar/consumer_test.go
b/pulsar/consumer_test.go
index 0eb7aae572..de90c0e7dd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3853,37 +3853,38 @@ func TestAckWithMessageID(t *testing.T) {
 }
 
 func TestBatchIndexAck(t *testing.T) {
-	tests := []struct {
-		AckWithResponse bool
-		Cumulative      bool
-	}{
-		{
-			AckWithResponse: true,
-			Cumulative:      true,
-		},
-		{
-			AckWithResponse: true,
-			Cumulative:      false,
-		},
-		{
-			AckWithResponse: false,
-			Cumulative:      true,
-		},
-		{
-			AckWithResponse: false,
-			Cumulative:      false,
-		},
-	}
-	for _, params := range tests {
-		t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v",
-			params.AckWithResponse, params.Cumulative),
+	type config struct {
+		ackWithResponse    bool
+		cumulative         bool
+		ackGroupingOptions *AckGroupingOptions
+	}
+	configs := make([]config, 0)
+	for _, option := range []*AckGroupingOptions{
+		nil, // the defaults are used: MaxSize: 1000, MaxTime: 100ms
+		{MaxSize: 0, MaxTime: 0},
+		{MaxSize: 1000, MaxTime: 0},
+	} {
+		configs = append(configs, config{true, true, option})
+		configs = append(configs, config{true, false, option})
+		configs = append(configs, config{false, true, option})
+		configs = append(configs, config{false, false, option})
+	}
+
+	for _, params := range configs {
+		option := params.ackGroupingOptions
+		if option == nil {
+			option = &AckGroupingOptions{MaxSize: 1000, MaxTime: 100 * time.Millisecond} // only names the subtest
+		}
+
+		t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v_AckGroupingOption_%v_%v",
+			params.ackWithResponse, params.cumulative, option.MaxSize, option.MaxTime.Milliseconds()),
 			func(t *testing.T) {
-				runBatchIndexAckTest(t, params.AckWithResponse, params.Cumulative)
+				runBatchIndexAckTest(t, params.ackWithResponse, params.cumulative, params.ackGroupingOptions)
 			})
 	}
 }
 
-func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool) {
+func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, option *AckGroupingOptions) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
@@ -3897,6
+3898,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool) {
 			SubscriptionName:               "my-sub",
 			AckWithResponse:                ackWithResponse,
 			EnableBatchIndexAcknowledgment: true,
+			AckGroupingOptions:             option,
 		})
 		assert.Nil(t, err)
 		return consumer
diff --git a/pulsar/message.go b/pulsar/message.go
index d37692b1a4..c44957d18d 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -181,3 +181,51 @@ func EarliestMessageID() MessageID {
 func LatestMessageID() MessageID {
 	return latestMessageID
 }
+
+// messageIDCompare compares two message IDs by ledger ID, then entry ID, then batch index.
+// It returns -1 if lhs precedes rhs, 1 if lhs follows rhs, and 0 if they are equal.
+func messageIDCompare(lhs MessageID, rhs MessageID) int {
+	if lhs.LedgerID() < rhs.LedgerID() {
+		return -1
+	} else if lhs.LedgerID() > rhs.LedgerID() {
+		return 1
+	}
+	if lhs.EntryID() < rhs.EntryID() {
+		return -1
+	} else if lhs.EntryID() > rhs.EntryID() {
+		return 1
+	}
+	// When performing batch index ACK on a batched message whose batch size is N,
+	// the ACK order should be:
+	// (ledger, entry, 0) -> (ledger, entry, 1) -> ... -> (ledger, entry, N-1) -> (ledger, entry)
+	// So any MessageID with a batch index has to be treated as preceding the MessageID without
+	// a batch index if they are in the same entry.
+	if lhs.BatchIdx() < 0 && rhs.BatchIdx() < 0 {
+		return 0
+	} else if lhs.BatchIdx() >= 0 && rhs.BatchIdx() < 0 {
+		return -1
+	} else if lhs.BatchIdx() < 0 && rhs.BatchIdx() >= 0 {
+		return 1
+	}
+	if lhs.BatchIdx() < rhs.BatchIdx() {
+		return -1
+	} else if lhs.BatchIdx() > rhs.BatchIdx() {
+		return 1
+	}
+	return 0
+}
+
+// messageIDHash returns a key that identifies the entry (ledger ID and entry ID) of id.
+//
+// The ledger ID is stored in the upper 32 bits and the entry ID in the lower 32 bits. Keys of
+// different entries are therefore distinct as long as the ledger ID fits in 31 bits and the
+// entry ID in 32 bits, whereas a sum like ledger+31*entry yields the same key for e.g.
+// (ledger 31, entry 0) and (ledger 0, entry 1).
+func messageIDHash(id MessageID) int64 {
+	return id.LedgerID()<<32 ^ id.EntryID()
+}
+
+// messageIDIsBatch reports whether id refers to a message inside a batch.
+func messageIDIsBatch(id MessageID) bool {
+	return id.BatchIdx() >= 0 && id.BatchSize() > 0
+}