From f30eb49f4c7af23f1e62ab5a450a04f799a1be54 Mon Sep 17 00:00:00 2001
From: Rui Fu
Date: Thu, 24 Jun 2021 11:33:46 +0800
Subject: [PATCH] fix resend pendingItems race condition

---
 pulsar/producer_partition.go | 42 ++++++++++++++++++++++++++++--------
 1 file changed, 33 insertions(+), 9 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b365f83995..19aa06ea02 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -231,10 +231,29 @@ func (p *partitionProducer) grabCnx() error {
 	p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
 
 	pendingItems := p.pendingQueue.ReadableSlice()
-	if len(pendingItems) > 0 {
-		p.log.Infof("Resending %d pending batches", len(pendingItems))
-		for _, pi := range pendingItems {
-			p.cnx.WriteData(pi.(*pendingItem).batchData)
+	viewSize := len(pendingItems)
+	if viewSize > 0 {
+		p.log.Infof("Resending %d pending batches", viewSize)
+		lastViewItem := pendingItems[viewSize-1].(*pendingItem)
+
+		// iterate over at most viewSize pending items
+		for i := 0; i < viewSize; i++ {
+			item := p.pendingQueue.Poll()
+			if item == nil {
+				continue
+			}
+			pi := item.(*pendingItem)
+			// when resending pending batches, we update the sentAt timestamp and put the item at the back of
+			// the queue to avoid the pending item being removed by failTimeoutMessages and causing a race condition
+			pi.Lock()
+			pi.sentAt = time.Now()
+			pi.Unlock()
+			p.pendingQueue.Put(pi)
+			p.cnx.WriteData(pi.batchData)
+
+			if pi == lastViewItem {
+				break
+			}
 		}
 	}
 	return nil
@@ -523,8 +542,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 			}
 
 			// flag the send has completed with error, flush make no effect
-			pi.completed = true
-			buffersPool.Put(pi.batchData)
+			pi.Complete()
 			pi.Unlock()
 
 			// finally reached the last view item, current iteration ends
@@ -706,9 +724,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	}
 
 	// Mark this pending item as done
-	pi.completed = true
-	// Return buffer to the pool since we're now done using it
-	buffersPool.Put(pi.batchData)
+	pi.Complete()
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
@@ -800,3 +816,11 @@ type flushRequest struct {
 	waitGroup *sync.WaitGroup
 	err       error
 }
+
+func (i *pendingItem) Complete() {
+	if i.completed {
+		return
+	}
+	i.completed = true
+	buffersPool.Put(i.batchData)
+}