From c5dfc292e2d5bcf1d0dd81baab88c7234d81811e Mon Sep 17 00:00:00 2001 From: jonyhy96 Date: Thu, 25 Feb 2021 16:45:44 +0800 Subject: [PATCH] fix: batch flush method Signed-off-by: jonyhy96 --- pulsar/producer_partition.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 41a1b9b9a7..02ee7cf070 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -397,7 +397,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) { msg.ReplicationClusters, deliverAt) if !added { // The current batch is full.. flush it and retry - p.internalFlushCurrentBatch() + if p.batchBuilder.IsMultiBatches() { + p.internalFlushCurrentBatches() + } else { + p.internalFlushCurrentBatch() + } // after flushing try again to add the current payload if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,