Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix goroutine leaks in plugin/sampling/strategystore/adaptive #5310

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName s
return &api_v2.SamplingStrategyResponse{}, nil
}

func (m *mockStrategyStore) Close() error {
return nil
}

func TestCollector_PublishOpts(t *testing.T) {
// prepare
hc := healthcheck.New()
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/sampling/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func (s mockSamplingStore) Close() error {
return nil
}

func TestNewGRPCHandler(t *testing.T) {
tests := []struct {
req *api_v2.SamplingStrategyParameters
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (

// StrategyStore keeps track of service specific sampling strategies.
type StrategyStore interface {
// Close() from io.Closer stops the processor from calculating probabilities.
io.Closer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You added this to the interface, but I am not seeing any non-test code that's actually calling it.

Adding Closer to interface it often contentious, some people argue that if you create an object via NewX() *X, you already have the ability to call Close on it without adding Close function to the interface that X implements. This doesn't work well when factories are involved since the factory does return an interface, not an actual struct. One other workaround to that is doing a runtime check for io.Closer interface and only then calling close - this is why I am asking about prod code calling it.

I'm ok to keep io.Closer in the interface because both real implementations are now closable (static store used to not have close before we added file watcher to it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think that makes sense.

This doesn't work well when factories are involved since the factory does return an interface, not an actual struct.

Is there a fundamental reason why factories shouldn't return a struct instead of an interface? (Other than it being a breaking change to make in this instance).

One other workaround to that is doing a runtime check for io.Closer interface and only then calling close - this is why I am asking about prod code calling it.

Prod code is not calling Close - do you have a preference between the current implementation vs the runtime check in tests? I don't feel strongly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a fundamental reason why factories shouldn't return a struct instead of an interface?

Yes - polymorphism. The whole point of a factory is to abstract what underlying implementation it creates, which means it always returns an interface.

Prod code is not calling Close

I actually think our pattern is that the main code only calls factory.Close() and the factory is generally responsible for releasing any resources. E.g. we don't call Close on SpanReader that we obtain from the factory.


// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error)
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
return nil, nil
}

func (s mockSamplingStore) Close() error {
return nil

Check warning on line 32 in cmd/collector/app/server/test.go

View check run for this annotation

Codecov / codecov/patch

cmd/collector/app/server/test.go#L31-L32

Added lines #L31 - L32 were not covered by tests
}

type mockSpanProcessor struct{}

func (p *mockSpanProcessor) Close() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/clientcfg/clientcfghttp/cfgmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (m *mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName s
return m.samplingResponse, nil
}

func (m *mockSamplingStore) Close() error {
return nil
}

type mockBaggageMgr struct {
baggageResponse []*baggage.BaggageRestriction
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ func TestFactory(t *testing.T) {
assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval)

require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop()))
_, _, err := f.CreateStrategyStore()
store, aggregator, err := f.CreateStrategyStore()
require.NoError(t, err)
require.NoError(t, store.Close())
require.NoError(t, aggregator.Close())
}

func TestBadConfigFail(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions plugin/sampling/strategystore/adaptive/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adaptive

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
40 changes: 27 additions & 13 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ type Processor struct {

serviceCache []SamplingCache

shutdown chan struct{}
shutdown chan struct{}
bgFinished sync.WaitGroup

operationsCalculatedGauge metrics.Gauge
calculateProbabilitiesLatency metrics.Timer
Expand Down Expand Up @@ -170,19 +171,28 @@ func (p *Processor) Start() error {
p.shutdown = make(chan struct{})
p.loadProbabilities()
p.generateStrategyResponses()
go p.runCalculationLoop()
go p.runUpdateProbabilitiesLoop()
p.runBackground(p.runCalculationLoop)
p.runBackground(p.runUpdateProbabilitiesLoop)
return nil
}

func (p *Processor) runBackground(f func()) {
p.bgFinished.Add(1)
go func() {
f()
p.bgFinished.Done()
}()
}

// Close stops the processor from calculating probabilities.
func (p *Processor) Close() error {
p.logger.Info("stopping adaptive sampling processor")
if err := p.electionParticipant.Close(); err != nil {
return err
err := p.electionParticipant.Close()
if p.shutdown != nil {
close(p.shutdown)
}
close(p.shutdown)
return nil
p.bgFinished.Wait()
return err
}

func (p *Processor) loadProbabilities() {
Expand All @@ -200,7 +210,12 @@ func (p *Processor) loadProbabilities() {
// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage.
// The follower updates its local cache with the latest probabilities and serves them.
func (p *Processor) runUpdateProbabilitiesLoop() {
addJitter(p.followerRefreshInterval)
select {
case <-time.After(addJitter(p.followerRefreshInterval)):
case <-p.shutdown:
return
}

ticker := time.NewTicker(p.followerRefreshInterval)
defer ticker.Stop()
for {
Expand All @@ -221,13 +236,12 @@ func (p *Processor) isLeader() bool {
return p.electionParticipant.IsLeader()
}

// addJitter sleeps for a random amount of time. Without jitter, if the host holding the leader
// addJitter adds a random amount of time. Without jitter, if the host holding the leader
// lock were to die, then all other collectors can potentially wait for a full cycle before
// trying to acquire the lock. With jitter, we can reduce the average amount of time before a
// new leader is elected. Furthermore, jitter can be used to spread out read load on storage.
func addJitter(jitterAmount time.Duration) {
delay := (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
time.Sleep(delay)
func addJitter(jitterAmount time.Duration) time.Duration {
return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
}

func (p *Processor) runCalculationLoop() {
Expand Down Expand Up @@ -272,7 +286,7 @@ func (p *Processor) runCalculationLoop() {
// be way longer than the time to run the calculations.
p.generateStrategyResponses()
p.calculateProbabilitiesLatency.Record(time.Since(startTime))
go p.saveProbabilitiesAndQPS()
p.runBackground(p.saveProbabilitiesAndQPS)
}
case <-p.shutdown:
return
Expand Down
2 changes: 2 additions & 0 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) {
p.RLock()
assert.NotNil(t, p.probabilities)
assert.NotNil(t, p.strategyResponses)
p.RUnlock()
}

func TestRealisticRunCalculationLoop(t *testing.T) {
Expand Down Expand Up @@ -880,6 +881,7 @@ func TestErrors(t *testing.T) {
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
require.Error(t, p.Start())
require.Error(t, p.Close())

// close errors
mockEP = &epmocks.ElectionParticipant{}
Expand Down
3 changes: 2 additions & 1 deletion plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ func (h *strategyStore) GetSamplingStrategy(_ context.Context, serviceName strin
}

// Close stops updating the strategies
func (h *strategyStore) Close() {
func (h *strategyStore) Close() error {
h.cancelFunc()
return nil
}

func (h *strategyStore) downloadSamplingStrategies(url string) ([]byte, error) {
Expand Down
Loading