From 2dfdabde71f23fb06f8f1299f9d1c9670c681840 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Tue, 3 Jan 2023 14:26:50 +0800
Subject: [PATCH] [feature] Support batch index ACK

Fixes https://github.com/apache/pulsar-client-go/issues/894

### Modifications

Add an `EnableBatchIndexAcknowledgment` option to specify whether batch index
ACK is enabled. Since this feature requires converting between a bit set and
its underlying long array, much like Java's `BitSet`, this commit introduces
the github.com/bits-and-blooms/bitset dependency to replace the `big.Int`
based bit set implementation.

Add a `BatchSize()` method to `MessageID` to indicate the size of the
`ack_set` field. When a batch index ACK happens, convert the `[]uint64` to a
`[]int64` and set it as the `ack_set` field in `CommandAck`. When receiving
messages, convert the `ack_set` field in `CommandMessage` to a bit set and use
it to filter out the already acknowledged single messages.

Remove the duplicated code in `AckID` and `AckIDWithResponse`.

### Verifications

`TestBatchIndexAck` is added to cover all four combinations of whether
`AckWithResponse` is enabled and whether the ACK is individual or cumulative.
---
 go.mod                                 |   1 +
 go.sum                                 |   2 +
 integration-tests/conf/standalone.conf |   2 +
 pulsar/consumer.go                     |   4 +
 pulsar/consumer_impl.go                |   1 +
 pulsar/consumer_partition.go           |  80 ++++++++------
 pulsar/consumer_test.go                | 137 ++++++++++++++++++++++++-
 pulsar/impl_message.go                 |  61 +++++++----
 pulsar/impl_message_test.go            |  16 +--
 pulsar/message.go                      |   7 +-
 pulsar/producer_partition.go           |  12 +++
 pulsar/reader_test.go                  |   4 +
 12 files changed, 261 insertions(+), 66 deletions(-)

diff --git a/go.mod b/go.mod
index 0435db9136..e143b042c9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/99designs/keyring v1.2.1
 	github.com/AthenZ/athenz v1.10.39
 	github.com/DataDog/zstd v1.5.0
+	github.com/bits-and-blooms/bitset v1.4.0
 	github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
 	github.com/davecgh/go-spew v1.1.1
 	github.com/golang-jwt/jwt v3.2.1+incompatible
diff --git a/go.sum b/go.sum
index 9dbd99f101..6b07567812 100644
--- a/go.sum
+++ b/go.sum
@@ -65,6 +65,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8=
+github.com/bits-and-blooms/bitset v1.4.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
 github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
diff --git a/integration-tests/conf/standalone.conf b/integration-tests/conf/standalone.conf
index 8cd28280ac..b9ff87bedb 100644
--- a/integration-tests/conf/standalone.conf
+++ b/integration-tests/conf/standalone.conf
@@ -292,3 +292,5 @@ globalZookeeperServers=
 # Deprecated.
Use brokerDeleteInactiveTopicsFrequencySeconds brokerServicePurgeInactiveFrequencyInSeconds=60 + +acknowledgmentAtBatchIndexLevelEnabled=true diff --git a/pulsar/consumer.go b/pulsar/consumer.go index c9f5e89393..8bae57d950 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -211,6 +211,10 @@ type ConsumerOptions struct { // AutoAckIncompleteChunk sets whether consumer auto acknowledges incomplete chunked message when it should // be removed (e.g.the chunked message pending queue is full). (default: false) AutoAckIncompleteChunk bool + + // Enable or disable batch index acknowledgment. To enable this feature, ensure batch index acknowledgment + // is enabled on the broker side. (default: false) + EnableBatchIndexAcknowledgment bool } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 545b57c225..1ff3c887d9 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -397,6 +397,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk, autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk, consumerEventListener: c.options.EventListener, + enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics) ch <- ConsumerError{ diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 7f5b6df711..5b43f9b730 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -36,6 +36,7 @@ import ( cryptointernal "github.com/apache/pulsar-client-go/pulsar/internal/crypto" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/bits-and-blooms/bitset" uAtomic "go.uber.org/atomic" ) @@ -114,6 +115,7 @@ type partitionConsumerOpts struct { autoAckIncompleteChunk bool // in failover mode, this callback will be called when consumer change consumerEventListener ConsumerEventListener + enableBatchIndexAck bool } type ConsumerEventListener interface { @@ -434,7 +436,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error return convertToMessageID(id), nil } -func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error { +func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return errors.New("consumer state is closed") @@ -458,47 +460,31 @@ func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error { ackReq.msgID = trackingID // send ack request to eventsCh pc.eventsCh <- ackReq - // wait for the request to complete - <-ackReq.doneCh - - pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) - } - - return ackReq.err -} - -func (pc *partitionConsumer) AckID(msgID MessageID) error { - if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") - return errors.New("consumer state is closed") - } - - if cmid, ok := toChunkedMessageID(msgID); ok { - return pc.unAckChunksTracker.ack(cmid) - } - trackingID, ok := toTrackingMessageID(msgID) - if !ok { - return errors.New("failed to convert trackingMessageID") - } + if withResponse { + <-ackReq.doneCh + } - ackReq := 
new(ackRequest) - ackReq.doneCh = make(chan struct{}) - ackReq.ackType = individualAck - if !trackingID.Undefined() && trackingID.ack() { - pc.metrics.AcksCounter.Inc() - pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9) + pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) + } else if pc.options.enableBatchIndexAck { ackReq.msgID = trackingID - // send ack request to eventsCh pc.eventsCh <- ackReq - // No need to wait for ackReq.doneCh to finish - - pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } + if withResponse { + return ackReq.err + } return nil } +func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error { + return pc.ackID(msgID, true) +} + +func (pc *partitionConsumer) AckID(msgID MessageID) error { + return pc.ackID(msgID, false) +} + func (pc *partitionConsumer) AckIDCumulative(msgID MessageID) error { return pc.internalAckIDCumulative(msgID, false) } @@ -525,7 +511,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon ackReq := new(ackRequest) ackReq.doneCh = make(chan struct{}) ackReq.ackType = cumulativeAck - if trackingID.ackCumulative() { + if trackingID.ackCumulative() || pc.options.enableBatchIndexAck { ackReq.msgID = trackingID } else if !trackingID.tracker.hasPrevBatchAcked() { // get previous batch message id @@ -764,6 +750,12 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { LedgerId: proto.Uint64(uint64(msgID.ledgerID)), EntryId: proto.Uint64(uint64(msgID.entryID)), } + if pc.options.enableBatchIndexAck && msgID.tracker != nil { + ackSet := msgID.tracker.toAckSet() + if ackSet != nil { + messageIDs[0].AckSet = ackSet + } + } reqID := pc.client.rpcClient.NewRequestID() cmdAck := &pb.CommandAck{ @@ -822,7 +814,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header switch crypToFailureAction { case crypto.ConsumerCryptoFailureActionFail: pc.log.Errorf("consuming message failed due to decryption err :%v", err) - pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, nil)) + pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil)) return err case crypto.ConsumerCryptoFailureActionDiscard: pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError) @@ -842,6 +834,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header int64(pbMsgID.GetEntryId()), pbMsgID.GetBatchIndex(), pc.partitionIdx, + pbMsgID.GetBatchSize(), ), payLoad: headersAndPayload.ReadableSlice(), schema: pc.options.schema, @@ -889,7 +882,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header var ackTracker *ackTracker // are there multiple messages in this batch? 
 if numMsgs > 1 {
-	ackTracker = newAckTracker(numMsgs)
+	ackTracker = newAckTracker(uint(numMsgs))
+}
+
+var ackSet *bitset.BitSet
+if response.GetAckSet() != nil {
+	ackSetFromResponse := response.GetAckSet()
+	buf := make([]uint64, len(ackSetFromResponse))
+	for i := 0; i < len(buf); i++ {
+		buf[i] = uint64(ackSetFromResponse[i])
+	}
+	ackSet = bitset.From(buf)
 }

 pc.metrics.MessagesReceived.Add(float64(numMsgs))
@@ -901,6 +904,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
 			return err
 		}
+		if ackSet != nil && !ackSet.Test(uint(i)) {
+			pc.log.Debugf("Ignoring the %vth message of the batch since it has already been acknowledged", i)
+			continue
+		}

 		pc.metrics.BytesReceived.Add(float64(len(payload)))
 		pc.metrics.PrefetchedBytes.Add(float64(len(payload)))
@@ -910,6 +917,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			int64(pbMsgID.GetEntryId()),
 			int32(i),
 			pc.partitionIdx,
+			int32(numMsgs),
 			ackTracker)
 		// set the consumer so we know how to ack the message id
 		trackingMsgID.consumer = pc
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 11e72a7240..afa94e4050 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3841,7 +3841,142 @@ func TestAckWithMessageID(t *testing.T) {
 	assert.Nil(t, err)

 	id := message.ID()
-	newID := NewMessageID(id.LedgerID(), id.EntryID(), id.BatchIdx(), id.PartitionIdx())
+	newID := NewMessageID(id.LedgerID(), id.EntryID(), id.BatchIdx(), id.PartitionIdx(), 0)
 	err = consumer.AckID(newID)
 	assert.Nil(t, err)
 }
+
+func TestBatchIndexAck(t *testing.T) {
+	tests := []struct {
+		AckWithResponse bool
+		Cumulative      bool
+	}{
+		{
+			AckWithResponse: true,
+			Cumulative:      true,
+		},
+		{
+			AckWithResponse: true,
+			Cumulative:      false,
+		},
+		{
+			AckWithResponse: false,
+			Cumulative:      true,
+		},
+		{
+			AckWithResponse: false,
+			Cumulative:      false,
+		},
+	}
+	for _, params := range tests {
+		t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v",
+			params.AckWithResponse, params.Cumulative),
+			func(t *testing.T) {
+				runBatchIndexAckTest(t, params.AckWithResponse, params.Cumulative)
+			})
+	}
+}
+
+func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+
+	topic := newTopicName()
+	createConsumer := func() Consumer {
+		consumer, err := client.Subscribe(ConsumerOptions{
+			Topic:                          topic,
+			SubscriptionName:               "my-sub",
+			AckWithResponse:                ackWithResponse,
+			EnableBatchIndexAcknowledgment: true,
+		})
+		assert.Nil(t, err)
+		return consumer
+	}
+
+	consumer := createConsumer()
+
+	duration, err := time.ParseDuration("1h")
+	assert.Nil(t, err)
+
+	const BatchingMaxSize int = 2 * 5
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		DisableBatching:         false,
+		BatchingMaxMessages:     uint(BatchingMaxSize),
+		BatchingMaxSize:         uint(1024 * 1024 * 10),
+		BatchingMaxPublishDelay: duration,
+	})
+	assert.Nil(t, err)
+	for i := 0; i < BatchingMaxSize; i++ {
+		producer.SendAsync(context.Background(), &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-%d", i)),
+		}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+			assert.Nil(t, err)
+			log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize())
+		})
+	}
+	assert.Nil(t, producer.Flush())
+
+	msgIds := make([]MessageID, BatchingMaxSize)
+	for i := 0; i < BatchingMaxSize; i++ {
+		message, err := 
consumer.Receive(context.Background())
+		assert.Nil(t, err)
+		msgIds[i] = message.ID()
+		log.Printf("Received %v from %v:%d:%d", string(message.Payload()), message.ID(),
+			message.ID().BatchIdx(), message.ID().BatchSize())
+	}
+
+	// Acknowledge half of the messages
+	if cumulative {
+		msgID := msgIds[BatchingMaxSize/2-1]
+		consumer.AckIDCumulative(msgID)
+		log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
+	} else {
+		for i := 0; i < BatchingMaxSize; i++ {
+			msgID := msgIds[i]
+			if i%2 == 0 {
+				consumer.AckID(msgID)
+				log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx())
+			}
+		}
+	}
+	consumer.Close()
+	consumer = createConsumer()
+
+	for i := 0; i < BatchingMaxSize/2; i++ {
+		message, err := consumer.Receive(context.Background())
+		assert.Nil(t, err)
+		log.Printf("Received %v from %v:%d:%d", string(message.Payload()), message.ID(),
+			message.ID().BatchIdx(), message.ID().BatchSize())
+		index := i*2 + 1
+		if cumulative {
+			index = i + BatchingMaxSize/2
+		}
+		assert.Equal(t, []byte(fmt.Sprintf("msg-%d", index)), message.Payload())
+		assert.Equal(t, msgIds[index].BatchIdx(), message.ID().BatchIdx())
+		// We should not acknowledge message.ID() here because message.ID() uses a
+		// different tracker than the one shared by the entries of msgIds
+		if !cumulative {
+			msgID := msgIds[index]
+			consumer.AckID(msgID)
+			log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx())
+		}
+	}
+	if cumulative {
+		msgID := msgIds[BatchingMaxSize-1]
+		consumer.AckIDCumulative(msgID)
+		log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
+	}
+	consumer.Close()
+	consumer = createConsumer()
+	_, err = producer.Send(context.Background(), &ProducerMessage{Payload: []byte("end-marker")})
+	assert.Nil(t, err)
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	assert.Equal(t, "end-marker", string(msg.Payload()))
+
+	client.Close()
+}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d863da9bad..39db8e1cb9 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -21,8 +21,6 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"math/big"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -30,6 +28,7 @@ import (
 	"google.golang.org/protobuf/proto"

 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+	"github.com/bits-and-blooms/bitset"
 )

 type messageID struct {
@@ -37,6 +36,7 @@ type messageID struct {
 	entryID      int64
 	batchIdx     int32
 	partitionIdx int32
+	batchSize    int32
 }

 var latestMessageID = messageID{
@@ -44,6 +44,7 @@ var latestMessageID = messageID{
 	entryID:      math.MaxInt64,
 	batchIdx:     -1,
 	partitionIdx: -1,
+	batchSize:    0,
 }

 var earliestMessageID = messageID{
@@ -51,6 +52,7 @@ var earliestMessageID = messageID{
 	entryID:      -1,
 	batchIdx:     -1,
 	partitionIdx: -1,
+	batchSize:    0,
 }

 type trackingMessageID struct {
@@ -159,6 +161,7 @@ func (id messageID) Serialize() []byte {
 		EntryId:    proto.Uint64(uint64(id.entryID)),
 		BatchIndex: proto.Int32(id.batchIdx),
 		Partition:  proto.Int32(id.partitionIdx),
+		BatchSize:  proto.Int32(id.batchSize),
 	}
 	data, _ := proto.Marshal(msgID)
 	return data
@@ -180,6 +183,10 @@ func (id messageID) PartitionIdx() int32 {
 	return id.partitionIdx
 }

+func (id messageID) BatchSize() int32 {
+	return id.batchSize
+}
+
 func (id messageID) String() string {
 	return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
 }
@@ -195,20 +202,22 @@ func deserializeMessageID(data []byte) (MessageID, error) {
 		int64(msgID.GetEntryId()),
 		msgID.GetBatchIndex(),
 		msgID.GetPartition(),
+		msgID.GetBatchSize(),
 	)
 	return id, nil
 }

-func newMessageID(ledgerID int64, 
entryID int64, batchIdx int32, partitionIdx int32) MessageID { +func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, batchSize int32) MessageID { return messageID{ ledgerID: ledgerID, entryID: entryID, batchIdx: batchIdx, partitionIdx: partitionIdx, + batchSize: batchSize, } } -func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, +func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, batchSize int32, tracker *ackTracker) trackingMessageID { return trackingMessageID{ messageID: messageID{ @@ -216,6 +225,7 @@ func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partiti entryID: entryID, batchIdx: batchIdx, partitionIdx: partitionIdx, + batchSize: batchSize, }, tracker: tracker, receivedTime: time.Now(), @@ -370,14 +380,10 @@ func (msg *message) BrokerPublishTime() *time.Time { return msg.brokerPublishTime } -func newAckTracker(size int) *ackTracker { - var batchIDs *big.Int - if size <= 64 { - shift := uint32(64 - size) - setBits := ^uint64(0) >> shift - batchIDs = new(big.Int).SetUint64(setBits) - } else { - batchIDs, _ = new(big.Int).SetString(strings.Repeat("1", size), 2) +func newAckTracker(size uint) *ackTracker { + batchIDs := bitset.New(size) + for i := uint(0); i < size; i++ { + batchIDs.Set(i) } return &ackTracker{ size: size, @@ -387,8 +393,8 @@ func newAckTracker(size int) *ackTracker { type ackTracker struct { sync.Mutex - size int - batchIDs *big.Int + size uint + batchIDs *bitset.BitSet prevBatchAcked uint32 } @@ -398,19 +404,20 @@ func (t *ackTracker) ack(batchID int) bool { } t.Lock() defer t.Unlock() - t.batchIDs = t.batchIDs.SetBit(t.batchIDs, batchID, 0) - return len(t.batchIDs.Bits()) == 0 + t.batchIDs.Clear(uint(batchID)) + return t.batchIDs.None() } func (t *ackTracker) ackCumulative(batchID int) bool { if batchID < 0 { return true } - mask := big.NewInt(-1) t.Lock() defer t.Unlock() - t.batchIDs.And(t.batchIDs, mask.Lsh(mask, uint(batchID+1))) - return len(t.batchIDs.Bits()) == 0 + for i := 0; i <= batchID; i++ { + t.batchIDs.Clear(uint(i)) + } + return t.batchIDs.None() } func (t *ackTracker) hasPrevBatchAcked() bool { @@ -424,7 +431,21 @@ func (t *ackTracker) setPrevBatchAcked() { func (t *ackTracker) completed() bool { t.Lock() defer t.Unlock() - return len(t.batchIDs.Bits()) == 0 + return t.batchIDs.None() +} + +func (t *ackTracker) toAckSet() []int64 { + t.Lock() + defer t.Unlock() + if t.batchIDs.None() { + return nil + } + bytes := t.batchIDs.Bytes() + ackSet := make([]int64, len(bytes)) + for i := 0; i < len(bytes); i++ { + ackSet[i] = int64(bytes[i]) + } + return ackSet } type chunkMessageID struct { diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go index 89aab8a982..413a39fc37 100644 --- a/pulsar/impl_message_test.go +++ b/pulsar/impl_message_test.go @@ -24,7 +24,7 @@ import ( ) func TestMessageId(t *testing.T) { - id := newMessageID(1, 2, 3, 4) + id := newMessageID(1, 2, 3, 4, 5) bytes := id.Serialize() id2, err := DeserializeMessageID(bytes) @@ -35,6 +35,7 @@ func TestMessageId(t *testing.T) { assert.Equal(t, int64(2), id2.(messageID).entryID) assert.Equal(t, int32(3), id2.(messageID).batchIdx) assert.Equal(t, int32(4), id2.(messageID).partitionIdx) + assert.Equal(t, int32(5), id2.(messageID).batchSize) id, err = DeserializeMessageID(nil) assert.Error(t, err) @@ -47,11 +48,12 @@ func TestMessageId(t *testing.T) { func TestMessageIdGetFuncs(t *testing.T) { // test LedgerId,EntryId,BatchIdx,PartitionIdx - id 
:= newMessageID(1, 2, 3, 4)
+	id := newMessageID(1, 2, 3, 4, 5)
 	assert.Equal(t, int64(1), id.LedgerID())
 	assert.Equal(t, int64(2), id.EntryID())
 	assert.Equal(t, int32(3), id.BatchIdx())
 	assert.Equal(t, int32(4), id.PartitionIdx())
+	assert.Equal(t, int32(5), id.BatchSize())
 }

 func TestAckTracker(t *testing.T) {
@@ -101,7 +103,7 @@ func TestAckTracker(t *testing.T) {

 func TestAckingMessageIDBatchOne(t *testing.T) {
 	tracker := newAckTracker(1)
-	msgID := newTrackingMessageID(1, 1, 0, 0, tracker)
+	msgID := newTrackingMessageID(1, 1, 0, 0, 0, tracker)
 	assert.Equal(t, true, msgID.ack())
 	assert.Equal(t, true, tracker.completed())
 }
@@ -109,8 +111,8 @@ func TestAckingMessageIDBatchOne(t *testing.T) {
 func TestAckingMessageIDBatchTwo(t *testing.T) {
 	tracker := newAckTracker(2)
 	ids := []trackingMessageID{
-		newTrackingMessageID(1, 1, 0, 0, tracker),
-		newTrackingMessageID(1, 1, 1, 0, tracker),
+		newTrackingMessageID(1, 1, 0, 0, 0, tracker),
+		newTrackingMessageID(1, 1, 1, 0, 0, tracker),
 	}

 	assert.Equal(t, false, ids[0].ack())
@@ -120,8 +122,8 @@ func TestAckingMessageIDBatchTwo(t *testing.T) {
 	// try reverse order
 	tracker = newAckTracker(2)
 	ids = []trackingMessageID{
-		newTrackingMessageID(1, 1, 0, 0, tracker),
-		newTrackingMessageID(1, 1, 1, 0, tracker),
+		newTrackingMessageID(1, 1, 0, 0, 0, tracker),
+		newTrackingMessageID(1, 1, 1, 0, 0, tracker),
 	}
 	assert.Equal(t, false, ids[1].ack())
 	assert.Equal(t, true, ids[0].ack())
diff --git a/pulsar/message.go b/pulsar/message.go
index 7f2e07f8b6..505ca23bc5 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -154,6 +154,9 @@ type MessageID interface {

 	// PartitionIdx returns the message partitionIdx
 	PartitionIdx() int32
+
+	// BatchSize returns 0 when there is no batch, or the batch size, which must be greater than BatchIdx()
+	BatchSize() int32
 }

 // DeserializeMessageID reconstruct a MessageID object from its serialized representation
@@ -162,8 +165,8 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
 }

 // NewMessageID Custom Create MessageID
-func NewMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID {
-	return newMessageID(ledgerID, entryID, batchIdx, partitionIdx)
+func NewMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, batchSize int32) MessageID {
+	return newMessageID(ledgerID, entryID, batchIdx, partitionIdx, batchSize)
 }

 // EarliestMessageID returns a messageID that points to the earliest message available in a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 6308b55fc6..b0467f5409 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1118,6 +1118,15 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	pi.Lock()
 	defer pi.Unlock()
 	p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
+	batchSize := int32(0)
+	for _, i := range pi.sendRequests {
+		sr := i.(*sendRequest)
+		if sr.msg != nil {
+			batchSize++
+		} else { // Flush request
+			break
+		}
+	}
 	for idx, i := range pi.sendRequests {
 		sr := i.(*sendRequest)
 		if sr.msg != nil {
@@ -1138,6 +1147,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 				int64(response.MessageId.GetEntryId()),
 				int32(idx),
 				p.partitionIdx,
+				batchSize,
 			)

 			if sr.totalChunks > 1 {
@@ -1148,6 +1158,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 						int64(response.MessageId.GetEntryId()),
 						-1,
 						p.partitionIdx,
+						0,
 					})
 				} else if sr.chunkID == sr.totalChunks-1 { 
sr.chunkRecorder.setLastChunkID( @@ -1156,6 +1167,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) int64(response.MessageId.GetEntryId()), -1, p.partitionIdx, + 0, }) // use chunkMsgID to set msgID msgID = sr.chunkRecorder.chunkedMsgID diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 07e7ed3c9f..9aa737d384 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -426,6 +426,10 @@ func (id *myMessageID) BatchIdx() int32 { return id.BatchIdx() } +func (id *myMessageID) BatchSize() int32 { + return id.BatchSize() +} + func (id *myMessageID) PartitionIdx() int32 { return id.PartitionIdx() }
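
### Usage sketch

For reference, a minimal consumer setup that exercises this feature might look like the sketch below. It is illustrative only and not part of the patch: the broker URL, topic, and subscription names are placeholders, and the broker must run with `acknowledgmentAtBatchIndexLevelEnabled=true` (as set in the integration-test `standalone.conf` above), otherwise `EnableBatchIndexAcknowledgment` has no effect.

```go
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder broker URL
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// EnableBatchIndexAcknowledgment is the option added by this patch. With it
	// enabled, acknowledging one message of a batch sends the ack_set bitmap to
	// the broker, so already acknowledged batch indexes are not redelivered
	// after the subscription reconnects.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                          "my-topic", // placeholder
		SubscriptionName:               "my-sub",   // placeholder
		EnableBatchIndexAcknowledgment: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Acknowledge a single message inside a batch; the remaining indexes of the
	// same batch entry stay unacknowledged until they are acked individually.
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	if err := consumer.AckID(msg.ID()); err != nil {
		log.Fatal(err)
	}
}
```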