Skip to content

Commit

Permalink
Support sending a list of MessageID instances in an ACK request
Browse files Browse the repository at this point in the history
### Motivation

apache#957 adds the support for
grouping MessageID instances to acknowledge. However, when flushing N
cached MessageID instances, N CommandAck requests will be sent. It
downgrades the performance.

### Modifications

When more than one MessageID instances are acknowledged, use a single
CommandAck request to carry all of them.
  • Loading branch information
BewareMyPower committed Mar 6, 2023
1 parent c595677 commit 9c70b3f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 18 deletions.
16 changes: 8 additions & 8 deletions pulsar/ack_grouping_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ const (

func newAckGroupingTracker(options *AckGroupingOptions,
ackIndividual func(id MessageID),
ackCumulative func(id MessageID)) ackGroupingTracker {
ackCumulative func(id MessageID),
ackList func(ids []MessageID)) ackGroupingTracker {
if options == nil {
options = &AckGroupingOptions{
MaxSize: 1000,
Expand All @@ -68,12 +69,7 @@ func newAckGroupingTracker(options *AckGroupingOptions,
lastCumulativeAck: EarliestMessageID(),
ackIndividual: ackIndividual,
ackCumulative: ackCumulative,
ackList: func(ids []MessageID) {
// TODO: support ack a list of MessageIDs
for _, id := range ids {
ackIndividual(id)
}
},
ackList: ackList,
}

timeout := time.NewTicker(time.Hour)
Expand Down Expand Up @@ -218,7 +214,11 @@ func (t *cachedAcks) isDuplicate(id MessageID) bool {

func (t *cachedAcks) flushIndividualAcks() {
if t.index > 0 {
t.ackList(t.singleAcks[0:t.index])
if t.index > 1 {
t.ackList(t.singleAcks[0:t.index])
} else {
t.ackIndividual(t.singleAcks[0])
}
for _, id := range t.singleAcks[0:t.index] {
key := messageIDHash(id)
ackSet, found := t.pendingAcks[key]
Expand Down
21 changes: 16 additions & 5 deletions pulsar/ack_grouping_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ import (
"github.com/stretchr/testify/assert"
)

func newAckGroupingTrackerForTest(option *AckGroupingOptions,
ackIndividual func(id MessageID),
ackCumulative func(id MessageID)) ackGroupingTracker {
return newAckGroupingTracker(option, ackIndividual, ackCumulative,
func(ids []MessageID) {
for _, id := range ids {
ackIndividual(id)
}
})
}

func TestNoCacheTracker(t *testing.T) {
tests := []AckGroupingOptions{
{
Expand All @@ -43,7 +54,7 @@ func TestNoCacheTracker(t *testing.T) {
func(t *testing.T) {
ledgerID0 := int64(-1)
ledgerID1 := int64(-1)
tracker := newAckGroupingTracker(&option,
tracker := newAckGroupingTrackerForTest(&option,
func(id MessageID) { ledgerID0 = id.LedgerID() },
func(id MessageID) { ledgerID1 = id.LedgerID() })

Expand Down Expand Up @@ -88,7 +99,7 @@ func (a *mockAcker) reset() {

func TestCachedTracker(t *testing.T) {
var acker mockAcker
tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
tracker := newAckGroupingTrackerForTest(&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
func(id MessageID) { acker.ack(id) }, func(id MessageID) { acker.ackCumulative(id) })

tracker.add(&messageID{ledgerID: 1})
Expand Down Expand Up @@ -126,7 +137,7 @@ func TestCachedTracker(t *testing.T) {
func TestTimedTrackerIndividualAck(t *testing.T) {
var acker mockAcker
// MaxSize: 1000, MaxTime: 100ms
tracker := newAckGroupingTracker(nil, func(id MessageID) { acker.ack(id) }, nil)
tracker := newAckGroupingTrackerForTest(nil, func(id MessageID) { acker.ack(id) }, nil)

expected := make([]int64, 0)
for i := 0; i < 999; i++ {
Expand Down Expand Up @@ -161,7 +172,7 @@ func TestTimedTrackerIndividualAck(t *testing.T) {
func TestTimedTrackerCumulativeAck(t *testing.T) {
var acker mockAcker
// MaxTime is 100ms
tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { acker.ackCumulative(id) })
tracker := newAckGroupingTrackerForTest(nil, nil, func(id MessageID) { acker.ackCumulative(id) })

// case 1: flush because of the timeout
tracker.addCumulative(&messageID{ledgerID: 1})
Expand All @@ -182,7 +193,7 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
}

func TestTimedTrackerIsDuplicate(t *testing.T) {
tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id MessageID) {})
tracker := newAckGroupingTrackerForTest(nil, func(id MessageID) {}, func(id MessageID) {})

tracker.add(&messageID{batchIdx: 0, batchSize: 3})
tracker.add(&messageID{batchIdx: 2, batchSize: 3})
Expand Down
49 changes: 47 additions & 2 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
pc.ackGroupingTracker = newAckGroupingTracker(options.ackGroupingOptions,
func(id MessageID) { pc.sendIndividualAck(id) },
func(id MessageID) { pc.sendCumulativeAck(id) })
func(id MessageID) { pc.sendCumulativeAck(id) },
func(ids []MessageID) { pc.sendIndividualAcks(ids) })
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
Expand Down Expand Up @@ -500,6 +501,10 @@ func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
return ackReq
}

func (pc *partitionConsumer) sendIndividualAcks(msgIDs []MessageID) {
pc.eventsCh <- msgIDs
}

func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
return pc.ackID(msgID, true)
}
Expand Down Expand Up @@ -769,11 +774,13 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
}

func (pc *partitionConsumer) internalAck(req *ackRequest) {
defer close(req.doneCh)
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
close(req.doneCh)
return
}

defer close(req.doneCh)
msgID := req.msgID

messageIDs := make([]*pb.MessageIdData, 1)
Expand Down Expand Up @@ -818,6 +825,41 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
}
}

func (pc *partitionConsumer) internalAckList(msgIDs []MessageID) error {
n := len(msgIDs)
messageIDs := make([]*pb.MessageIdData, n)
for i := 0; i < n; i++ {
ledgerID := uint64(msgIDs[i].LedgerID())
entryID := uint64(msgIDs[i].EntryID())
messageIDs[i] = &pb.MessageIdData{
LedgerId: proto.Uint64(ledgerID),
EntryId: proto.Uint64(entryID),
}
if pc.options.enableBatchIndexAck {
trackingID, ok := msgIDs[i].(*trackingMessageID)
if ok && trackingID.tracker != nil {
ackSet := trackingID.tracker.toAckSet()
if ackSet != nil {
messageIDs[i].AckSet = ackSet
}
}
}
}

// This method can only be called by pc.ackGroupingTracker, which conflicts with ackWithResponse,
// so we don't need to handle the case when ackWithResponse is true.
cmdAck := &pb.CommandAck{
AckType: pb.CommandAck_Individual.Enum(),
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: messageIDs,
}
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
}
return err
}

func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
pbMsgID := response.GetMessageId()

Expand Down Expand Up @@ -1286,6 +1328,7 @@ const (
type ackRequest struct {
doneCh chan struct{}
msgID trackingMessageID
msgIDs []MessageID
ackType int
err error
}
Expand Down Expand Up @@ -1345,6 +1388,8 @@ func (pc *partitionConsumer) runEventsLoop() {
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
case []MessageID:
pc.internalAckList(v)
case *redeliveryRequest:
pc.internalRedeliver(v)
case *unsubscribeRequest:
Expand Down
6 changes: 3 additions & 3 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
decryptor: crypto.NewNoopDecryptor(),
}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil)
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
decryptor: crypto.NewNoopDecryptor(),
}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil)
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
decryptor: crypto.NewNoopDecryptor(),
}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil)
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
Expand Down

0 comments on commit 9c70b3f

Please sign in to comment.