Skip to content

Commit

Permalink
Refactor complete message and add unit test for kafka partition ack m…
Browse files Browse the repository at this point in the history
…anager
  • Loading branch information
neil-xie committed Dec 27, 2023
1 parent cc766a3 commit cb3e393
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 7 deletions.
8 changes: 7 additions & 1 deletion common/messaging/kafka/consumerImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,13 @@ func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool)
tag.KafkaOffset(message.Offset()))
}
}
ackLevel := h.manager.CompleteMessage(message.Partition(), message.Offset(), isAck)
ackLevel, err := h.manager.CompleteMessage(message.Partition(), message.Offset(), isAck)
if err != nil {
h.logger.Error("complete an message that hasn't been added",
tag.KafkaPartition(message.Partition()),
tag.KafkaOffset(message.Offset()))
return
}
h.currentSession.MarkOffset(h.topic, message.Partition(), ackLevel+1, "")
}

Expand Down
10 changes: 4 additions & 6 deletions common/messaging/kafka/partitionAckManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package kafka

import (
"errors"
"sync"

"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (pam *partitionAckManager) AddMessage(partitionID int32, messageID int64) {
}

// CompleteMessage complete the message from ack/nack kafka message
func (pam *partitionAckManager) CompleteMessage(partitionID int32, messageID int64, isAck bool) (ackLevel int64) {
func (pam *partitionAckManager) CompleteMessage(partitionID int32, messageID int64, isAck bool) (ackLevel int64, err error) {
pam.RLock()
defer pam.RUnlock()
if am, ok := pam.ackMgrs[partitionID]; ok {
Expand All @@ -91,10 +92,7 @@ func (pam *partitionAckManager) CompleteMessage(partitionID int32, messageID int
pam.scopes[partitionID].IncCounter(metrics.KafkaConsumerMessageNack)
}
} else {
pam.logger.Fatal("complete an message that hasn't been added",
tag.KafkaPartition(partitionID),
tag.KafkaOffset(messageID))
ackLevel = -1
return -1, errors.New("complete an message that hasn't been added")
}
return ackLevel
return ackLevel, nil
}
110 changes: 110 additions & 0 deletions common/messaging/kafka/partitionAckManager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package kafka

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/uber-go/tally"

"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
)

func TestAddMessage(t *testing.T) {
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
logger := testlogger.New(t) // Mocked
pam := newPartitionAckManager(metricsClient, logger)
partitionID := int32(1)
messageID := int64(100)
// Test adding a message
pam.AddMessage(partitionID, messageID)

// Verify the message is added to the ack manager
pam.RLock() // Read lock since we are only reading data
_, ok := pam.ackMgrs[partitionID]
pam.RUnlock()

assert.True(t, ok, "AckManager for partition %v was not created", partitionID)
}

func TestCompleteMessage(t *testing.T) {
// Setup
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
logger := testlogger.New(t)
pam := newPartitionAckManager(metricsClient, logger)

partitionID := int32(1)
messageID := int64(100)

// Add a message first to simulate a real-world scenario, this will create ackMgr for partition
pam.AddMessage(partitionID, messageID)

testCases := []struct {
name string
partitionID int32
messageID int64
isAck bool
expected int64
hasErr bool
}{
{
name: "Acknowledge the message",
partitionID: partitionID,
messageID: messageID,
isAck: true,
expected: int64(100),
hasErr: false,
},
{
name: "Not acknowledge the message",
partitionID: partitionID,
messageID: messageID,
isAck: false,
expected: int64(100),
hasErr: false,
},
{
name: "Not exist partition",
partitionID: partitionID + 1,
messageID: messageID,
isAck: true,
expected: int64(-1),
hasErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ackLevel, err := pam.CompleteMessage(tc.partitionID, tc.messageID, tc.isAck)
assert.True(t, ackLevel == tc.expected, "Test case %s failed: expected ackLevel %d, got %d", tc.name, tc.expected, ackLevel)

if tc.hasErr {
assert.Error(t, err, "Expected an error but none was found")
} else {
assert.NoError(t, err, "An error was not expected but got %v", err)
}
})
}
}

0 comments on commit cb3e393

Please sign in to comment.