From 250ab88af84c7c078dd357dd735e4884e53efeab Mon Sep 17 00:00:00 2001 From: Will Sewell Date: Fri, 29 Mar 2024 14:25:27 +0000 Subject: [PATCH 1/3] Fix goroutine leaks in plugin/sampling/strategystore/adaptive This mainly involved ensuring that all goroutines started by the Processor are shut down in a Close method (which also blocks on them returning via a WaitGroup). Adding this flagged an issue where the `runUpdateProbabilitiesLoop` had a long delay, so tests need to be able to override the default Processor.followerRefreshInterval, or they take a long time to run. Signed-off-by: Will Sewell --- cmd/collector/app/collector_test.go | 4 +++ .../app/sampling/grpc_handler_test.go | 4 +++ .../app/sampling/strategystore/interface.go | 3 ++ cmd/collector/app/server/test.go | 4 +++ pkg/clientcfg/clientcfghttp/cfgmgr_test.go | 4 +++ .../strategystore/adaptive/factory.go | 26 ++++++++++------- .../strategystore/adaptive/factory_test.go | 5 +++- .../strategystore/adaptive/package_test.go | 14 +++++++++ .../strategystore/adaptive/processor.go | 26 ++++++++++++----- .../strategystore/adaptive/processor_test.go | 29 ++++++++++++++++--- .../strategystore/static/strategy_store.go | 3 +- 11 files changed, 98 insertions(+), 24 deletions(-) create mode 100644 plugin/sampling/strategystore/adaptive/package_test.go diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 6e08ce76a19..8c92cd81e53 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -129,6 +129,10 @@ func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName s return &api_v2.SamplingStrategyResponse{}, nil } +func (m *mockStrategyStore) Close() error { + return nil +} + func TestCollector_PublishOpts(t *testing.T) { // prepare hc := healthcheck.New() diff --git a/cmd/collector/app/sampling/grpc_handler_test.go b/cmd/collector/app/sampling/grpc_handler_test.go index 3a6590bf1ad..439e3bef646 100644 --- 
a/cmd/collector/app/sampling/grpc_handler_test.go +++ b/cmd/collector/app/sampling/grpc_handler_test.go @@ -37,6 +37,10 @@ func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil } +func (s mockSamplingStore) Close() error { + return nil +} + func TestNewGRPCHandler(t *testing.T) { tests := []struct { req *api_v2.SamplingStrategyParameters diff --git a/cmd/collector/app/sampling/strategystore/interface.go b/cmd/collector/app/sampling/strategystore/interface.go index 8ee99491110..90d9464918d 100644 --- a/cmd/collector/app/sampling/strategystore/interface.go +++ b/cmd/collector/app/sampling/strategystore/interface.go @@ -24,6 +24,9 @@ import ( // StrategyStore keeps track of service specific sampling strategies. type StrategyStore interface { + // Close() from io.Closer stops the processor from calculating probabilities. + io.Closer + // GetSamplingStrategy retrieves the sampling strategy for the specified service. 
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) } diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index 4f78e75866c..b670934a785 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -28,6 +28,10 @@ func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName st return nil, nil } +func (s mockSamplingStore) Close() error { + return nil +} + type mockSpanProcessor struct{} func (p *mockSpanProcessor) Close() error { diff --git a/pkg/clientcfg/clientcfghttp/cfgmgr_test.go b/pkg/clientcfg/clientcfghttp/cfgmgr_test.go index f6da22f7909..7c24bbeda24 100644 --- a/pkg/clientcfg/clientcfghttp/cfgmgr_test.go +++ b/pkg/clientcfg/clientcfghttp/cfgmgr_test.go @@ -37,6 +37,10 @@ func (m *mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName s return m.samplingResponse, nil } +func (m *mockSamplingStore) Close() error { + return nil +} + type mockBaggageMgr struct { baggageResponse []*baggage.BaggageRestriction } diff --git a/plugin/sampling/strategystore/adaptive/factory.go b/plugin/sampling/strategystore/adaptive/factory.go index d45f8e61f9b..3e3fcaab469 100644 --- a/plugin/sampling/strategystore/adaptive/factory.go +++ b/plugin/sampling/strategystore/adaptive/factory.go @@ -17,6 +17,7 @@ package adaptive import ( "errors" "flag" + "time" "github.com/spf13/viper" "go.uber.org/zap" @@ -33,21 +34,25 @@ var _ plugin.Configurable = (*Factory)(nil) // Factory implements strategystore.Factory for an adaptive strategy store. type Factory struct { - options *Options - logger *zap.Logger - metricsFactory metrics.Factory - lock distributedlock.Lock - store samplingstore.Store + options *Options + // This is passed to any new Processors created by this Factory. + // It exists here so that test code can override the default. 
+ followerRefreshInterval time.Duration + logger *zap.Logger + metricsFactory metrics.Factory + lock distributedlock.Lock + store samplingstore.Store } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ - options: &Options{}, - logger: zap.NewNop(), - metricsFactory: metrics.NullFactory, - lock: nil, - store: nil, + options: &Options{}, + followerRefreshInterval: defaultFollowerProbabilityInterval, + logger: zap.NewNop(), + metricsFactory: metrics.NullFactory, + lock: nil, + store: nil, } } @@ -87,6 +92,7 @@ func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategyst if err != nil { return nil, nil, err } + p.followerRefreshInterval = f.followerRefreshInterval p.Start() a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store) a.Start() diff --git a/plugin/sampling/strategystore/adaptive/factory_test.go b/plugin/sampling/strategystore/adaptive/factory_test.go index 71a7bf0bb60..e8dc519daad 100644 --- a/plugin/sampling/strategystore/adaptive/factory_test.go +++ b/plugin/sampling/strategystore/adaptive/factory_test.go @@ -72,8 +72,11 @@ func TestFactory(t *testing.T) { assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval) require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop())) - _, _, err := f.CreateStrategyStore() + f.followerRefreshInterval = time.Millisecond + store, aggregator, err := f.CreateStrategyStore() require.NoError(t, err) + require.NoError(t, store.Close()) + require.NoError(t, aggregator.Close()) } func TestBadConfigFail(t *testing.T) { diff --git a/plugin/sampling/strategystore/adaptive/package_test.go b/plugin/sampling/strategystore/adaptive/package_test.go new file mode 100644 index 00000000000..64db46d18bc --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptive + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go index a87e98f8fc9..5e3129b4fce 100644 --- a/plugin/sampling/strategystore/adaptive/processor.go +++ b/plugin/sampling/strategystore/adaptive/processor.go @@ -110,7 +110,8 @@ type Processor struct { serviceCache []SamplingCache - shutdown chan struct{} + shutdown chan struct{} + bgFinished sync.WaitGroup operationsCalculatedGauge metrics.Gauge calculateProbabilitiesLatency metrics.Timer @@ -170,19 +171,28 @@ func (p *Processor) Start() error { p.shutdown = make(chan struct{}) p.loadProbabilities() p.generateStrategyResponses() - go p.runCalculationLoop() - go p.runUpdateProbabilitiesLoop() + p.runBackground(p.runCalculationLoop) + p.runBackground(p.runUpdateProbabilitiesLoop) return nil } +func (p *Processor) runBackground(f func()) { + p.bgFinished.Add(1) + go func() { + f() + p.bgFinished.Done() + }() +} + // Close stops the processor from calculating probabilities. func (p *Processor) Close() error { p.logger.Info("stopping adaptive sampling processor") - if err := p.electionParticipant.Close(); err != nil { - return err + err := p.electionParticipant.Close() + if p.shutdown != nil { + close(p.shutdown) } - close(p.shutdown) - return nil + p.bgFinished.Wait() + return err } func (p *Processor) loadProbabilities() { @@ -272,7 +282,7 @@ func (p *Processor) runCalculationLoop() { // be way longer than the time to run the calculations. 
p.generateStrategyResponses() p.calculateProbabilitiesLatency.Record(time.Since(startTime)) - go p.saveProbabilitiesAndQPS() + p.runBackground(p.saveProbabilitiesAndQPS) } case <-p.shutdown: return diff --git a/plugin/sampling/strategystore/adaptive/processor_test.go b/plugin/sampling/strategystore/adaptive/processor_test.go index c63ea246a71..4d6e8b64791 100644 --- a/plugin/sampling/strategystore/adaptive/processor_test.go +++ b/plugin/sampling/strategystore/adaptive/processor_test.go @@ -359,6 +359,7 @@ func TestRunCalculationLoop(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond p.Start() for i := 0; i < 1000; i++ { @@ -392,9 +393,17 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond p.shutdown = make(chan struct{}) - defer close(p.shutdown) - go p.runCalculationLoop() + done := make(chan struct{}) + go func() { + p.runCalculationLoop() + close(done) + }() + defer func() { + close(p.shutdown) + <-done + }() for i := 0; i < 1000; i++ { // match logs specific to getThroughputErrMsg. 
We expect to see more than 2, once during @@ -432,10 +441,17 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) { followerRefreshInterval: time.Millisecond, electionParticipant: mockEP, } - defer close(p.shutdown) require.Nil(t, p.probabilities) require.Nil(t, p.strategyResponses) - go p.runUpdateProbabilitiesLoop() + done := make(chan struct{}) + go func() { + p.runUpdateProbabilitiesLoop() + close(done) + }() + defer func() { + close(p.shutdown) + <-done + }() for i := 0; i < 1000; i++ { p.RLock() @@ -449,6 +465,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) { p.RLock() assert.NotNil(t, p.probabilities) assert.NotNil(t, p.strategyResponses) + p.RUnlock() } func TestRealisticRunCalculationLoop(t *testing.T) { @@ -481,6 +498,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond p.Start() for i := 0; i < 100; i++ { @@ -879,7 +897,9 @@ func TestErrors(t *testing.T) { p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond require.Error(t, p.Start()) + require.Error(t, p.Close()) // close errors mockEP = &epmocks.ElectionParticipant{} @@ -889,6 +909,7 @@ func TestErrors(t *testing.T) { p, err = newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond require.NoError(t, p.Start()) require.Error(t, p.Close()) } diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index cf8544630e9..feb6d134197 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -91,8 +91,9 @@ func (h *strategyStore) GetSamplingStrategy(_ context.Context, serviceName strin } // Close 
stops updating the strategies -func (h *strategyStore) Close() { +func (h *strategyStore) Close() error { h.cancelFunc() + return nil } func (h *strategyStore) downloadSamplingStrategies(url string) ([]byte, error) { From dfe7adc7fd34389e2e507d9512eb64b5e19048f7 Mon Sep 17 00:00:00 2001 From: Will Sewell Date: Fri, 29 Mar 2024 14:46:46 +0000 Subject: [PATCH 2/3] Allow runUpdateProbabilitiesLoop to be unblocked in delay phase Rather than having to set the "delay phase" to a low value, we instead make it possible for the `shutdown` channel to unblock the delay. Signed-off-by: Will Sewell --- .../strategystore/adaptive/factory.go | 26 +++++++------------ .../strategystore/adaptive/factory_test.go | 1 - .../strategystore/adaptive/processor.go | 14 ++++++---- .../strategystore/adaptive/processor_test.go | 5 ---- 4 files changed, 19 insertions(+), 27 deletions(-) diff --git a/plugin/sampling/strategystore/adaptive/factory.go b/plugin/sampling/strategystore/adaptive/factory.go index 3e3fcaab469..d45f8e61f9b 100644 --- a/plugin/sampling/strategystore/adaptive/factory.go +++ b/plugin/sampling/strategystore/adaptive/factory.go @@ -17,7 +17,6 @@ package adaptive import ( "errors" "flag" - "time" "github.com/spf13/viper" "go.uber.org/zap" @@ -34,25 +33,21 @@ var _ plugin.Configurable = (*Factory)(nil) // Factory implements strategystore.Factory for an adaptive strategy store. type Factory struct { - options *Options - // This is passed to any new Processors created by this Factory. - // It exists here so that test code can override the default. - followerRefreshInterval time.Duration - logger *zap.Logger - metricsFactory metrics.Factory - lock distributedlock.Lock - store samplingstore.Store + options *Options + logger *zap.Logger + metricsFactory metrics.Factory + lock distributedlock.Lock + store samplingstore.Store } // NewFactory creates a new Factory. 
func NewFactory() *Factory { return &Factory{ - options: &Options{}, - followerRefreshInterval: defaultFollowerProbabilityInterval, - logger: zap.NewNop(), - metricsFactory: metrics.NullFactory, - lock: nil, - store: nil, + options: &Options{}, + logger: zap.NewNop(), + metricsFactory: metrics.NullFactory, + lock: nil, + store: nil, } } @@ -92,7 +87,6 @@ func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategyst if err != nil { return nil, nil, err } - p.followerRefreshInterval = f.followerRefreshInterval p.Start() a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store) a.Start() diff --git a/plugin/sampling/strategystore/adaptive/factory_test.go b/plugin/sampling/strategystore/adaptive/factory_test.go index e8dc519daad..99c02b8c1f2 100644 --- a/plugin/sampling/strategystore/adaptive/factory_test.go +++ b/plugin/sampling/strategystore/adaptive/factory_test.go @@ -72,7 +72,6 @@ func TestFactory(t *testing.T) { assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval) require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop())) - f.followerRefreshInterval = time.Millisecond store, aggregator, err := f.CreateStrategyStore() require.NoError(t, err) require.NoError(t, store.Close()) diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go index 5e3129b4fce..7ac5c1834fa 100644 --- a/plugin/sampling/strategystore/adaptive/processor.go +++ b/plugin/sampling/strategystore/adaptive/processor.go @@ -210,7 +210,12 @@ func (p *Processor) loadProbabilities() { // runUpdateProbabilitiesLoop is a loop that reads probabilities from storage. // The follower updates its local cache with the latest probabilities and serves them. 
func (p *Processor) runUpdateProbabilitiesLoop() { - addJitter(p.followerRefreshInterval) + select { + case <-time.After(addJitter(p.followerRefreshInterval)): + case <-p.shutdown: + return + } + ticker := time.NewTicker(p.followerRefreshInterval) defer ticker.Stop() for { @@ -231,13 +236,12 @@ func (p *Processor) isLeader() bool { return p.electionParticipant.IsLeader() } -// addJitter sleeps for a random amount of time. Without jitter, if the host holding the leader +// addJitter adds a random amount of time. Without jitter, if the host holding the leader // lock were to die, then all other collectors can potentially wait for a full cycle before // trying to acquire the lock. With jitter, we can reduce the average amount of time before a // new leader is elected. Furthermore, jitter can be used to spread out read load on storage. -func addJitter(jitterAmount time.Duration) { - delay := (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) - time.Sleep(delay) +func addJitter(jitterAmount time.Duration) time.Duration { + return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) } func (p *Processor) runCalculationLoop() { diff --git a/plugin/sampling/strategystore/adaptive/processor_test.go b/plugin/sampling/strategystore/adaptive/processor_test.go index 4d6e8b64791..4615b65fa49 100644 --- a/plugin/sampling/strategystore/adaptive/processor_test.go +++ b/plugin/sampling/strategystore/adaptive/processor_test.go @@ -359,7 +359,6 @@ func TestRunCalculationLoop(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) - p.followerRefreshInterval = time.Millisecond p.Start() for i := 0; i < 1000; i++ { @@ -393,7 +392,6 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) - p.followerRefreshInterval = time.Millisecond p.shutdown = make(chan 
struct{}) done := make(chan struct{}) go func() { @@ -498,7 +496,6 @@ func TestRealisticRunCalculationLoop(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) - p.followerRefreshInterval = time.Millisecond p.Start() for i := 0; i < 100; i++ { @@ -897,7 +894,6 @@ func TestErrors(t *testing.T) { p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) - p.followerRefreshInterval = time.Millisecond require.Error(t, p.Start()) require.Error(t, p.Close()) @@ -909,7 +905,6 @@ func TestErrors(t *testing.T) { p, err = newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) - p.followerRefreshInterval = time.Millisecond require.NoError(t, p.Start()) require.Error(t, p.Close()) } From f4560390875996446739c6aebc82d37b39b454a4 Mon Sep 17 00:00:00 2001 From: Will Sewell Date: Fri, 29 Mar 2024 16:25:34 +0000 Subject: [PATCH 3/3] Don't block on goroutines returning in processor_test.go Blocking on all goroutines fully returning is not necessary to appease goleak.
See: https://github.com/jaegertracing/jaeger/pull/5310/files/dfe7adc7fd34389e2e507d9512eb64b5e19048f7#r1544603132 Signed-off-by: Will Sewell --- .../strategystore/adaptive/processor_test.go | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/plugin/sampling/strategystore/adaptive/processor_test.go b/plugin/sampling/strategystore/adaptive/processor_test.go index 4615b65fa49..03ae6b7b7bb 100644 --- a/plugin/sampling/strategystore/adaptive/processor_test.go +++ b/plugin/sampling/strategystore/adaptive/processor_test.go @@ -393,15 +393,8 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) p.shutdown = make(chan struct{}) - done := make(chan struct{}) - go func() { - p.runCalculationLoop() - close(done) - }() - defer func() { - close(p.shutdown) - <-done - }() + defer close(p.shutdown) + go p.runCalculationLoop() for i := 0; i < 1000; i++ { // match logs specific to getThroughputErrMsg. We expect to see more than 2, once during @@ -439,17 +432,10 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) { followerRefreshInterval: time.Millisecond, electionParticipant: mockEP, } + defer close(p.shutdown) require.Nil(t, p.probabilities) require.Nil(t, p.strategyResponses) - done := make(chan struct{}) - go func() { - p.runUpdateProbabilitiesLoop() - close(done) - }() - defer func() { - close(p.shutdown) - <-done - }() + go p.runUpdateProbabilitiesLoop() for i := 0; i < 1000; i++ { p.RLock()