diff --git a/async_producer.go b/async_producer.go index d0ce01b66..303ab45d4 100644 --- a/async_producer.go +++ b/async_producer.go @@ -535,6 +535,7 @@ func (pp *partitionProducer) dispatch() { } }() + leaderRetries := 0 for msg := range pp.input { if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil { select { @@ -579,10 +580,14 @@ func (pp *partitionProducer) dispatch() { if pp.brokerProducer == nil { if err := pp.updateLeader(); err != nil { + // Report the error on this message and wait for the backoff, then move on to the next one. pp.parent.returnError(msg, err) - pp.backoff(msg.retries) + pp.backoff(leaderRetries) + leaderRetries++ continue } + // On success, reset the retry count to 0. + leaderRetries = 0 Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) }