Skip to content

Commit

Permalink
Allow runUpdateProbabilitiesLoop to be unblocked in delay phase
Browse files Browse the repository at this point in the history
Rather than having to set the "delay phase" to a low value, we
instead make it possible for the `shutdown` channel to unblock
the delay.

Signed-off-by: Will Sewell <[email protected]>
  • Loading branch information
Will Sewell committed Mar 29, 2024
1 parent 250ab88 commit dfe7adc
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 27 deletions.
26 changes: 10 additions & 16 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package adaptive
import (
"errors"
"flag"
"time"

"github.com/spf13/viper"
"go.uber.org/zap"
Expand All @@ -34,25 +33,21 @@ var _ plugin.Configurable = (*Factory)(nil)

// Factory implements strategystore.Factory for an adaptive strategy store.
type Factory struct {
options *Options
// This is passed to any new Processors created by this Factory.
// It exists here so that test code can override the default.
followerRefreshInterval time.Duration
logger *zap.Logger
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
options *Options
logger *zap.Logger
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
options: &Options{},
followerRefreshInterval: defaultFollowerProbabilityInterval,
logger: zap.NewNop(),
metricsFactory: metrics.NullFactory,
lock: nil,
store: nil,
options: &Options{},
logger: zap.NewNop(),
metricsFactory: metrics.NullFactory,
lock: nil,
store: nil,
}
}

Expand Down Expand Up @@ -92,7 +87,6 @@ func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategyst
if err != nil {
return nil, nil, err
}
p.followerRefreshInterval = f.followerRefreshInterval
p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)
a.Start()
Expand Down
1 change: 0 additions & 1 deletion plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func TestFactory(t *testing.T) {
assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval)

require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop()))
f.followerRefreshInterval = time.Millisecond
store, aggregator, err := f.CreateStrategyStore()
require.NoError(t, err)
require.NoError(t, store.Close())
Expand Down
14 changes: 9 additions & 5 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,12 @@ func (p *Processor) loadProbabilities() {
// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage.
// The follower updates its local cache with the latest probabilities and serves them.
func (p *Processor) runUpdateProbabilitiesLoop() {
addJitter(p.followerRefreshInterval)
select {
case <-time.After(addJitter(p.followerRefreshInterval)):
case <-p.shutdown:
return
}

ticker := time.NewTicker(p.followerRefreshInterval)
defer ticker.Stop()
for {
Expand All @@ -231,13 +236,12 @@ func (p *Processor) isLeader() bool {
return p.electionParticipant.IsLeader()
}

// addJitter sleeps for a random amount of time. Without jitter, if the host holding the leader
// addJitter adds a random amount of time. Without jitter, if the host holding the leader
// lock were to die, then all other collectors can potentially wait for a full cycle before
// trying to acquire the lock. With jitter, we can reduce the average amount of time before a
// new leader is elected. Furthermore, jitter can be used to spread out read load on storage.
func addJitter(jitterAmount time.Duration) {
delay := (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
time.Sleep(delay)
func addJitter(jitterAmount time.Duration) time.Duration {
return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
}

func (p *Processor) runCalculationLoop() {
Expand Down
5 changes: 0 additions & 5 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ func TestRunCalculationLoop(t *testing.T) {
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
p.Start()

for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -393,7 +392,6 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
p.shutdown = make(chan struct{})
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -498,7 +496,6 @@ func TestRealisticRunCalculationLoop(t *testing.T) {
}
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger)
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
p.Start()

for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -897,7 +894,6 @@ func TestErrors(t *testing.T) {

p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
require.Error(t, p.Start())
require.Error(t, p.Close())

Expand All @@ -909,7 +905,6 @@ func TestErrors(t *testing.T) {

p, err = newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
p.followerRefreshInterval = time.Millisecond
require.NoError(t, p.Start())
require.Error(t, p.Close())
}

0 comments on commit dfe7adc

Please sign in to comment.