[feature] Support batch index ACK
Fixes #894

### Modifications

Add an `EnableBatchIndexAcknowledgment` option to specify whether batch index
ACK is enabled. Since this feature requires converting between a bit set and
its underlying long array, much like Java's `BitSet`, this commit introduces
the github.com/bits-and-blooms/bitset dependency to replace the `big.Int`
based implementation of the bit set.
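As a rough illustration of the round trip this dependency enables (a sketch, not code from this commit), `Bytes()` and `From()` play the roles of Java's `BitSet#toLongArray` and `BitSet#valueOf`:

```go
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bitset"
)

func main() {
	// One bit per message in a 10-message batch. In the receive path of
	// this commit, a cleared bit marks an already-acknowledged entry.
	b := bitset.New(10)
	b.Set(3)
	b.Set(7)

	// Bytes() exposes the underlying []uint64 words, like BitSet#toLongArray.
	words := b.Bytes()

	// From() rebuilds a bit set from those words, like BitSet#valueOf.
	restored := bitset.From(words)
	fmt.Println(restored.Test(3), restored.Test(4)) // true false
}
```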

Add a `BatchSize()` method to `MessageId` to indicate the size of the
`ack_set` field. When a batch index ACK happens, convert the bit set's
`[]uint64` words to the `[]int64` `ack_set` field in `CommandAck`. When
receiving messages, convert the `ack_set` field in `CommandMessage` back into
a bit set and use it to filter out the already-acknowledged single messages.
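A minimal sketch of these two conversions, assuming the wire field is `[]int64` as in `CommandAck` and `CommandMessage`; the package and helper names are illustrative, not identifiers from this commit:

```go
package ackset

import "github.com/bits-and-blooms/bitset"

// toWireAckSet converts a bit set's []uint64 words into the []int64
// ack_set representation carried by CommandAck.
func toWireAckSet(b *bitset.BitSet) []int64 {
	words := b.Bytes()
	ackSet := make([]int64, len(words))
	for i, w := range words {
		ackSet[i] = int64(w)
	}
	return ackSet
}

// fromWireAckSet rebuilds a bit set from the ack_set field of a received
// CommandMessage so already-acknowledged entries can be filtered out.
func fromWireAckSet(ackSet []int64) *bitset.BitSet {
	words := make([]uint64, len(ackSet))
	for i, v := range ackSet {
		words[i] = uint64(v)
	}
	return bitset.From(words)
}
```

The receive loop in `pulsar/consumer_partition.go` below performs the `fromWireAckSet` direction inline before skipping entries whose bit is cleared.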

Remove the duplicated code in `AckID` and `AckIDWithResponse`.

### Verifications

`TestBatchIndexAck` is added to cover all four combinations of whether
`AckWithResponse` is enabled and whether the ACK is individual or cumulative.
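
For reference, a consumer opts in through the new option; this is a usage sketch with placeholder names, and it has no effect unless the broker also sets `acknowledgmentAtBatchIndexLevelEnabled=true` (see the integration-test config below):

```go
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:                          "my-topic",
	SubscriptionName:               "my-sub",
	EnableBatchIndexAcknowledgment: true, // requires broker-side support
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()
```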
BewareMyPower committed Jan 3, 2023
1 parent 1f3747e commit 2dfdabd
Showing 12 changed files with 261 additions and 66 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -6,6 +6,7 @@ require (
github.com/99designs/keyring v1.2.1
github.com/AthenZ/athenz v1.10.39
github.com/DataDog/zstd v1.5.0
github.com/bits-and-blooms/bitset v1.4.0
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/davecgh/go-spew v1.1.1
github.com/golang-jwt/jwt v3.2.1+incompatible
2 changes: 2 additions & 0 deletions go.sum
@@ -65,6 +65,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8=
github.com/bits-and-blooms/bitset v1.4.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
2 changes: 2 additions & 0 deletions integration-tests/conf/standalone.conf
@@ -292,3 +292,5 @@ globalZookeeperServers=

# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds
brokerServicePurgeInactiveFrequencyInSeconds=60

acknowledgmentAtBatchIndexLevelEnabled=true
4 changes: 4 additions & 0 deletions pulsar/consumer.go
@@ -211,6 +211,10 @@ type ConsumerOptions struct {
// AutoAckIncompleteChunk sets whether consumer auto acknowledges incomplete chunked message when it should
// be removed (e.g. the chunked message pending queue is full). (default: false)
AutoAckIncompleteChunk bool

// Enable or disable batch index acknowledgment. To enable this feature, ensure batch index acknowledgment
// is enabled on the broker side. (default: false)
EnableBatchIndexAcknowledgment bool
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
1 change: 1 addition & 0 deletions pulsar/consumer_impl.go
@@ -397,6 +397,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk,
consumerEventListener: c.options.EventListener,
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
80 changes: 44 additions & 36 deletions pulsar/consumer_partition.go
@@ -36,6 +36,7 @@ import (
cryptointernal "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/bits-and-blooms/bitset"

uAtomic "go.uber.org/atomic"
)
@@ -114,6 +115,7 @@ type partitionConsumerOpts struct {
autoAckIncompleteChunk bool
// in failover mode, this callback will be called when consumer change
consumerEventListener ConsumerEventListener
enableBatchIndexAck bool
}

type ConsumerEventListener interface {
@@ -434,7 +436,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
return convertToMessageID(id), nil
}

func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return errors.New("consumer state is closed")
Expand All @@ -458,47 +460,31 @@ func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
ackReq.msgID = trackingID
// send ack request to eventsCh
pc.eventsCh <- ackReq
// wait for the request to complete
<-ackReq.doneCh

pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
}

return ackReq.err
}

func (pc *partitionConsumer) AckID(msgID MessageID) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return errors.New("consumer state is closed")
}

if cmid, ok := toChunkedMessageID(msgID); ok {
return pc.unAckChunksTracker.ack(cmid)
}

trackingID, ok := toTrackingMessageID(msgID)
if !ok {
return errors.New("failed to convert trackingMessageID")
}
if withResponse {
<-ackReq.doneCh
}

ackReq := new(ackRequest)
ackReq.doneCh = make(chan struct{})
ackReq.ackType = individualAck
if !trackingID.Undefined() && trackingID.ack() {
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
} else if pc.options.enableBatchIndexAck {
ackReq.msgID = trackingID
// send ack request to eventsCh
pc.eventsCh <- ackReq
// No need to wait for ackReq.doneCh to finish

pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
}

if withResponse {
return ackReq.err
}
return nil
}

func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
return pc.ackID(msgID, true)
}

func (pc *partitionConsumer) AckID(msgID MessageID) error {
return pc.ackID(msgID, false)
}

func (pc *partitionConsumer) AckIDCumulative(msgID MessageID) error {
return pc.internalAckIDCumulative(msgID, false)
}
@@ -525,7 +511,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
ackReq := new(ackRequest)
ackReq.doneCh = make(chan struct{})
ackReq.ackType = cumulativeAck
if trackingID.ackCumulative() {
if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
ackReq.msgID = trackingID
} else if !trackingID.tracker.hasPrevBatchAcked() {
// get previous batch message id
@@ -764,6 +750,12 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
EntryId: proto.Uint64(uint64(msgID.entryID)),
}
if pc.options.enableBatchIndexAck && msgID.tracker != nil {
ackSet := msgID.tracker.toAckSet()
if ackSet != nil {
messageIDs[0].AckSet = ackSet
}
}

reqID := pc.client.rpcClient.NewRequestID()
cmdAck := &pb.CommandAck{
@@ -822,7 +814,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
switch crypToFailureAction {
case crypto.ConsumerCryptoFailureActionFail:
pc.log.Errorf("consuming message failed due to decryption err :%v", err)
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, nil))
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
return err
case crypto.ConsumerCryptoFailureActionDiscard:
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
Expand All @@ -842,6 +834,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
int64(pbMsgID.GetEntryId()),
pbMsgID.GetBatchIndex(),
pc.partitionIdx,
pbMsgID.GetBatchSize(),
),
payLoad: headersAndPayload.ReadableSlice(),
schema: pc.options.schema,
@@ -889,7 +882,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
var ackTracker *ackTracker
// are there multiple messages in this batch?
if numMsgs > 1 {
ackTracker = newAckTracker(numMsgs)
ackTracker = newAckTracker(uint(numMsgs))
}

var ackSet *bitset.BitSet
if response.GetAckSet() != nil {
ackSetFromResponse := response.GetAckSet()
buf := make([]uint64, len(ackSetFromResponse))
for i := 0; i < len(buf); i++ {
buf[i] = uint64(ackSetFromResponse[i])
}
ackSet = bitset.From(buf)
}

pc.metrics.MessagesReceived.Add(float64(numMsgs))
Expand All @@ -901,6 +904,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
return err
}
if ackSet != nil && !ackSet.Test(uint(i)) {
pc.log.Debugf("Ignoring the %vth message, which has already been acknowledged", i)
continue
}

pc.metrics.BytesReceived.Add(float64(len(payload)))
pc.metrics.PrefetchedBytes.Add(float64(len(payload)))
Expand All @@ -910,6 +917,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
int64(pbMsgID.GetEntryId()),
int32(i),
pc.partitionIdx,
int32(numMsgs),
ackTracker)
// set the consumer so we know how to ack the message id
trackingMsgID.consumer = pc
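`msgID.tracker.toAckSet()` above is defined in one of the changed files not shown on this page. Presumably it exports the tracker's remaining bits in the `[]int64` wire form; the sketch below assumes a `batchIDs *bitset.BitSet` field and is not the commit's actual code:

```go
// Assumed minimal shape of the tracker for this sketch.
type ackTracker struct {
	batchIDs *bitset.BitSet
}

// toAckSet returns the tracker's bit set as []int64 words for the
// CommandAck ack_set field, or nil once every entry has been acknowledged.
func (t *ackTracker) toAckSet() []int64 {
	if t.batchIDs.None() {
		return nil
	}
	words := t.batchIDs.Bytes()
	ackSet := make([]int64, len(words))
	for i, w := range words {
		ackSet[i] = int64(w)
	}
	return ackSet
}
```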
137 changes: 136 additions & 1 deletion pulsar/consumer_test.go
@@ -3841,7 +3841,142 @@ func TestAckWithMessageID(t *testing.T) {
assert.Nil(t, err)

id := message.ID()
newID := NewMessageID(id.LedgerID(), id.EntryID(), id.BatchIdx(), id.PartitionIdx())
newID := NewMessageID(id.LedgerID(), id.EntryID(), id.BatchIdx(), id.PartitionIdx(), 0)
err = consumer.AckID(newID)
assert.Nil(t, err)
}

func TestBatchIndexAck(t *testing.T) {
tests := []struct {
AckWithResponse bool
Cumulative bool
}{
{
AckWithResponse: true,
Cumulative: true,
},
{
AckWithResponse: true,
Cumulative: false,
},
{
AckWithResponse: false,
Cumulative: true,
},
{
AckWithResponse: false,
Cumulative: false,
},
}
for _, params := range tests {
t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v",
params.AckWithResponse, params.Cumulative),
func(t *testing.T) {
runBatchIndexAckTest(t, params.AckWithResponse, params.Cumulative)
})
}
}

func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)

topic := newTopicName()
createConsumer := func() Consumer {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
AckWithResponse: ackWithResponse,
EnableBatchIndexAcknowledgment: true,
})
assert.Nil(t, err)
return consumer
}

consumer := createConsumer()

duration, err := time.ParseDuration("1h")
assert.Nil(t, err)

const BatchingMaxSize int = 2 * 5
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
BatchingMaxMessages: uint(BatchingMaxSize),
BatchingMaxSize: uint(1024 * 1024 * 10),
BatchingMaxPublishDelay: duration,
})
assert.Nil(t, err)
for i := 0; i < BatchingMaxSize; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-%d", i)),
}, func(id MessageID, producerMessage *ProducerMessage, err error) {
assert.Nil(t, err)
log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize())
})
}
assert.Nil(t, producer.Flush())

msgIds := make([]MessageID, BatchingMaxSize)
for i := 0; i < BatchingMaxSize; i++ {
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
msgIds[i] = message.ID()
log.Printf("Received %v from %v:%d:%d", string(message.Payload()), message.ID(),
message.ID().BatchIdx(), message.ID().BatchSize())
}

// Acknowledge half of the messages
if cumulative {
msgID := msgIds[BatchingMaxSize/2-1]
consumer.AckIDCumulative(msgID)
log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
} else {
for i := 0; i < BatchingMaxSize; i++ {
msgID := msgIds[i]
if i%2 == 0 {
consumer.AckID(msgID)
log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx())
}
}
}
consumer.Close()
consumer = createConsumer()

for i := 0; i < BatchingMaxSize/2; i++ {
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
log.Printf("Received %v from %v:%d:%d", string(message.Payload()), message.ID(),
message.ID().BatchIdx(), message.ID().BatchSize())
index := i*2 + 1
if cumulative {
index = i + BatchingMaxSize/2
}
assert.Equal(t, []byte(fmt.Sprintf("msg-%d", index)), message.Payload())
assert.Equal(t, msgIds[index].BatchIdx(), message.ID().BatchIdx())
// We should not acknowledge message.ID() here because message.ID() uses a
// different tracker than the corresponding entry in msgIds
if !cumulative {
msgID := msgIds[index]
consumer.AckID(msgID)
log.Printf("Acknowledge %v:%d\n", msgID, msgID.BatchIdx())
}
}
if cumulative {
msgID := msgIds[BatchingMaxSize-1]
consumer.AckIDCumulative(msgID)
log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx())
}
consumer.Close()
consumer = createConsumer()
_, err = producer.Send(context.Background(), &ProducerMessage{Payload: []byte("end-marker")})
assert.Nil(t, err)
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
assert.Equal(t, "end-marker", string(msg.Payload()))

client.Close()
}
