From a04157da5f3fc98911308a4153a8995464482862 Mon Sep 17 00:00:00 2001 From: xiaolongran Date: Wed, 25 May 2022 11:25:18 +0800 Subject: [PATCH 1/4] Introduce doneCh for ack error Signed-off-by: xiaolongran --- pulsar/consumer_partition.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 06ccfd5dfa..90e5a0232e 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -325,6 +325,7 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { } ackReq := new(ackRequest) + ackReq.doneCh = make(chan struct{}) if !msgID.Undefined() && msgID.ack() { pc.metrics.AcksCounter.Inc() pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) @@ -335,6 +336,8 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } + // wait for the request to complete + <-ackReq.doneCh return ackReq.err } @@ -514,6 +517,7 @@ func (pc *partitionConsumer) clearMessageChannels() { } func (pc *partitionConsumer) internalAck(req *ackRequest) { + defer close(req.doneCh) if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return @@ -933,8 +937,9 @@ func (pc *partitionConsumer) dispatcher() { } type ackRequest struct { - msgID trackingMessageID - err error + doneCh chan struct{} + msgID trackingMessageID + err error } type unsubscribeRequest struct { From a9b70dfbd7053763d698fd150c1d97cfabb38077 Mon Sep 17 00:00:00 2001 From: xiaolongran Date: Wed, 25 May 2022 16:55:07 +0800 Subject: [PATCH 2/4] remove consumer partition test file Signed-off-by: xiaolongran --- pulsar/consumer_partition.go | 4 +- pulsar/consumer_partition_test.go | 211 ------------------------------ 2 files changed, 2 insertions(+), 213 deletions(-) delete mode 100644 pulsar/consumer_partition_test.go diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 90e5a0232e..3abb26577b 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -332,12 +332,12 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { ackReq.msgID = msgID // send ack request to eventsCh pc.eventsCh <- ackReq + // wait for the request to complete + <-ackReq.doneCh pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } - // wait for the request to complete - <-ackReq.doneCh return ackReq.err } diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go deleted file mode 100644 index 2255e52e8b..0000000000 --- a/pulsar/consumer_partition_test.go +++ /dev/null @@ -1,211 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pulsar - -import ( - "sync" - "testing" - - "github.com/apache/pulsar-client-go/pulsar/internal" - "github.com/apache/pulsar-client-go/pulsar/internal/crypto" - "github.com/stretchr/testify/assert" -) - -func TestSingleMessageIDNoAckTracker(t *testing.T) { - eventsCh := make(chan interface{}, 1) - pc := partitionConsumer{ - queueCh: make(chan []*message, 1), - eventsCh: eventsCh, - compressionProviders: sync.Map{}, - options: &partitionConsumerOpts{}, - metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"), - decryptor: crypto.NewNoopDecryptor(), - } - - headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage) - if err := pc.MessageReceived(nil, headersAndPayload); err != nil { - t.Fatal(err) - } - - // ensure the tracker was set on the message id - messages := <-pc.queueCh - for _, m := range messages { - assert.Nil(t, m.ID().(trackingMessageID).tracker) - } - - // ack the message id - pc.AckID(messages[0].msgID.(trackingMessageID)) - - select { - case <-eventsCh: - default: - t.Error("Expected an ack request to be triggered!") - } -} - -func TestBatchMessageIDNoAckTracker(t *testing.T) { - eventsCh := make(chan interface{}, 1) - pc := partitionConsumer{ - queueCh: make(chan []*message, 1), - eventsCh: eventsCh, - compressionProviders: sync.Map{}, - options: &partitionConsumerOpts{}, - metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"), - decryptor: crypto.NewNoopDecryptor(), - } - - headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1) - if err := pc.MessageReceived(nil, headersAndPayload); err != nil { - t.Fatal(err) - } - - // ensure the tracker was set on the message id - messages := <-pc.queueCh - for _, m := range messages { - assert.Nil(t, m.ID().(trackingMessageID).tracker) - } - - // ack the message id - pc.AckID(messages[0].msgID.(trackingMessageID)) - - select { - case <-eventsCh: - default: - t.Error("Expected an ack request to be triggered!") - } -} - -func TestBatchMessageIDWithAckTracker(t *testing.T) { - eventsCh := make(chan interface{}, 1) - pc := partitionConsumer{ - queueCh: make(chan []*message, 1), - eventsCh: eventsCh, - compressionProviders: sync.Map{}, - options: &partitionConsumerOpts{}, - metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"), - decryptor: crypto.NewNoopDecryptor(), - } - - headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10) - if err := pc.MessageReceived(nil, headersAndPayload); err != nil { - t.Fatal(err) - } - - // ensure the tracker was set on the message id - messages := <-pc.queueCh - for _, m := range messages { - assert.NotNil(t, m.ID().(trackingMessageID).tracker) - } - - // ack all message ids except the last one - for i := 0; i < 9; i++ { - pc.AckID(messages[i].msgID.(trackingMessageID)) - } - - select { - case <-eventsCh: - t.Error("The message id should not be acked!") - default: - } - - // ack last message - pc.AckID(messages[9].msgID.(trackingMessageID)) - - select { - case <-eventsCh: - default: - t.Error("Expected an ack request to be triggered!") - } -} - -// Raw single message in old format -// metadata properties: properties: -// payload = "hello" -var rawCompatSingleMessage = []byte{ - 0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00, - 0x00, 0x31, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, - 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37, - 0x34, 0x2d, 0x30, 0x10, 0x00, 0x18, 0xac, 0xef, - 0xe8, 0xa0, 0xe2, 0x2d, 0x22, 0x06, 0x0a, 0x01, - 0x61, 0x12, 0x01, 0x31, 0x22, 0x06, 0x0a, 0x01, - 0x62, 0x12, 0x01, 0x32, 0x48, 0x05, 0x60, 0x05, - 0x82, 0x01, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f, -} - -// Message with batch of 1 -// singe message metadata properties: properties: -// payload = "hello" -var rawBatchMessage1 = []byte{ - 0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00, - 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, - 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37, - 0x34, 0x2d, 0x31, 0x10, 0x00, 0x18, 0xdb, 0x80, - 0xf4, 0xa0, 0xe2, 0x2d, 0x58, 0x01, 0x82, 0x01, - 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, - 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, - 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, - 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f, -} - -var rawBatchMessage10 = []byte{ - 0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08, - 0x00, 0x00, 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, - 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, - 0x2d, 0x37, 0x34, 0x2d, 0x32, 0x10, 0x00, 0x18, - 0xd0, 0xc2, 0xfa, 0xa0, 0xe2, 0x2d, 0x58, 0x0a, - 0x82, 0x01, 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, - 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, - 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, - 0x05, 0x28, 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, - 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, - 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, - 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, - 0x28, 0x05, 0x40, 0x01, 0x68, 0x65, 0x6c, 0x6c, - 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, - 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, - 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, - 0x05, 0x40, 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, - 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, - 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, - 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, - 0x40, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, - 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, - 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, - 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, - 0x04, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, - 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, - 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, - 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x05, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, - 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, - 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, - 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x06, 0x68, - 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, - 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, - 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, - 0x18, 0x05, 0x28, 0x05, 0x40, 0x07, 0x68, 0x65, - 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, - 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, - 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, - 0x05, 0x28, 0x05, 0x40, 0x08, 0x68, 0x65, 0x6c, - 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, - 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, - 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, - 0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c, - 0x6f, -} From 35d76e9185ad1d7e3f3f8a1e6ea0ac093a24f443 Mon Sep 17 00:00:00 2001 From: xiaolongran Date: Wed, 14 Sep 2022 11:21:30 +0800 Subject: [PATCH 3/4] Refactor ack response func Signed-off-by: xiaolongran --- pulsar/consumer_impl.go | 6 + pulsar/consumer_multitopic.go | 4 + pulsar/consumer_partition.go | 23 +++- pulsar/consumer_partition_test.go | 211 ++++++++++++++++++++++++++++++ pulsar/consumer_regex.go | 4 + pulsar/impl_message.go | 11 ++ 6 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 pulsar/consumer_partition_test.go diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e88753861b..4754056106 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -35,7 +35,9 @@ import ( const defaultNackRedeliveryDelay = 1 * time.Minute type acker interface { + // AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish. AckID(id trackingMessageID) error + AckIDWithResponse(id trackingMessageID) error NackID(id trackingMessageID) NackMsg(msg Message) } @@ -461,6 +463,10 @@ func (c *consumer) AckID(msgID MessageID) error { return mid.Ack() } + if c.options.AckWithResponse { + return c.consumers[mid.partitionIdx].AckIDWithResponse(mid) + } + return c.consumers[mid.partitionIdx].AckID(mid) } diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 1d75a2477b..380dd75379 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -136,6 +136,10 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error { return errors.New("unable to ack message because consumer is nil") } + if c.options.AckWithResponse { + return mid.AckWithResponse() + } + return mid.Ack() } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 3abb26577b..24d75f0468 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -318,7 +318,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error return convertToMessageID(id), nil } -func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { +func (pc *partitionConsumer) AckIDWithResponse(msgID trackingMessageID) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return errors.New("consumer state is closed") @@ -341,6 +341,27 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { return ackReq.err } +func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { + if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { + pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") + return errors.New("consumer state is closed") + } + + ackReq := new(ackRequest) + if !msgID.Undefined() && msgID.ack() { + pc.metrics.AcksCounter.Inc() + pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) + ackReq.msgID = msgID + // send ack request to eventsCh + pc.eventsCh <- ackReq + // No need to wait for ackReq.doneCh to finish + + pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) + } + + return ackReq.err +} + func (pc *partitionConsumer) NackID(msgID trackingMessageID) { pc.nackTracker.Add(msgID.messageID) pc.metrics.NacksCounter.Inc() diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go new file mode 100644 index 0000000000..2255e52e8b --- /dev/null +++ b/pulsar/consumer_partition_test.go @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "sync" + "testing" + + "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/internal/crypto" + "github.com/stretchr/testify/assert" +) + +func TestSingleMessageIDNoAckTracker(t *testing.T) { + eventsCh := make(chan interface{}, 1) + pc := partitionConsumer{ + queueCh: make(chan []*message, 1), + eventsCh: eventsCh, + compressionProviders: sync.Map{}, + options: &partitionConsumerOpts{}, + metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"), + decryptor: crypto.NewNoopDecryptor(), + } + + headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage) + if err := pc.MessageReceived(nil, headersAndPayload); err != nil { + t.Fatal(err) + } + + // ensure the tracker was set on the message id + messages := <-pc.queueCh + for _, m := range messages { + assert.Nil(t, m.ID().(trackingMessageID).tracker) + } + + // ack the message id + pc.AckID(messages[0].msgID.(trackingMessageID)) + + select { + case <-eventsCh: + default: + t.Error("Expected an ack request to be triggered!") + } +} + +func TestBatchMessageIDNoAckTracker(t *testing.T) { + eventsCh := make(chan interface{}, 1) + pc := partitionConsumer{ + queueCh: make(chan []*message, 1), + eventsCh: eventsCh, + compressionProviders: sync.Map{}, + options: &partitionConsumerOpts{}, + metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"), + decryptor: crypto.NewNoopDecryptor(), + } + + headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1) + if err := pc.MessageReceived(nil, headersAndPayload); err != nil { + t.Fatal(err) + } + + // ensure the tracker was set on the message id + messages := <-pc.queueCh + for _, m := range messages { + assert.Nil(t, m.ID().(trackingMessageID).tracker) + } + + // ack the message id + pc.AckID(messages[0].msgID.(trackingMessageID)) + + select { + case <-eventsCh: + default: + t.Error("Expected an ack request to be triggered!") + } +} + +func TestBatchMessageIDWithAckTracker(t *testing.T) { + eventsCh := make(chan interface{}, 1) + pc := partitionConsumer{ + queueCh: make(chan []*message, 1), + eventsCh: eventsCh, + compressionProviders: sync.Map{}, + options: &partitionConsumerOpts{}, + metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"), + decryptor: crypto.NewNoopDecryptor(), + } + + headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10) + if err := pc.MessageReceived(nil, headersAndPayload); err != nil { + t.Fatal(err) + } + + // ensure the tracker was set on the message id + messages := <-pc.queueCh + for _, m := range messages { + assert.NotNil(t, m.ID().(trackingMessageID).tracker) + } + + // ack all message ids except the last one + for i := 0; i < 9; i++ { + pc.AckID(messages[i].msgID.(trackingMessageID)) + } + + select { + case <-eventsCh: + t.Error("The message id should not be acked!") + default: + } + + // ack last message + pc.AckID(messages[9].msgID.(trackingMessageID)) + + select { + case <-eventsCh: + default: + t.Error("Expected an ack request to be triggered!") + } +} + +// Raw single message in old format +// metadata properties: properties: +// payload = "hello" +var rawCompatSingleMessage = []byte{ + 0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00, + 0x00, 0x31, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, + 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37, + 0x34, 0x2d, 0x30, 0x10, 0x00, 0x18, 0xac, 0xef, + 0xe8, 0xa0, 0xe2, 0x2d, 0x22, 0x06, 0x0a, 0x01, + 0x61, 0x12, 0x01, 0x31, 0x22, 0x06, 0x0a, 0x01, + 0x62, 0x12, 0x01, 0x32, 0x48, 0x05, 0x60, 0x05, + 0x82, 0x01, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f, +} + +// Message with batch of 1 +// singe message metadata properties: properties: +// payload = "hello" +var rawBatchMessage1 = []byte{ + 0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00, + 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, + 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37, + 0x34, 0x2d, 0x31, 0x10, 0x00, 0x18, 0xdb, 0x80, + 0xf4, 0xa0, 0xe2, 0x2d, 0x58, 0x01, 0x82, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, + 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, + 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, + 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f, +} + +var rawBatchMessage10 = []byte{ + 0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08, + 0x00, 0x00, 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, + 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, + 0x2d, 0x37, 0x34, 0x2d, 0x32, 0x10, 0x00, 0x18, + 0xd0, 0xc2, 0xfa, 0xa0, 0xe2, 0x2d, 0x58, 0x0a, + 0x82, 0x01, 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, + 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, + 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, + 0x05, 0x28, 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, + 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, + 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, + 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, + 0x28, 0x05, 0x40, 0x01, 0x68, 0x65, 0x6c, 0x6c, + 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, + 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, + 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, + 0x05, 0x40, 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, + 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, + 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, + 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, + 0x40, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, + 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, + 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, + 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, + 0x04, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, + 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, + 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, + 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x05, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, + 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, + 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, + 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x06, 0x68, + 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, + 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, + 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, + 0x18, 0x05, 0x28, 0x05, 0x40, 0x07, 0x68, 0x65, + 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, + 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, + 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, + 0x05, 0x28, 0x05, 0x40, 0x08, 0x68, 0x65, 0x6c, + 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, + 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, + 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, + 0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c, + 0x6f, +} diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index e4d2077ac7..c55a1c1143 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -180,6 +180,10 @@ func (c *regexConsumer) AckID(msgID MessageID) error { return errors.New("consumer is nil in consumer_regex") } + if c.options.AckWithResponse { + return mid.AckWithResponse() + } + return mid.Ack() } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 8248b1a9ae..64376db095 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -75,6 +75,17 @@ func (id trackingMessageID) Ack() error { return nil } +func (id trackingMessageID) AckWithResponse() error { + if id.consumer == nil { + return errors.New("consumer is nil in trackingMessageID") + } + if id.ack() { + return id.consumer.AckIDWithResponse(id) + } + + return nil +} + func (id trackingMessageID) Nack() { if id.consumer == nil { return From bae052f2b0ad3e4e90a12187d634e8c1440a7e02 Mon Sep 17 00:00:00 2001 From: xiaolongran Date: Wed, 14 Sep 2022 11:51:33 +0800 Subject: [PATCH 4/4] fix ack error Signed-off-by: xiaolongran --- pulsar/consumer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index dcd49e1665..cc9e7100fe 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -396,6 +396,7 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { } ackReq := new(ackRequest) + ackReq.doneCh = make(chan struct{}) if !msgID.Undefined() && msgID.ack() { pc.metrics.AcksCounter.Inc() pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)