From cb3e393b2483b01bbd71ae8bbe7debd258b55c48 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Tue, 26 Dec 2023 17:06:43 -0800 Subject: [PATCH] Refactor complete message and add unit test for kafka partition ack manager --- common/messaging/kafka/consumerImpl.go | 8 +- common/messaging/kafka/partitionAckManager.go | 10 +- .../kafka/partitionAckManager_test.go | 110 ++++++++++++++++++ 3 files changed, 121 insertions(+), 7 deletions(-) create mode 100644 common/messaging/kafka/partitionAckManager_test.go diff --git a/common/messaging/kafka/consumerImpl.go b/common/messaging/kafka/consumerImpl.go index 56140bdb3fe..26b3d08774b 100644 --- a/common/messaging/kafka/consumerImpl.go +++ b/common/messaging/kafka/consumerImpl.go @@ -210,7 +210,13 @@ func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool) tag.KafkaOffset(message.Offset())) } } - ackLevel := h.manager.CompleteMessage(message.Partition(), message.Offset(), isAck) + ackLevel, err := h.manager.CompleteMessage(message.Partition(), message.Offset(), isAck) + if err != nil { + h.logger.Error("complete an message that hasn't been added", + tag.KafkaPartition(message.Partition()), + tag.KafkaOffset(message.Offset())) + return + } h.currentSession.MarkOffset(h.topic, message.Partition(), ackLevel+1, "") } diff --git a/common/messaging/kafka/partitionAckManager.go b/common/messaging/kafka/partitionAckManager.go index 2bbc95711e5..1a350667104 100644 --- a/common/messaging/kafka/partitionAckManager.go +++ b/common/messaging/kafka/partitionAckManager.go @@ -21,6 +21,7 @@ package kafka import ( + "errors" "sync" "github.com/uber/cadence/common/log" @@ -80,7 +81,7 @@ func (pam *partitionAckManager) AddMessage(partitionID int32, messageID int64) { } // CompleteMessage complete the message from ack/nack kafka message -func (pam *partitionAckManager) CompleteMessage(partitionID int32, messageID int64, isAck bool) (ackLevel int64) { +func (pam *partitionAckManager) CompleteMessage(partitionID int32, messageID int64, isAck bool) (ackLevel int64, err error) { pam.RLock() defer pam.RUnlock() if am, ok := pam.ackMgrs[partitionID]; ok { @@ -91,10 +92,7 @@ func (pam *partitionAckManager) CompleteMessage(partitionID int32, messageID int pam.scopes[partitionID].IncCounter(metrics.KafkaConsumerMessageNack) } } else { - pam.logger.Fatal("complete an message that hasn't been added", - tag.KafkaPartition(partitionID), - tag.KafkaOffset(messageID)) - ackLevel = -1 + return -1, errors.New("complete an message that hasn't been added") } - return ackLevel + return ackLevel, nil } diff --git a/common/messaging/kafka/partitionAckManager_test.go b/common/messaging/kafka/partitionAckManager_test.go new file mode 100644 index 00000000000..487561dbe34 --- /dev/null +++ b/common/messaging/kafka/partitionAckManager_test.go @@ -0,0 +1,110 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" +) + +func TestAddMessage(t *testing.T) { + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + logger := testlogger.New(t) // Mocked + pam := newPartitionAckManager(metricsClient, logger) + partitionID := int32(1) + messageID := int64(100) + // Test adding a message + pam.AddMessage(partitionID, messageID) + + // Verify the message is added to the ack manager + pam.RLock() // Read lock since we are only reading data + _, ok := pam.ackMgrs[partitionID] + pam.RUnlock() + + assert.True(t, ok, "AckManager for partition %v was not created", partitionID) +} + +func TestCompleteMessage(t *testing.T) { + // Setup + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + logger := testlogger.New(t) + pam := newPartitionAckManager(metricsClient, logger) + + partitionID := int32(1) + messageID := int64(100) + + // Add a message first to simulate a real-world scenario, this will create ackMgr for partition + pam.AddMessage(partitionID, messageID) + + testCases := []struct { + name string + partitionID int32 + messageID int64 + isAck bool + expected int64 + hasErr bool + }{ + { + name: "Acknowledge the message", + partitionID: partitionID, + messageID: messageID, + isAck: true, + expected: int64(100), + hasErr: false, + }, + { + name: "Not acknowledge the message", + partitionID: partitionID, + messageID: messageID, + isAck: false, + expected: int64(100), + hasErr: false, + }, + { + name: "Not exist partition", + partitionID: partitionID + 1, + messageID: messageID, + isAck: true, + expected: int64(-1), + hasErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ackLevel, err := pam.CompleteMessage(tc.partitionID, tc.messageID, tc.isAck) + assert.True(t, ackLevel == tc.expected, "Test case %s failed: expected ackLevel %d, got %d", tc.name, tc.expected, ackLevel) + + if tc.hasErr { + assert.Error(t, err, "Expected an error but none was found") + } else { + assert.NoError(t, err, "An error was not expected but got %v", err) + } + }) + } +}