Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use generics in jitter #3717

Merged
merged 9 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions common/backoff/jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,44 @@ import (
"time"
)

// JitDuration return random duration from (1-coefficient)*duration to (1+coefficient)*duration, inclusive, exclusive
func JitDuration(duration time.Duration, coefficient float64) time.Duration {
validateCoefficient(coefficient)
const fullCoefficient float64 = 1

return time.Duration(JitInt64(duration.Nanoseconds(), coefficient))
// FullJitter returns a random number in the range [0, input): 0 is inclusive, input is exclusive.
func FullJitter[T int64 | float64 | time.Duration](input T) T {
return Jitter(input, fullCoefficient) / 2
}

// JitInt64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive
func JitInt64(input int64, coefficient float64) int64 {
// Jitter returns a random number in the range [(1-coefficient)*input, (1+coefficient)*input): the lower bound is inclusive, the upper bound is exclusive.
func Jitter[T int64 | float64 | time.Duration](input T, coefficient float64) T {
validateCoefficient(coefficient)

if input == 0 {
return 0
}
if coefficient == 0 {
return input
}

base := int64(float64(input) * (1 - coefficient))
addon := rand.Int63n(2 * (input - base))
return base + addon
}

// JitFloat64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive
func JitFloat64(input float64, coefficient float64) float64 {
validateCoefficient(coefficient)

base := input * (1 - coefficient)
addon := rand.Float64() * 2 * (input - base)
return base + addon
var base float64
var addon float64
switch i := any(input).(type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if the compiler is smart enough to compile this into separate functions that don't require a runtime type switch?

Ok, I tried it on compiler explorer and it looks like the answer is no: it makes two separate versions and still has a lot of logic around the type switch

case time.Duration:
input64 := i.Nanoseconds()
if input64 == 0 {
return input
}
base = float64(input64) * (1 - coefficient)
addon = rand.Float64() * 2 * (float64(input64) - base)
case int64:
if i == 0 {
return input
}
base = float64(i) * (1 - coefficient)
addon = rand.Float64() * 2 * (float64(i) - base)
case float64:
base = i * (1 - coefficient)
addon = rand.Float64() * 2 * (i - base)
default:
panic("The jitter type is not supported")
}
return T(base + addon)
}

func validateCoefficient(coefficient float64) {
Expand Down
41 changes: 31 additions & 10 deletions common/backoff/jitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,47 +46,68 @@ func TestJitterSuite(t *testing.T) {
func (s *jitterSuite) SetupSuite() {
}

func (s *jitterSuite) TestJitInt64() {
func (s *jitterSuite) TestJitter_Int64() {
input := int64(1048576)
coefficient := float64(0.25)
lowerBound := int64(float64(input) * (1 - coefficient))
upperBound := int64(float64(input) * (1 + coefficient))
fullJitterUpperBound := int64(float64(input) * 2)

for i := 0; i < 1048576; i++ {
result := JitInt64(input, coefficient)
result := Jitter(input, coefficient)
s.True(result >= lowerBound)
s.True(result < upperBound)

result = FullJitter(input)
s.True(result >= 0)
s.True(result < fullJitterUpperBound)
}
}

func (s *jitterSuite) TestJitFloat64() {
func (s *jitterSuite) TestJitter_Float64() {
input := float64(1048576.1048576)
coefficient := float64(0.16)
lowerBound := float64(input) * (1 - coefficient)
upperBound := float64(input) * (1 + coefficient)
fullJitterUpperBound := float64(input) * 2

for i := 0; i < 1048576; i++ {
result := JitFloat64(input, coefficient)
result := Jitter(input, coefficient)
s.True(result >= lowerBound)
s.True(result < upperBound)

result = FullJitter(input)
s.True(result >= 0)
s.True(result < fullJitterUpperBound)
}
}

func (s *jitterSuite) TestJitDuration() {
func (s *jitterSuite) TestJitter_Duration() {
input := time.Duration(1099511627776)
coefficient := float64(0.1)
lowerBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 - coefficient)))
upperBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 + coefficient)))
fullJitterUpperBound := time.Duration(int64(float64(input.Nanoseconds()) * 2))

for i := 0; i < 1048576; i++ {
result := JitDuration(input, coefficient)
result := Jitter(input, coefficient)
s.True(result >= lowerBound)
s.True(result < upperBound)

result = FullJitter(input)
s.True(result >= 0)
s.True(result < fullJitterUpperBound)
}
}

func (s *jitterSuite) TestJit_InputZeroValue() {
s.Zero(JitDuration(0, rand.Float64()))
s.Zero(JitInt64(0, rand.Float64()))
s.Zero(JitFloat64(0, rand.Float64()))
func (s *jitterSuite) TestJitter_InputZeroValue() {
s.Zero(Jitter(time.Duration(0), rand.Float64()))
s.Zero(Jitter(int64(0), rand.Float64()))
s.Zero(Jitter(float64(0), rand.Float64()))
}

func (s *jitterSuite) TestJitter_CoeffientZeroValue() {
s.Equal(time.Duration(1), Jitter(time.Duration(1), 0))
s.Equal(int64(1), Jitter(int64(1), 0))
s.Equal(float64(1), Jitter(float64(1), 0))
}
4 changes: 2 additions & 2 deletions common/tasks/fifo_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (f *FIFOScheduler[T]) TrySubmit(task T) bool {
func (f *FIFOScheduler[T]) workerMonitor() {
defer f.shutdownWG.Done()

timer := time.NewTimer(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
timer := time.NewTimer(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
defer timer.Stop()

for {
Expand All @@ -154,7 +154,7 @@ func (f *FIFOScheduler[T]) workerMonitor() {
f.stopWorkers(len(f.workerShutdownCh))
return
case <-timer.C:
timer.Reset(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
timer.Reset(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter))

targetWorkerNum := f.options.WorkerCount()
currentWorkerNum := len(f.workerShutdownCh)
Expand Down
2 changes: 1 addition & 1 deletion common/tasks/interleaved_weighted_round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) channels() iwrrChannels[T

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setupDispatchTimer() {
throttleDuration := iwrrMinDispatchThrottleDuration +
backoff.JitDuration(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration, 1)/2
backoff.FullJitter(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration)

s.dispatchTimerLock.Lock()
defer s.dispatchTimerLock.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (p *queueBase) Start() {
p.rescheduler.Start()
p.readerGroup.Start()

p.checkpointTimer = time.NewTimer(backoff.JitDuration(
p.checkpointTimer = time.NewTimer(backoff.Jitter(
p.options.CheckpointInterval(),
p.options.CheckpointIntervalJitterCoefficient(),
))
Expand Down Expand Up @@ -442,7 +442,7 @@ func (p *queueBase) resetCheckpointTimer(checkPointErr error) {
}

p.checkpointRetrier.Reset()
p.checkpointTimer.Reset(backoff.JitDuration(
p.checkpointTimer.Reset(backoff.Jitter(
p.options.CheckpointInterval(),
p.options.CheckpointIntervalJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (p *immediateQueue) NotifyNewTasks(_ string, tasks []tasks.Task) {
func (p *immediateQueue) processEventLoop() {
defer p.shutdownWG.Done()

pollTimer := time.NewTimer(backoff.JitDuration(
pollTimer := time.NewTimer(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
Expand All @@ -171,7 +171,7 @@ func (p *immediateQueue) processEventLoop() {
func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) {
p.processNewRange()

pollTimer.Reset(backoff.JitDuration(
pollTimer.Reset(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (p *scheduledQueue) processNewRange() {
// in which case no look ahead is needed.
// Notification will be sent when shard is reacquired, but
// still set a max poll timer here as a catch all case.
p.timerGate.Update(p.timeSource.Now().Add(backoff.JitDuration(
p.timerGate.Update(p.timeSource.Now().Add(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
)))
Expand All @@ -251,7 +251,7 @@ func (p *scheduledQueue) lookAheadTask() {
}

lookAheadMinTime := p.nonReadableScope.Range.InclusiveMin.FireTime
lookAheadMaxTime := lookAheadMinTime.Add(backoff.JitDuration(
lookAheadMaxTime := lookAheadMinTime.Add(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/rescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *reschedulerImpl) Len() int {
func (r *reschedulerImpl) rescheduleLoop() {
defer r.shutdownWG.Done()

cleanupTimer := time.NewTimer(backoff.JitDuration(
cleanupTimer := time.NewTimer(backoff.Jitter(
reschedulerPQCleanupDuration,
reschedulerPQCleanupJitterCoefficient,
))
Expand All @@ -213,7 +213,7 @@ func (r *reschedulerImpl) rescheduleLoop() {
r.reschedule()
case <-cleanupTimer.C:
r.cleanupPQ()
cleanupTimer.Reset(backoff.JitDuration(
cleanupTimer.Reset(backoff.Jitter(
reschedulerPQCleanupDuration,
reschedulerPQCleanupJitterCoefficient,
))
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (p *ackMgrImpl) taskIDsRange(

now := p.shard.GetTimeSource().Now()
if p.sanityCheckTime.IsZero() || p.sanityCheckTime.Before(now) {
p.sanityCheckTime = now.Add(backoff.JitDuration(
p.sanityCheckTime = now.Add(backoff.Jitter(
p.config.ReplicatorProcessorMaxPollInterval(),
p.config.ReplicatorProcessorMaxPollIntervalJitterCoefficient(),
))
Expand Down
9 changes: 5 additions & 4 deletions service/history/replication/task_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/exp/maps"

"go.temporal.io/server/api/adminservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/client"
Expand All @@ -43,7 +45,6 @@ import (
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/rpc"
"go.temporal.io/server/service/history/configs"
"golang.org/x/exp/maps"
)

const (
Expand Down Expand Up @@ -356,7 +357,7 @@ func (f *replicationTaskFetcherWorker) Stop() {

// fetchTasks collects getReplicationTasks requests from shards and sends out an aggregated request to the source frontend.
func (f *replicationTaskFetcherWorker) fetchTasks() {
timer := time.NewTimer(backoff.JitDuration(
timer := time.NewTimer(backoff.Jitter(
f.config.ReplicationTaskFetcherAggregationInterval(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
Expand All @@ -371,12 +372,12 @@ func (f *replicationTaskFetcherWorker) fetchTasks() {
// When timer fires, we collect all the requests we have so far and attempt to send them to remote.
err := f.getMessages()
if err != nil {
timer.Reset(backoff.JitDuration(
timer.Reset(backoff.Jitter(
f.config.ReplicationTaskFetcherErrorRetryWait(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
} else {
timer.Reset(backoff.JitDuration(
timer.Reset(backoff.Jitter(
f.config.ReplicationTaskFetcherAggregationInterval(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (p *taskProcessorImpl) Stop() {
}

func (p *taskProcessorImpl) eventLoop() {
syncShardTimer := time.NewTimer(backoff.JitDuration(
syncShardTimer := time.NewTimer(backoff.Jitter(
p.config.ShardSyncMinInterval(),
p.config.ShardSyncTimerJitterCoefficient(),
))
Expand All @@ -210,7 +210,7 @@ func (p *taskProcessorImpl) eventLoop() {
1,
metrics.OperationTag(metrics.HistorySyncShardStatusScope))
}
syncShardTimer.Reset(backoff.JitDuration(
syncShardTimer.Reset(backoff.Jitter(
p.config.ShardSyncMinInterval(),
p.config.ShardSyncTimerJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/replication/task_processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(

func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() {
shardID := r.shard.GetShardID()
cleanupTimer := time.NewTimer(backoff.JitDuration(
cleanupTimer := time.NewTimer(backoff.Jitter(
r.config.ReplicationTaskProcessorCleanupInterval(shardID),
r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
))
Expand All @@ -219,7 +219,7 @@ func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() {
metrics.OperationTag(metrics.ReplicationTaskCleanupScope),
)
}
cleanupTimer.Reset(backoff.JitDuration(
cleanupTimer.Reset(backoff.Jitter(
r.config.ReplicationTaskProcessorCleanupInterval(shardID),
r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
}
// We schedule the archival task for a random time in the near future to avoid sending a surge of tasks
// to the archival system at the same time
delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2
delay := backoff.Jitter(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FullJitter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally skipped this one because a test verifies the jitter. I will see if I can use the full jitter and still have the test pass.

if delay > retention {
delay = retention
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (r *TaskGeneratorImpl) GenerateDeleteHistoryEventTask(closeTime time.Time,
return err
}

retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2
retentionJitterDuration := backoff.FullJitter(r.config.RetentionTimerJitterDuration())
deleteTime := closeTime.Add(retention).Add(retentionJitterDuration)
r.mutableState.AddTasks(&tasks.DeleteHistoryEventTask{
// TaskID is set by shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (p *namespaceReplicationMessageProcessor) Stop() {
}

func getWaitDuration() time.Duration {
return backoff.JitDuration(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient)
return backoff.Jitter(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient)
}

func isTransientRetryableError(err error) bool {
Expand Down
2 changes: 1 addition & 1 deletion service/worker/scanner/executions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newTask(

// Run runs the task
func (t *task) Run() executor.TaskStatus {
time.Sleep(backoff.JitDuration(
time.Sleep(backoff.Jitter(
taskStartupDelayRatio*time.Duration(t.scavenger.numHistoryShards),
taskStartupDelayRandomizationRatio,
))
Expand Down