Skip to content

Commit

Permalink
pulsar/producer_impl: rewrite partition discovery goroutine so that i…
Browse files Browse the repository at this point in the history
…t actually gets closed before close active producers
  • Loading branch information
PowerStateFailure committed Feb 28, 2021
1 parent a46c494 commit 2032bfd
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ type producer struct {
producersPtr unsafe.Pointer
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
tickerStop chan struct{}
stopDiscovery func()
log log.Logger
metrics *internal.TopicMetrics
}
Expand Down Expand Up @@ -125,24 +124,36 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
return nil, err
}

ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
p.ticker = ticker
p.tickerStop = make(chan struct{})
p.stopDiscovery = p.runBackgroundPartitionDiscovery(partitionsAutoDiscoveryInterval)

p.metrics.ProducersOpened.Inc()
return p, nil
}

func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
ticker := time.NewTicker(period)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopDiscoveryCh:
return
case <-ticker.C:
p.log.Debug("Auto discovering new partitions")
p.internalCreatePartitionsProducers()
case <-p.tickerStop:
return
}
}
}()

p.metrics.ProducersOpened.Inc()
return p, nil
return func() {
ticker.Stop()
close(stopDiscoveryCh)
wg.Wait()
}
}

func (p *producer) internalCreatePartitionsProducers() error {
Expand Down Expand Up @@ -292,13 +303,13 @@ func (p *producer) Flush() error {
}

func (p *producer) Close() {
if p.stopDiscovery != nil {
p.stopDiscovery()
p.stopDiscovery = nil
}

p.Lock()
defer p.Unlock()
if p.ticker != nil {
p.ticker.Stop()
close(p.tickerStop)
p.ticker = nil
}

for _, pp := range p.producers {
pp.Close()
Expand Down

0 comments on commit 2032bfd

Please sign in to comment.