Skip to content

Commit

Permalink
Fix goroutine leaks in plugin/sampling/strategystore/adaptive
Browse files Browse the repository at this point in the history
This mainly involved ensuring that all goroutines started by the
Processor are shut down in a Close method (which also blocks on
them returning via a WaitGroup).

Adding this flagged an issue where the `runUpdateProbabilitiesLoop`
had a long delay, so tests need to be able to override the default
Processor.followerRefreshInterval, or they take a long time to run.

Signed-off-by: Will Sewell <[email protected]>
  • Loading branch information
Will Sewell committed Mar 29, 2024
1 parent 7c9dce4 commit 250ab88
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 24 deletions.
4 changes: 4 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName s
return &api_v2.SamplingStrategyResponse{}, nil
}

func (m *mockStrategyStore) Close() error {
return nil
}

func TestCollector_PublishOpts(t *testing.T) {
// prepare
hc := healthcheck.New()
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/sampling/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func (s mockSamplingStore) Close() error {
return nil
}

func TestNewGRPCHandler(t *testing.T) {
tests := []struct {
req *api_v2.SamplingStrategyParameters
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (

// StrategyStore keeps track of service specific sampling strategies.
type StrategyStore interface {
// Close() from io.Closer stops the processor from calculating probabilities.
io.Closer

// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error)
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName st
return nil, nil
}

func (s mockSamplingStore) Close() error {
return nil
}

type mockSpanProcessor struct{}

func (p *mockSpanProcessor) Close() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/clientcfg/clientcfghttp/cfgmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (m *mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName s
return m.samplingResponse, nil
}

func (m *mockSamplingStore) Close() error {
return nil
}

type mockBaggageMgr struct {
baggageResponse []*baggage.BaggageRestriction
}
Expand Down
26 changes: 16 additions & 10 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package adaptive
import (
"errors"
"flag"
"time"

"github.com/spf13/viper"
"go.uber.org/zap"
Expand All @@ -33,21 +34,25 @@ var _ plugin.Configurable = (*Factory)(nil)

// Factory implements strategystore.Factory for an adaptive strategy store.
type Factory struct {
options *Options
logger *zap.Logger
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
options *Options
// This is passed to any new Processors created by this Factory.
// It exists here so that test code can override the default.
followerRefreshInterval time.Duration
logger *zap.Logger
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
options: &Options{},
logger: zap.NewNop(),
metricsFactory: metrics.NullFactory,
lock: nil,
store: nil,
options: &Options{},
followerRefreshInterval: defaultFollowerProbabilityInterval,
logger: zap.NewNop(),
metricsFactory: metrics.NullFactory,
lock: nil,
store: nil,
}
}

Expand Down Expand Up @@ -87,6 +92,7 @@ func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategyst
if err != nil {
return nil, nil, err
}
p.followerRefreshInterval = f.followerRefreshInterval
p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)
a.Start()
Expand Down
5 changes: 4 additions & 1 deletion plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ func TestFactory(t *testing.T) {
assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval)

require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop()))
_, _, err := f.CreateStrategyStore()
f.followerRefreshInterval = time.Millisecond
store, aggregator, err := f.CreateStrategyStore()
require.NoError(t, err)
require.NoError(t, store.Close())
require.NoError(t, aggregator.Close())
}

func TestBadConfigFail(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions plugin/sampling/strategystore/adaptive/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adaptive

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
26 changes: 18 additions & 8 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ type Processor struct {

serviceCache []SamplingCache

shutdown chan struct{}
shutdown chan struct{}
bgFinished sync.WaitGroup

operationsCalculatedGauge metrics.Gauge
calculateProbabilitiesLatency metrics.Timer
Expand Down Expand Up @@ -170,19 +171,28 @@ func (p *Processor) Start() error {
p.shutdown = make(chan struct{})
p.loadProbabilities()
p.generateStrategyResponses()
go p.runCalculationLoop()
go p.runUpdateProbabilitiesLoop()
p.runBackground(p.runCalculationLoop)
p.runBackground(p.runUpdateProbabilitiesLoop)
return nil
}

func (p *Processor) runBackground(f func()) {
p.bgFinished.Add(1)
go func() {
f()
p.bgFinished.Done()
}()
}

// Close stops the processor from calculating probabilities.
func (p *Processor) Close() error {
p.logger.Info("stopping adaptive sampling processor")
if err := p.electionParticipant.Close(); err != nil {
return err
err := p.electionParticipant.Close()
if p.shutdown != nil {
close(p.shutdown)
}
close(p.shutdown)
return nil
p.bgFinished.Wait()
return err
}

func (p *Processor) loadProbabilities() {
Expand Down Expand Up @@ -272,7 +282,7 @@ func (p *Processor) runCalculationLoop() {
// be way longer than the time to run the calculations.
p.generateStrategyResponses()
p.calculateProbabilitiesLatency.Record(time.Since(startTime))
go p.saveProbabilitiesAndQPS()
p.runBackground(p.saveProbabilitiesAndQPS)
}
case <-p.shutdown:
return
Expand Down
29 changes: 25 additions & 4 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func TestRunCalculationLoop(t *testing.T) {
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
p.Start()

for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -392,9 +393,17 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
p.shutdown = make(chan struct{})
defer close(p.shutdown)
go p.runCalculationLoop()
done := make(chan struct{})
go func() {
p.runCalculationLoop()
close(done)
}()
defer func() {
close(p.shutdown)
<-done
}()

for i := 0; i < 1000; i++ {
// match logs specific to getThroughputErrMsg. We expect to see more than 2, once during
Expand Down Expand Up @@ -432,10 +441,17 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) {
followerRefreshInterval: time.Millisecond,
electionParticipant: mockEP,
}
defer close(p.shutdown)
require.Nil(t, p.probabilities)
require.Nil(t, p.strategyResponses)
go p.runUpdateProbabilitiesLoop()
done := make(chan struct{})
go func() {
p.runUpdateProbabilitiesLoop()
close(done)
}()
defer func() {
close(p.shutdown)
<-done
}()

for i := 0; i < 1000; i++ {
p.RLock()
Expand All @@ -449,6 +465,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) {
p.RLock()
assert.NotNil(t, p.probabilities)
assert.NotNil(t, p.strategyResponses)
p.RUnlock()
}

func TestRealisticRunCalculationLoop(t *testing.T) {
Expand Down Expand Up @@ -481,6 +498,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) {
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
p.Start()

for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -879,7 +897,9 @@ func TestErrors(t *testing.T) {

p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
require.Error(t, p.Start())
require.Error(t, p.Close())

// close errors
mockEP = &epmocks.ElectionParticipant{}
Expand All @@ -889,6 +909,7 @@ func TestErrors(t *testing.T) {

p, err = newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
require.NoError(t, p.Start())
require.Error(t, p.Close())
}
3 changes: 2 additions & 1 deletion plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ func (h *strategyStore) GetSamplingStrategy(_ context.Context, serviceName strin
}

// Close stops updating the strategies
func (h *strategyStore) Close() {
func (h *strategyStore) Close() error {
h.cancelFunc()
return nil
}

func (h *strategyStore) downloadSamplingStrategies(url string) ([]byte, error) {
Expand Down

0 comments on commit 250ab88

Please sign in to comment.