diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e3db6707fe..e8542b3a88 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -56,8 +56,8 @@ var ErrConsumerClosed = errors.New("consumer closed") const defaultNackRedeliveryDelay = 1 * time.Minute type acker interface { - AckID(id messageID) - NackID(id messageID) + AckID(id trackingMessageID) + NackID(id trackingMessageID) } type consumer struct { @@ -259,7 +259,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { nackRedeliveryDelay: nackRedeliveryDelay, metadata: metadata, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: messageID{}, + startMessageID: trackingMessageID{}, subscriptionMode: durable, readCompacted: c.options.ReadCompacted, } @@ -483,11 +483,11 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I return pb.CommandSubscribe_Latest } -func (c *consumer) messageID(msgID MessageID) (messageID, bool) { - mid, ok := msgID.(messageID) +func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) { + mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) - return messageID{}, false + return trackingMessageID{}, false } partition := int(mid.partitionIdx) @@ -495,7 +495,7 @@ func (c *consumer) messageID(msgID MessageID) (messageID, bool) { if partition < 0 || partition >= len(c.consumers) { c.log.Warnf("invalid partition index %d expected a partition between [0-%d]", partition, len(c.consumers)) - return messageID{}, false + return trackingMessageID{}, false } return mid, true diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index f5264870ec..8d3420377f 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -120,7 +120,7 @@ func (c *multiTopicConsumer) Ack(msg Message) { // Ack the consumption of a single message, identified by its MessageID func (c *multiTopicConsumer) AckID(msgID MessageID) { - mid, ok := msgID.(messageID) + mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) return @@ -139,7 +139,7 @@ func (c *multiTopicConsumer) Nack(msg Message) { } func (c *multiTopicConsumer) NackID(msgID MessageID) { - mid, ok := msgID.(messageID) + mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) return diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index acf897da8c..1d315f865e 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -109,7 +109,7 @@ type partitionConsumerOpts struct { nackRedeliveryDelay time.Duration metadata map[string]string replicateSubscriptionState bool - startMessageID messageID + startMessageID trackingMessageID startMessageIDInclusive bool subscriptionMode subscriptionMode readCompacted bool @@ -140,13 +140,13 @@ type partitionConsumer struct { // the size of the queue channel for buffering messages queueSize int32 queueCh chan []*message - startMessageID messageID - lastDequeuedMsg messageID + startMessageID trackingMessageID + lastDequeuedMsg trackingMessageID eventsCh chan interface{} connectedCh chan struct{} closeCh chan struct{} - clearQueueCh chan func(id messageID) + clearQueueCh chan func(id trackingMessageID) nackTracker *negativeAcksTracker dlq *dlqRouter @@ -174,7 +174,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon connectedCh: make(chan struct{}), messageCh: messageCh, closeCh: make(chan struct{}), - clearQueueCh: make(chan func(id messageID)), + clearQueueCh: make(chan func(id trackingMessageID)), compressionProviders: make(map[pb.CompressionType]compression.Provider), dlq: dlq, log: log.WithField("topic", options.topic), @@ -238,7 +238,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { pc.state = consumerClosed } -func (pc *partitionConsumer) getLastMessageID() (messageID, error) { +func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} pc.eventsCh <- req @@ -266,8 +266,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) } } -func (pc *partitionConsumer) AckID(msgID messageID) { - if !msgID.IsZero() && msgID.ack() { +func (pc *partitionConsumer) AckID(msgID trackingMessageID) { + if !msgID.Undefined() && msgID.ack() { acksCounter.Inc() processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) req := &ackRequest{ @@ -277,8 +277,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) { } } -func (pc *partitionConsumer) NackID(msgID messageID) { - pc.nackTracker.Add(msgID) +func (pc *partitionConsumer) NackID(msgID trackingMessageID) { + pc.nackTracker.Add(msgID.messageID) nacksCounter.Inc() } @@ -317,7 +317,7 @@ func (pc *partitionConsumer) Close() { <-req.doneCh } -func (pc *partitionConsumer) Seek(msgID messageID) error { +func (pc *partitionConsumer) Seek(msgID trackingMessageID) error { req := &seekRequest{ doneCh: make(chan struct{}), msgID: msgID, @@ -506,17 +506,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header return nil } -func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool { - if pc.startMessageID.IsZero() { +func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) bool { + if pc.startMessageID.Undefined() { return false } if pc.options.startMessageIDInclusive { - return pc.startMessageID.greater(msgID) + return pc.startMessageID.greater(msgID.messageID) } // Non inclusive - return pc.startMessageID.greaterEqual(msgID) + return pc.startMessageID.greaterEqual(msgID.messageID) } func (pc *partitionConsumer) ConnectionClosed() { @@ -631,7 +631,7 @@ func (pc *partitionConsumer) dispatcher() { case clearQueueCb := <-pc.clearQueueCh: // drain the message queue on any new connection by sending a // special nil message to the channel so we know when to stop dropping messages - var nextMessageInQueue messageID + var nextMessageInQueue trackingMessageID go func() { pc.queueCh <- nil }() @@ -639,8 +639,8 @@ func (pc *partitionConsumer) dispatcher() { // the queue has been drained if m == nil { break - } else if nextMessageInQueue.IsZero() { - nextMessageInQueue = m[0].msgID.(messageID) + } else if nextMessageInQueue.Undefined() { + nextMessageInQueue = m[0].msgID.(trackingMessageID) } } @@ -650,7 +650,7 @@ func (pc *partitionConsumer) dispatcher() { } type ackRequest struct { - msgID messageID + msgID trackingMessageID } type unsubscribeRequest struct { @@ -668,13 +668,13 @@ type redeliveryRequest struct { type getLastMsgIDRequest struct { doneCh chan struct{} - msgID messageID + msgID trackingMessageID err error } type seekRequest struct { doneCh chan struct{} - msgID messageID + msgID trackingMessageID err error } @@ -854,15 +854,15 @@ func (pc *partitionConsumer) grabConn() error { } } -func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID { +func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID { if pc.state != consumerReady { - return messageID{} + return trackingMessageID{} } wg := &sync.WaitGroup{} wg.Add(1) - var msgID messageID + var msgID trackingMessageID - pc.clearQueueCh <- func(id messageID) { + pc.clearQueueCh <- func(id trackingMessageID) { msgID = id wg.Done() } @@ -875,12 +875,12 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID { * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was * not seen by the application */ -func (pc *partitionConsumer) clearReceiverQueue() messageID { +func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID { nextMessageInQueue := pc.clearQueueAndGetNextMessage() - if !nextMessageInQueue.IsZero() { + if !nextMessageInQueue.Undefined() { return getPreviousMessage(nextMessageInQueue) - } else if !pc.lastDequeuedMsg.IsZero() { + } else if !pc.lastDequeuedMsg.Undefined() { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past return pc.lastDequeuedMsg @@ -890,22 +890,32 @@ func (pc *partitionConsumer) clearReceiverQueue() messageID { } } -func getPreviousMessage(mid messageID) messageID { +func getPreviousMessage(mid trackingMessageID) trackingMessageID { if mid.batchIdx >= 0 { - return messageID{ - ledgerID: mid.ledgerID, - entryID: mid.entryID, - batchIdx: mid.batchIdx - 1, - partitionIdx: mid.partitionIdx, + return trackingMessageID{ + messageID: messageID{ + ledgerID: mid.ledgerID, + entryID: mid.entryID, + batchIdx: mid.batchIdx - 1, + partitionIdx: mid.partitionIdx, + }, + tracker: mid.tracker, + consumer: mid.consumer, + receivedTime: mid.receivedTime, } } // Get on previous message in previous entry - return messageID{ - ledgerID: mid.ledgerID, - entryID: mid.entryID - 1, - batchIdx: mid.batchIdx, - partitionIdx: mid.partitionIdx, + return trackingMessageID{ + messageID: messageID{ + ledgerID: mid.ledgerID, + entryID: mid.entryID - 1, + batchIdx: mid.batchIdx, + partitionIdx: mid.partitionIdx, + }, + tracker: mid.tracker, + consumer: mid.consumer, + receivedTime: mid.receivedTime, } } @@ -961,8 +971,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, }) } -func convertToMessageIDData(msgID messageID) *pb.MessageIdData { - if msgID.IsZero() { +func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData { + if msgID.Undefined() { return nil } @@ -972,14 +982,16 @@ func convertToMessageIDData(msgID messageID) *pb.MessageIdData { } } -func convertToMessageID(id *pb.MessageIdData) messageID { +func convertToMessageID(id *pb.MessageIdData) trackingMessageID { if id == nil { - return messageID{} + return trackingMessageID{} } - msgID := messageID{ - ledgerID: int64(*id.LedgerId), - entryID: int64(*id.EntryId), + msgID := trackingMessageID{ + messageID: messageID{ + ledgerID: int64(*id.LedgerId), + entryID: int64(*id.EntryId), + }, } if id.BatchIndex != nil { msgID.batchIdx = *id.BatchIndex diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 0fcbdc5d0c..2e86bd09e8 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -44,11 +44,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { // ensure the tracker was set on the message id messages := <-pc.queueCh for _, m := range messages { - assert.Nil(t, m.ID().(messageID).tracker) + assert.Nil(t, m.ID().(trackingMessageID).tracker) } // ack the message id - pc.AckID(messages[0].msgID.(messageID)) + pc.AckID(messages[0].msgID.(trackingMessageID)) select { case <-eventsCh: @@ -73,11 +73,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { // ensure the tracker was set on the message id messages := <-pc.queueCh for _, m := range messages { - assert.Nil(t, m.ID().(messageID).tracker) + assert.Nil(t, m.ID().(trackingMessageID).tracker) } // ack the message id - pc.AckID(messages[0].msgID.(messageID)) + pc.AckID(messages[0].msgID.(trackingMessageID)) select { case <-eventsCh: @@ -102,12 +102,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { // ensure the tracker was set on the message id messages := <-pc.queueCh for _, m := range messages { - assert.NotNil(t, m.ID().(messageID).tracker) + assert.NotNil(t, m.ID().(trackingMessageID).tracker) } // ack all message ids except the last one for i := 0; i < 9; i++ { - pc.AckID(messages[i].msgID.(messageID)) + pc.AckID(messages[i].msgID.(trackingMessageID)) } select { @@ -117,7 +117,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { } // ack last message - pc.AckID(messages[9].msgID.(messageID)) + pc.AckID(messages[9].msgID.(trackingMessageID)) select { case <-eventsCh: diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index a6dfd56654..e0fdbcbf40 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -165,7 +165,7 @@ func (c *regexConsumer) Ack(msg Message) { // Ack the consumption of a single message, identified by its MessageID func (c *regexConsumer) AckID(msgID MessageID) { - mid, ok := msgID.(messageID) + mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) return @@ -184,7 +184,7 @@ func (c *regexConsumer) Nack(msg Message) { } func (c *regexConsumer) NackID(msgID MessageID) { - mid, ok := msgID.(messageID) + mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) return diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 562dfb6439..d670a37841 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -33,17 +33,21 @@ type messageID struct { entryID int64 batchIdx int32 partitionIdx int32 +} + +type trackingMessageID struct { + messageID tracker *ackTracker consumer acker receivedTime time.Time } -func (id messageID) IsZero() bool { - return id == messageID{} +func (id trackingMessageID) Undefined() bool { + return id == trackingMessageID{} } -func (id messageID) Ack() { +func (id trackingMessageID) Ack() { if id.consumer == nil { return } @@ -52,14 +56,14 @@ func (id messageID) Ack() { } } -func (id messageID) Nack() { +func (id trackingMessageID) Nack() { if id.consumer == nil { return } id.consumer.NackID(id) } -func (id messageID) ack() bool { +func (id trackingMessageID) ack() bool { if id.tracker != nil && id.batchIdx > -1 { return id.tracker.ack(int(id.batchIdx)) } @@ -124,17 +128,32 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx in } func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, - tracker *ackTracker) messageID { - return messageID{ - ledgerID: ledgerID, - entryID: entryID, - batchIdx: batchIdx, - partitionIdx: partitionIdx, + tracker *ackTracker) trackingMessageID { + return trackingMessageID{ + messageID: messageID{ + ledgerID: ledgerID, + entryID: entryID, + batchIdx: batchIdx, + partitionIdx: partitionIdx, + }, tracker: tracker, receivedTime: time.Now(), } } +func toTrackingMessageID(msgID MessageID) (trackingMessageID, bool) { + if mid, ok := msgID.(messageID); ok { + return trackingMessageID{ + messageID: mid, + receivedTime: time.Now(), + }, true + } else if mid, ok := msgID.(trackingMessageID); ok { + return mid, true + } else { + return trackingMessageID{}, false + } +} + func timeFromUnixTimestampMillis(timestamp uint64) time.Time { ts := int64(timestamp) * int64(time.Millisecond) seconds := ts / int64(time.Second) diff --git a/pulsar/impl_message_bench_test.go b/pulsar/impl_message_bench_test.go new file mode 100644 index 0000000000..4b6ca10cf2 --- /dev/null +++ b/pulsar/impl_message_bench_test.go @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "testing" +) + +var ( + usedByProducer messageID + usedByConsumer trackingMessageID +) + +func producerCall(id messageID) messageID { + id.entryID++ + return id +} + +func consumerCall(id trackingMessageID) trackingMessageID { + id.entryID++ + return id +} + +func BenchmarkProducerCall(b *testing.B) { + for i := 0; i < b.N; i++ { + usedByProducer = producerCall(usedByProducer) + } +} + +func BenchmarkConsumerCall(b *testing.B) { + for i := 0; i < b.N; i++ { + usedByConsumer = consumerCall(usedByConsumer) + } +} diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go index 8c6641138d..4e8b64451e 100644 --- a/pulsar/impl_message_test.go +++ b/pulsar/impl_message_test.go @@ -82,7 +82,7 @@ func TestAckingMessageIDBatchOne(t *testing.T) { func TestAckingMessageIDBatchTwo(t *testing.T) { tracker := newAckTracker(2) - ids := []messageID{ + ids := []trackingMessageID{ newTrackingMessageID(1, 1, 0, 0, tracker), newTrackingMessageID(1, 1, 1, 0, tracker), } @@ -93,7 +93,7 @@ func TestAckingMessageIDBatchTwo(t *testing.T) { // try reverse order tracker = newAckTracker(2) - ids = []messageID{ + ids = []trackingMessageID{ newTrackingMessageID(1, 1, 0, 0, tracker), newTrackingMessageID(1, 1, 1, 0, tracker), } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 8d389cbf4c..ff98af9bcd 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -322,7 +322,7 @@ func TestFlushInProducer(t *testing.T) { assert.Nil(t, err) msgCount++ - msgID := msg.ID().(messageID) + msgID := msg.ID().(trackingMessageID) // Since messages are batched, they will be sharing the same ledgerId/entryId if ledgerID == -1 { ledgerID = msgID.ledgerID diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index d97cc962c2..474d0db350 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -19,6 +19,8 @@ package pulsar import ( "context" + "fmt" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -45,7 +47,7 @@ var ( type reader struct { pc *partitionConsumer messageCh chan ConsumerMessage - lastMessageInBroker messageID + lastMessageInBroker trackingMessageID log *log.Entry } @@ -59,17 +61,19 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, newError(ResultInvalidConfiguration, "StartMessageID is required") } - var startMessageID messageID - var ok bool - if startMessageID, ok = options.StartMessageID.(messageID); !ok { - // a custom type satisfying MessageID may not be a messageID + startMessageID, ok := toTrackingMessageID(options.StartMessageID) + if !ok { + // a custom type satisfying MessageID may not be a messageID or trackingMessageID // so re-create messageID using its data deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize()) if err != nil { return nil, err } // de-serialized MessageID is a messageID - startMessageID = deserMsgID.(messageID) + startMessageID = trackingMessageID{ + messageID: deserMsgID.(messageID), + receivedTime: time.Now(), + } } subscriptionName := options.SubscriptionRolePrefix @@ -134,10 +138,13 @@ func (r *reader) Next(ctx context.Context) (Message, error) { // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway - msgID := cm.Message.ID().(messageID) - r.pc.lastDequeuedMsg = msgID - r.pc.AckID(msgID) - return cm.Message, nil + msgID := cm.Message.ID() + if mid, ok := toTrackingMessageID(msgID); ok { + r.pc.lastDequeuedMsg = mid + r.pc.AckID(mid) + return cm.Message, nil + } + return nil, fmt.Errorf("invalid message id type %T", msgID) case <-ctx.Done(): return nil, ctx.Err() } @@ -145,7 +152,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } func (r *reader) HasNext() bool { - if !r.lastMessageInBroker.IsZero() && r.hasMoreMessages() { + if !r.lastMessageInBroker.Undefined() && r.hasMoreMessages() { return true } @@ -164,16 +171,16 @@ func (r *reader) HasNext() bool { } func (r *reader) hasMoreMessages() bool { - if !r.pc.lastDequeuedMsg.IsZero() { - return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg) + if !r.pc.lastDequeuedMsg.Undefined() { + return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID) } if r.pc.options.startMessageIDInclusive { - return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID) + return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID) } // Non-inclusive - return r.lastMessageInBroker.greater(r.pc.startMessageID) + return r.lastMessageInBroker.greater(r.pc.startMessageID.messageID) } func (r *reader) Close() {