Skip to content

Commit

Permalink
Calling internalFlushCurrentBatch will be forwaded to internalFlushCu…
Browse files Browse the repository at this point in the history
…rrentBatches if the associated batch builder contains multi batches(KeyBasedBatchBuilder). (#750)

Doing transparent forwarding could centralized the branching logic at only one place.

Co-authored-by: Karma Shi <[email protected]>
  • Loading branch information
shileiyu and Karma Shi authored May 6, 2022
1 parent 5ee6330 commit ee379ac
Showing 1 changed file with 13 additions and 20 deletions.
33 changes: 13 additions & 20 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,7 @@ func (p *partitionProducer) runEventsLoop() {
return
}
case <-p.batchFlushTicker.C:
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}
p.internalFlushCurrentBatch()
}
}
}
Expand Down Expand Up @@ -485,11 +481,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg.ReplicationClusters, deliverAt)
if !added {
// The current batch is full.. flush it and retry
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}

p.internalFlushCurrentBatch()

// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
Expand All @@ -504,11 +497,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}

if !sendAsBatch || request.flushImmediately {
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}

p.internalFlushCurrentBatch()

}
}

Expand All @@ -522,6 +513,11 @@ type pendingItem struct {
}

func (p *partitionProducer) internalFlushCurrentBatch() {
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
return
}

batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData == nil {
return
Expand Down Expand Up @@ -683,11 +679,8 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}

func (p *partitionProducer) internalFlush(fr *flushRequest) {
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}

p.internalFlushCurrentBatch()

pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {
Expand Down

0 comments on commit ee379ac

Please sign in to comment.