diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 43ae68fdef..87d1d691b5 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -386,11 +386,7 @@ func (p *partitionProducer) runEventsLoop() { return } case <-p.batchFlushTicker.C: - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + p.internalFlushCurrentBatch() } } } @@ -485,11 +481,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) { msg.ReplicationClusters, deliverAt) if !added { // The current batch is full.. flush it and retry - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + + p.internalFlushCurrentBatch() // after flushing try again to add the current payload if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request, @@ -504,11 +497,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } if !sendAsBatch || request.flushImmediately { - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + + p.internalFlushCurrentBatch() + } } @@ -522,6 +513,11 @@ type pendingItem struct { } func (p *partitionProducer) internalFlushCurrentBatch() { + if p.batchBuilder.IsMultiBatches() { + p.internalFlushCurrentBatches() + return + } + batchData, sequenceID, callbacks, err := p.batchBuilder.Flush() if batchData == nil { return @@ -683,11 +679,8 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } func (p *partitionProducer) internalFlush(fr *flushRequest) { - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + + p.internalFlushCurrentBatch() pi, ok := p.pendingQueue.PeekLast().(*pendingItem) if !ok {