diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 41a1b9b9a7..02ee7cf070 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -397,7 +397,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) { msg.ReplicationClusters, deliverAt) if !added { // The current batch is full.. flush it and retry - p.internalFlushCurrentBatch() + if p.batchBuilder.IsMultiBatches() { + p.internalFlushCurrentBatches() + } else { + p.internalFlushCurrentBatch() + } // after flushing try again to add the current payload if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,