
Support nack backoff policy for SDK #660

Merged: 12 commits, Nov 8, 2021
11 changes: 11 additions & 0 deletions pulsar/consumer.go
@@ -158,6 +158,17 @@ type ConsumerOptions struct {

// Decryption decryption related fields to decrypt the encrypted message
Decryption *MessageDecryptionInfo

// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
// nack backoff. Default: false.
EnableDefaultNackBackoffPolicy bool
Contributor

Why is EnableDefaultNackBackoffPolicy needed? If NackBackoffPolicy is not supplied, can't we just use the default?

Member Author

Without EnableDefaultNackBackoffPolicy, this would intrude on the existing code logic. If we used the default NackBackoffPolicy whenever NackBackoffPolicy is empty, then every user of the Nack(Message) interface would silently get the new implementation.
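To make the two knobs concrete, here is a usage sketch based on the options this diff adds (topic and subscription names are placeholders):

// Opt into the built-in backoff without writing a policy:
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:                          "topic-1",
    SubscriptionName:               "sub-1",
    EnableDefaultNackBackoffPolicy: true,
})

// Or supply a custom policy; per newConsumer below, a non-nil
// NackBackoffPolicy takes precedence over the enable flag:
consumer, err = client.Subscribe(pulsar.ConsumerOptions{
    Topic:             "topic-1",
    SubscriptionName:  "sub-1",
    NackBackoffPolicy: myCustomPolicy, // hypothetical NackBackoffPolicy implementation
})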

Contributor @cckellogg (Nov 6, 2021)

To me a cleaner API is to just have NackBackoffPolicy and expose the basic/default policy. If the policy is not set, then it uses the current behavior. This way there is only one configuration knob to worry about.

// current behavior
ConsumerOpts{}

// custom behavior
ConsumerOpts{
  NackBackoffPolicy: pulsar.NewExpNackBackoffPolicy(),
}


// NackBackoffPolicy is a redelivery backoff mechanism that allows redelivering a message with a
// different delay depending on the number of times it has been retried.
//
// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
// > because we are not able to get the redeliveryCount from the message ID.
NackBackoffPolicy NackBackoffPolicy
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
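For illustration, a minimal sketch of what a custom policy might look like. The Next signature (redelivery count in, delay out, converted via time.Duration) is inferred from the tracker code later in this diff and is an assumption, as is the expBackoff name:

// expBackoff is a hypothetical exponential policy: 10s, 20s, 40s, ...
// capped at 10 minutes.
type expBackoff struct{}

func (expBackoff) Next(redeliveryCount uint32) int64 {
    minDelay := int64(10 * time.Second)
    maxDelay := int64(10 * time.Minute)
    d := minDelay << redeliveryCount // doubles on every redelivery
    if d <= 0 || d > maxDelay {      // guard overflow and cap the delay
        return maxDelay
    }
    return d
}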
20 changes: 20 additions & 0 deletions pulsar/consumer_impl.go
@@ -35,6 +35,7 @@ const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
AckID(id trackingMessageID)
NackID(id trackingMessageID)
NackMsg(msg Message)
}

type consumer struct {
@@ -87,6 +88,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
}

// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
@@ -326,6 +331,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
@@ -489,6 +495,20 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
}

func (c *consumer) Nack(msg Message) {
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
mid, ok := c.messageID(msg.ID())
if !ok {
return
}

if mid.consumer != nil {
mid.Nack()
return
}
c.consumers[mid.partitionIdx].NackMsg(msg)
return
}

c.NackID(msg.ID())
}

16 changes: 16 additions & 0 deletions pulsar/consumer_multitopic.go
@@ -164,6 +164,22 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
}

func (c *multiTopicConsumer) Nack(msg Message) {
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
msgID := msg.ID()
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
}

if mid.consumer == nil {
c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
return
}
mid.NackByMsg(msg)
return
}

c.NackID(msg.ID())
}

8 changes: 7 additions & 1 deletion pulsar/consumer_partition.go
@@ -89,6 +89,7 @@ type partitionConsumerOpts struct {
partitionIdx int
receiverQueueSize int
nackRedeliveryDelay time.Duration
nackBackoffPolicy NackBackoffPolicy
metadata map[string]string
replicateSubscriptionState bool
startMessageID trackingMessageID
@@ -201,7 +202,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon

pc.decryptor = decryptor

pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, pc.log)
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)

err := pc.grabConn()
if err != nil {
@@ -331,6 +332,11 @@ func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
pc.metrics.NacksCounter.Inc()
}

func (pc *partitionConsumer) NackMsg(msg Message) {
pc.nackTracker.AddMessage(msg)
pc.metrics.NacksCounter.Inc()
}

func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
pc.eventsCh <- &redeliveryRequest{msgIds}

16 changes: 16 additions & 0 deletions pulsar/consumer_regex.go
@@ -183,6 +183,22 @@ func (c *regexConsumer) AckID(msgID MessageID) {
}

func (c *regexConsumer) Nack(msg Message) {
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
msgID := msg.ID()
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
}

if mid.consumer == nil {
c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
return
}
mid.NackByMsg(msg)
return
}

c.NackID(msg.ID())
}

7 changes: 7 additions & 0 deletions pulsar/impl_message.go
@@ -79,6 +79,13 @@ func (id trackingMessageID) Nack() {
id.consumer.NackID(id)
}

func (id trackingMessageID) NackByMsg(msg Message) {
if id.consumer == nil {
return
}
id.consumer.NackMsg(msg)
}

func (id trackingMessageID) ack() bool {
if id.tracker != nil && id.batchIdx > -1 {
return id.tracker.ack(int(id.batchIdx))
81 changes: 66 additions & 15 deletions pulsar/negative_acks_tracker.go
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
tick *time.Ticker
nackBackoff NackBackoffPolicy
trackFlag bool
delay time.Duration
log log.Logger
}

func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
t := &negativeAcksTracker{
doneCh: make(chan interface{}),
negativeAcks: make(map[messageID]time.Time),
rc: rc,
tick: time.NewTicker(delay / 3),
delay: delay,
log: logger,
}
func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {

t := new(negativeAcksTracker)
Contributor

This can just be var t *negativeAcksTracker.

Member Author

They have the same effect.


// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
// the CommandMessage, so for the original default Nack() logic we still keep the negativeAcksTracker created
// when we open a goroutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
// to execute the logic of `t.track()` when AddMessage() is called.
if nackBackoffPolicy != nil {
Contributor @cckellogg (Nov 5, 2021)

I'm a little confused about why we need an if statement. Shouldn't the default implementation of the NackBackoffPolicy be what the current behavior is? The benefit of the interface is to simplify the code and delegate to the implementation.

bp := nackBackoffPolicy
if bp == nil {
	bp = newDefaultBackoffPolicy(delay)
}
t = &negativeAcksTracker{
	doneCh:       make(chan interface{}),
	negativeAcks: make(map[messageID]time.Time),
	nackBackoff:  bp,
	rc:           rc,
	log:          logger,
}

Thoughts?

Member Author

Yes, I agree with your point of view. The problem is that for nack backoff we can't directly get the corresponding nackDelayTime: we first have to read the redeliveryCount from the CommandMessage, calculate the nackDelayTime from it, and only then can we create the time.NewTicker based on that delay. It is precisely because of this relationship that the if statement is added.

t = &negativeAcksTracker{
doneCh: make(chan interface{}),
negativeAcks: make(map[messageID]time.Time),
nackBackoff: nackBackoffPolicy,
rc: rc,
log: logger,
}
} else {
t = &negativeAcksTracker{
doneCh: make(chan interface{}),
negativeAcks: make(map[messageID]time.Time),
rc: rc,
nackBackoff: nil,
delay: delay,
log: logger,
}

go t.track()
go t.track(time.NewTicker(t.delay / 3))
}
return t
}

@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
t.negativeAcks[batchMsgID] = targetTime
}

func (t *negativeAcksTracker) track() {
func (t *negativeAcksTracker) AddMessage(msg Message) {
Contributor

Why is there a new method here?

Also, it looks like state is changing here without a lock. If multiple goroutines call this at once, multiple tracking routines could be started, right?

Can the tracking goroutine just be started at creation time?

Member Author

Because we need to get the redeliveryCount through the Message interface.

nackBackoffDelay := t.nackBackoff.Next(msg.RedeliveryCount())
t.delay = time.Duration(nackBackoffDelay)

// Use trackFlag to avoid opening a new goroutine to execute `t.track()` on every AddMessage call.
// In fact, we only need to execute it once.
if !t.trackFlag {
go t.track(time.NewTicker(t.delay / 3))
t.trackFlag = true
}

msgID := msg.ID()

// Always clear up the batch index since we want to track the nack
// for the entire batch
batchMsgID := messageID{
ledgerID: msgID.LedgerID(),
entryID: msgID.EntryID(),
batchIdx: 0,
}

t.Lock()
defer t.Unlock()

_, present := t.negativeAcks[batchMsgID]
if present {
// The batch is already being tracked
return
}

targetTime := time.Now().Add(time.Duration(nackBackoffDelay))
t.negativeAcks[batchMsgID] = targetTime
}
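On the locking concern raised above: one way to make the one-time goroutine start race-free would be sync.Once instead of the unsynchronized trackFlag bool. A sketch of that alternative, not what this PR implements:

type negativeAcksTracker struct {
	// ...existing fields...
	trackOnce sync.Once // would replace trackFlag
}

func (t *negativeAcksTracker) AddMessage(msg Message) {
	nackBackoffDelay := t.nackBackoff.Next(msg.RedeliveryCount())

	// Start the tracking goroutine exactly once, even with concurrent callers.
	t.trackOnce.Do(func() {
		go t.track(time.NewTicker(time.Duration(nackBackoffDelay) / 3))
	})
	// ...rest unchanged...
}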

func (t *negativeAcksTracker) track(ticker *time.Ticker) {
for {
select {
case <-t.doneCh:
t.log.Debug("Closing nack tracker")
return

case <-t.tick.C:
case <-ticker.C:
{
now := time.Now()
msgIds := make([]messageID, 0)
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
t.rc.Redeliver(msgIds)
}
}

}
}
}

func (t *negativeAcksTracker) Close() {
// allow Close() to be invoked multiple times by consumer_partition to avoid panic
t.doneOnce.Do(func() {
t.tick.Stop()
Contributor

How is the ticker getting cleaned up now?

Member Author

In the current implementation, if we use the t.ticker from the struct there will be a data race, so we now use a temporary ticker variable instead, and there is no good way to close the temporarily created ticker.
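One possible cleanup, sketched here as a suggestion rather than what the PR does: let track() own the ticker and stop it on shutdown, which avoids sharing it across goroutines:

func (t *negativeAcksTracker) track(ticker *time.Ticker) {
	// The goroutine owns the ticker, so stopping it here cannot race
	// with callers; it runs when the tracker shuts down and track() returns.
	defer ticker.Stop()
	for {
		select {
		case <-t.doneCh:
			t.log.Debug("Closing nack tracker")
			return
		case <-ticker.C:
			// ...redelivery scan unchanged...
		}
	}
}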

t.doneCh <- nil
})
}