diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 6e08ce76a19..8c92cd81e53 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -129,6 +129,10 @@ func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName s return &api_v2.SamplingStrategyResponse{}, nil } +func (m *mockStrategyStore) Close() error { + return nil +} + func TestCollector_PublishOpts(t *testing.T) { // prepare hc := healthcheck.New() diff --git a/cmd/collector/app/sampling/grpc_handler_test.go b/cmd/collector/app/sampling/grpc_handler_test.go index 3a6590bf1ad..439e3bef646 100644 --- a/cmd/collector/app/sampling/grpc_handler_test.go +++ b/cmd/collector/app/sampling/grpc_handler_test.go @@ -37,6 +37,10 @@ func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil } +func (s mockSamplingStore) Close() error { + return nil +} + func TestNewGRPCHandler(t *testing.T) { tests := []struct { req *api_v2.SamplingStrategyParameters diff --git a/cmd/collector/app/sampling/strategystore/interface.go b/cmd/collector/app/sampling/strategystore/interface.go index 8ee99491110..90d9464918d 100644 --- a/cmd/collector/app/sampling/strategystore/interface.go +++ b/cmd/collector/app/sampling/strategystore/interface.go @@ -24,6 +24,9 @@ import ( // StrategyStore keeps track of service specific sampling strategies. type StrategyStore interface { + // Close() from io.Closer stops the processor from calculating probabilities. + io.Closer + // GetSamplingStrategy retrieves the sampling strategy for the specified service. GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) } diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index 4f78e75866c..b670934a785 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -28,6 +28,10 @@ func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName st return nil, nil } +func (s mockSamplingStore) Close() error { + return nil +} + type mockSpanProcessor struct{} func (p *mockSpanProcessor) Close() error { diff --git a/pkg/clientcfg/clientcfghttp/cfgmgr_test.go b/pkg/clientcfg/clientcfghttp/cfgmgr_test.go index f6da22f7909..7c24bbeda24 100644 --- a/pkg/clientcfg/clientcfghttp/cfgmgr_test.go +++ b/pkg/clientcfg/clientcfghttp/cfgmgr_test.go @@ -37,6 +37,10 @@ func (m *mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName s return m.samplingResponse, nil } +func (m *mockSamplingStore) Close() error { + return nil +} + type mockBaggageMgr struct { baggageResponse []*baggage.BaggageRestriction } diff --git a/plugin/sampling/strategystore/adaptive/factory.go b/plugin/sampling/strategystore/adaptive/factory.go index d45f8e61f9b..3e3fcaab469 100644 --- a/plugin/sampling/strategystore/adaptive/factory.go +++ b/plugin/sampling/strategystore/adaptive/factory.go @@ -17,6 +17,7 @@ package adaptive import ( "errors" "flag" + "time" "github.com/spf13/viper" "go.uber.org/zap" @@ -33,21 +34,25 @@ var _ plugin.Configurable = (*Factory)(nil) // Factory implements strategystore.Factory for an adaptive strategy store. type Factory struct { - options *Options - logger *zap.Logger - metricsFactory metrics.Factory - lock distributedlock.Lock - store samplingstore.Store + options *Options + // This is passed to any new Processors created by this Factory. + // It exists here so that test code can override the default. + followerRefreshInterval time.Duration + logger *zap.Logger + metricsFactory metrics.Factory + lock distributedlock.Lock + store samplingstore.Store } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ - options: &Options{}, - logger: zap.NewNop(), - metricsFactory: metrics.NullFactory, - lock: nil, - store: nil, + options: &Options{}, + followerRefreshInterval: defaultFollowerProbabilityInterval, + logger: zap.NewNop(), + metricsFactory: metrics.NullFactory, + lock: nil, + store: nil, } } @@ -87,6 +92,7 @@ func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategyst if err != nil { return nil, nil, err } + p.followerRefreshInterval = f.followerRefreshInterval p.Start() a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store) a.Start() diff --git a/plugin/sampling/strategystore/adaptive/factory_test.go b/plugin/sampling/strategystore/adaptive/factory_test.go index 71a7bf0bb60..e8dc519daad 100644 --- a/plugin/sampling/strategystore/adaptive/factory_test.go +++ b/plugin/sampling/strategystore/adaptive/factory_test.go @@ -72,8 +72,11 @@ func TestFactory(t *testing.T) { assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval) require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop())) - _, _, err := f.CreateStrategyStore() + f.followerRefreshInterval = time.Millisecond + store, aggregator, err := f.CreateStrategyStore() require.NoError(t, err) + require.NoError(t, store.Close()) + require.NoError(t, aggregator.Close()) } func TestBadConfigFail(t *testing.T) { diff --git a/plugin/sampling/strategystore/adaptive/package_test.go b/plugin/sampling/strategystore/adaptive/package_test.go new file mode 100644 index 00000000000..64db46d18bc --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptive + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go index a87e98f8fc9..5e3129b4fce 100644 --- a/plugin/sampling/strategystore/adaptive/processor.go +++ b/plugin/sampling/strategystore/adaptive/processor.go @@ -110,7 +110,8 @@ type Processor struct { serviceCache []SamplingCache - shutdown chan struct{} + shutdown chan struct{} + bgFinished sync.WaitGroup operationsCalculatedGauge metrics.Gauge calculateProbabilitiesLatency metrics.Timer @@ -170,19 +171,28 @@ func (p *Processor) Start() error { p.shutdown = make(chan struct{}) p.loadProbabilities() p.generateStrategyResponses() - go p.runCalculationLoop() - go p.runUpdateProbabilitiesLoop() + p.runBackground(p.runCalculationLoop) + p.runBackground(p.runUpdateProbabilitiesLoop) return nil } +func (p *Processor) runBackground(f func()) { + p.bgFinished.Add(1) + go func() { + f() + p.bgFinished.Done() + }() +} + // Close stops the processor from calculating probabilities. func (p *Processor) Close() error { p.logger.Info("stopping adaptive sampling processor") - if err := p.electionParticipant.Close(); err != nil { - return err + err := p.electionParticipant.Close() + if p.shutdown != nil { + close(p.shutdown) } - close(p.shutdown) - return nil + p.bgFinished.Wait() + return err } func (p *Processor) loadProbabilities() { @@ -272,7 +282,7 @@ func (p *Processor) runCalculationLoop() { // be way longer than the time to run the calculations. p.generateStrategyResponses() p.calculateProbabilitiesLatency.Record(time.Since(startTime)) - go p.saveProbabilitiesAndQPS() + p.runBackground(p.saveProbabilitiesAndQPS) } case <-p.shutdown: return diff --git a/plugin/sampling/strategystore/adaptive/processor_test.go b/plugin/sampling/strategystore/adaptive/processor_test.go index c63ea246a71..4d6e8b64791 100644 --- a/plugin/sampling/strategystore/adaptive/processor_test.go +++ b/plugin/sampling/strategystore/adaptive/processor_test.go @@ -359,6 +359,7 @@ func TestRunCalculationLoop(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond p.Start() for i := 0; i < 1000; i++ { @@ -392,9 +393,17 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond p.shutdown = make(chan struct{}) - defer close(p.shutdown) - go p.runCalculationLoop() + done := make(chan struct{}) + go func() { + p.runCalculationLoop() + close(done) + }() + defer func() { + close(p.shutdown) + <-done + }() for i := 0; i < 1000; i++ { // match logs specific to getThroughputErrMsg. We expect to see more than 2, once during @@ -432,10 +441,17 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) { followerRefreshInterval: time.Millisecond, electionParticipant: mockEP, } - defer close(p.shutdown) require.Nil(t, p.probabilities) require.Nil(t, p.strategyResponses) - go p.runUpdateProbabilitiesLoop() + done := make(chan struct{}) + go func() { + p.runUpdateProbabilitiesLoop() + close(done) + }() + defer func() { + close(p.shutdown) + <-done + }() for i := 0; i < 1000; i++ { p.RLock() @@ -449,6 +465,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) { p.RLock() assert.NotNil(t, p.probabilities) assert.NotNil(t, p.strategyResponses) + p.RUnlock() } func TestRealisticRunCalculationLoop(t *testing.T) { @@ -481,6 +498,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) { } p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond p.Start() for i := 0; i < 100; i++ { @@ -879,7 +897,9 @@ func TestErrors(t *testing.T) { p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond require.Error(t, p.Start()) + require.Error(t, p.Close()) // close errors mockEP = &epmocks.ElectionParticipant{} @@ -889,6 +909,7 @@ func TestErrors(t *testing.T) { p, err = newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) + p.followerRefreshInterval = time.Millisecond require.NoError(t, p.Start()) require.Error(t, p.Close()) } diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index cf8544630e9..feb6d134197 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -91,8 +91,9 @@ func (h *strategyStore) GetSamplingStrategy(_ context.Context, serviceName strin } // Close stops updating the strategies -func (h *strategyStore) Close() { +func (h *strategyStore) Close() error { h.cancelFunc() + return nil } func (h *strategyStore) downloadSamplingStrategies(url string) ([]byte, error) {