Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic rate limiter #4390

Merged
merged 77 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 72 commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
0045fb7
moving window average
pdoerner May 18, 2023
ee44c22
remove channel avg impl
pdoerner May 18, 2023
68766b7
add signal aggregator
pdoerner May 18, 2023
bfc7002
adjust record fn
pdoerner May 18, 2023
82abfa4
add health signal clients
pdoerner May 19, 2023
f38f142
inject signal aggregator
pdoerner May 19, 2023
c037475
fix tests
pdoerner May 19, 2023
9543764
Merge branch 'master' into rate-limiter-metrics
pdoerner May 22, 2023
859950e
add metric emission
pdoerner May 22, 2023
dec1704
add health request rate limiter
pdoerner May 22, 2023
15da4b7
WIP
pdoerner May 22, 2023
e283bfb
race condition
pdoerner May 23, 2023
50d2ea4
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 23, 2023
a5194e2
WIP
pdoerner May 23, 2023
bf3da77
add health request rate limiter
pdoerner May 23, 2023
0047dc8
bench test rate limiter
pdoerner May 23, 2023
d3f4bf8
WIP
pdoerner May 23, 2023
c43f1a1
Revert "race condition"
pdoerner May 24, 2023
dc5eb2a
Revert "add metric emission"
pdoerner May 24, 2023
217cb96
emit per shard RPS metric
pdoerner May 24, 2023
bf205a4
cleanup
pdoerner May 24, 2023
b8293dc
merge metric and signal clients
pdoerner May 24, 2023
27ec4c5
cleanup
pdoerner May 24, 2023
df989eb
cleanup
pdoerner May 24, 2023
6aa2e7f
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
f960396
linting
pdoerner May 24, 2023
7c6e5f4
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 24, 2023
806eb44
global dynamic rate limiter
pdoerner May 24, 2023
b9da952
cleanup
pdoerner May 24, 2023
3c409ce
remove generics
pdoerner May 24, 2023
3850986
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 24, 2023
88a7507
WIP
pdoerner May 24, 2023
fe2955d
cleanup
pdoerner May 24, 2023
f46ab3f
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 24, 2023
f412f60
fix deferred metric fn
pdoerner May 24, 2023
e634617
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 24, 2023
3caed40
fix defer metric fn
pdoerner May 24, 2023
564cf3e
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 24, 2023
d64ff92
fix clients
pdoerner May 24, 2023
c56dd9b
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 24, 2023
e91a0ee
types
pdoerner May 24, 2023
9b152fa
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 24, 2023
2469798
simple dynamic rate limiter test
pdoerner May 24, 2023
2ae3371
linting
pdoerner May 24, 2023
ee46440
tests
pdoerner May 24, 2023
7d15e99
tests
pdoerner May 24, 2023
db4db37
acquire lock once
pdoerner May 24, 2023
c256fb1
locks
pdoerner May 24, 2023
98f2b66
array moving average
pdoerner May 24, 2023
f3dfa3d
Revert "array moving average"
pdoerner May 24, 2023
fc62af7
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
33448c8
cleanup
pdoerner May 24, 2023
5a94697
emit per shard RPS
pdoerner May 24, 2023
e179473
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
e0ce388
array average
pdoerner May 25, 2023
e48fcec
feedback
pdoerner May 25, 2023
d3b20f1
cleanup
pdoerner May 25, 2023
c42a921
Merge branch 'master' into rate-limiter-metrics
pdoerner May 25, 2023
b6fd862
handle nil health signals
pdoerner May 25, 2023
bf7a9e2
Merge branch 'rate-limiter-metrics' into dynamic-rate-limiter
pdoerner May 25, 2023
d56bf0d
tests
pdoerner May 25, 2023
6d8f69a
Merge branch 'master' into dynamic-rate-limiter
pdoerner May 31, 2023
5c381d3
uncomment signal collection
pdoerner May 31, 2023
0175f37
Merge branch 'master' into dynamic-rate-limiter
pdoerner May 31, 2023
47ba914
refactor dynamic config properties
pdoerner Jun 1, 2023
06e4d1d
Merge branch 'master' into dynamic-rate-limiter
pdoerner Jun 1, 2023
c375bbc
cleanup
pdoerner Jun 2, 2023
d99a697
Merge branch 'master' into dynamic-rate-limiter
pdoerner Jun 2, 2023
9fc18e0
add aggregation feature flag
pdoerner Jun 2, 2023
0bccb26
fix test
pdoerner Jun 2, 2023
311abbd
cleanup
pdoerner Jun 2, 2023
32f1c81
avoid potential race condition
pdoerner Jun 2, 2023
6f51bfc
feedback
pdoerner Jun 2, 2023
fe0828e
Merge branch 'master' into dynamic-rate-limiter
pdoerner Jun 2, 2023
79d3f9b
fix
pdoerner Jun 2, 2023
e0dbb5a
remove test
pdoerner Jun 2, 2023
7331f76
Merge branch 'master' into dynamic-rate-limiter
pdoerner Jun 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions common/aggregate/noop_moving_window_average.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package aggregate

var NoopMovingWindowAverage MovingWindowAverage = newNoopMovingWindowAverage()

type (
noopMovingWindowAverage struct{}
)

func newNoopMovingWindowAverage() *noopMovingWindowAverage { return &noopMovingWindowAverage{} }

func (a *noopMovingWindowAverage) Record(_ int64) {}

func (a *noopMovingWindowAverage) Average() float64 { return 0 }
18 changes: 16 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ const (
EnableEagerWorkflowStart = "system.enableEagerWorkflowStart"
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"
// PersistenceHealthSignalCollectionEnabled determines whether persistence health signal collection/aggregation is enabled
PersistenceHealthSignalCollectionEnabled = "system.persistenceHealthSignalCollectionEnabled"
// PersistenceHealthSignalMetricsEnabled determines whether persistence shard RPS metrics are emitted
PersistenceHealthSignalMetricsEnabled = "system.persistenceHealthSignalMetricsEnabled"
// PersistenceHealthSignalAggregationEnabled determines whether persistence latency and error averages are tracked
PersistenceHealthSignalAggregationEnabled = "system.persistenceHealthSignalAggregationEnabled"
// PersistenceHealthSignalWindowSize is the time window size in seconds for aggregating persistence signals
PersistenceHealthSignalWindowSize = "system.persistenceHealthSignalWindowSize"
// PersistenceHealthSignalBufferSize is the maximum number of persistence signals to buffer in memory per signal key
Expand Down Expand Up @@ -206,6 +208,9 @@ const (
FrontendPersistenceNamespaceMaxQPS = "frontend.persistenceNamespaceMaxQPS"
// FrontendEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in frontend persistence client
FrontendEnablePersistencePriorityRateLimiting = "frontend.enablePersistencePriorityRateLimiting"
// FrontendPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
FrontendPersistenceDynamicRateLimitingParams = "frontend.persistenceDynamicRateLimitingParams"
// FrontendVisibilityMaxPageSize is default max size for ListWorkflowExecutions in one page
FrontendVisibilityMaxPageSize = "frontend.visibilityMaxPageSize"
// FrontendHistoryMaxPageSize is default max size for GetWorkflowExecutionHistory in one page
Expand Down Expand Up @@ -349,6 +354,9 @@ const (
MatchingPersistenceNamespaceMaxQPS = "matching.persistenceNamespaceMaxQPS"
// MatchingEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in matching persistence client
MatchingEnablePersistencePriorityRateLimiting = "matching.enablePersistencePriorityRateLimiting"
// MatchingPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
MatchingPersistenceDynamicRateLimitingParams = "matching.persistenceDynamicRateLimitingParams"
// MatchingMinTaskThrottlingBurstSize is the minimum burst size for task queue throttling
MatchingMinTaskThrottlingBurstSize = "matching.minTaskThrottlingBurstSize"
// MatchingGetTasksBatchSize is the maximum batch size to fetch from the task buffer
Expand Down Expand Up @@ -413,6 +421,9 @@ const (
HistoryPersistencePerShardNamespaceMaxQPS = "history.persistencePerShardNamespaceMaxQPS"
// HistoryEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in history persistence client
HistoryEnablePersistencePriorityRateLimiting = "history.enablePersistencePriorityRateLimiting"
// HistoryPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
HistoryPersistenceDynamicRateLimitingParams = "history.persistenceDynamicRateLimitingParams"
// HistoryLongPollExpirationInterval is the long poll expiration interval in the history service
HistoryLongPollExpirationInterval = "history.longPollExpirationInterval"
// HistoryCacheInitialSize is initial size of history cache
Expand Down Expand Up @@ -746,6 +757,9 @@ const (
WorkerPersistenceNamespaceMaxQPS = "worker.persistenceNamespaceMaxQPS"
// WorkerEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in worker persistence client
WorkerEnablePersistencePriorityRateLimiting = "worker.enablePersistencePriorityRateLimiting"
// WorkerPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
WorkerPersistenceDynamicRateLimitingParams = "worker.persistenceDynamicRateLimitingParams"
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time
WorkerIndexerConcurrency = "worker.indexerConcurrency"
// WorkerESProcessorNumOfWorkers is num of workers for esProcessor
Expand Down
33 changes: 32 additions & 1 deletion common/dynamicconfig/shared_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,35 @@ var defaultNumTaskQueuePartitions = []ConstrainedValue{
},
}

var DefaultPerShardNamespaceRPSMax = func(namespace string) int { return 0 }
var DefaultPerShardNamespaceRPSMax = GetIntPropertyFilteredByNamespace(0)

const (
// dynamic config map keys and defaults for client.DynamicRateLimitingParams for controlling dynamic rate limiting options
// dynamicRateLimitEnabledKey toggles whether dynamic rate limiting is enabled
dynamicRateLimitEnabledKey = "enabled"
dynamicRateLimitEnabledDefault = false
// dynamicRateLimitRefreshIntervalKey is how often the rate limit and dynamic properties are refreshed. should be a string timestamp e.g. 10s
dynamicRateLimitRefreshIntervalKey = "refreshInterval"
dynamicRateLimitRefreshIntervalDefault = "10s"
// dynamicRateLimitLatencyThresholdKey is the maximum average latency in ms before the rate limiter should backoff
dynamicRateLimitLatencyThresholdKey = "latencyThreshold"
dynamicRateLimitLatencyThresholdDefault = 0.0 // will not do backoff based on latency
// dynamicRateLimitErrorThresholdKey is the maximum ratio of errors:total_requests before the rate limiter should backoff. should be between 0 and 1
dynamicRateLimitErrorThresholdKey = "errorThreshold"
dynamicRateLimitErrorThresholdDefault = 0.0 // will not do backoff based on errors
// dynamicRateLimitBackoffStepSizeKey is the amount the rate limit multiplier is reduced when backing off. should be between 0 and 1
dynamicRateLimitBackoffStepSizeKey = "rateBackoffStepSize"
dynamicRateLimitBackoffStepSizeDefault = 0.3
// dynamicRateLimitIncreaseStepSizeKey the amount the rate limit multiplier is increased when the system is healthy. should be between 0 and 1
dynamicRateLimitIncreaseStepSizeKey = "rateIncreaseStepSize"
dynamicRateLimitIncreaseStepSizeDefault = 0.1
)

var DefaultDynamicRateLimitingParams = map[string]interface{}{
dynamicRateLimitEnabledKey: dynamicRateLimitEnabledDefault,
dynamicRateLimitRefreshIntervalKey: dynamicRateLimitRefreshIntervalDefault,
dynamicRateLimitLatencyThresholdKey: dynamicRateLimitLatencyThresholdDefault,
dynamicRateLimitErrorThresholdKey: dynamicRateLimitErrorThresholdDefault,
dynamicRateLimitBackoffStepSizeKey: dynamicRateLimitBackoffStepSizeDefault,
dynamicRateLimitIncreaseStepSizeKey: dynamicRateLimitIncreaseStepSizeDefault,
Comment on lines +74 to +79
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I find this kind of hard to read and I'd just put the defaults inline in this map. It is effectively a constant itself

}
16 changes: 12 additions & 4 deletions common/persistence/client/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ type (
PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter
PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter
EnablePriorityRateLimiting dynamicconfig.BoolPropertyFn
ClusterName string

DynamicRateLimitingParams dynamicconfig.MapPropertyFn

ClusterName string

NewFactoryParams struct {
fx.In
Expand All @@ -61,6 +64,7 @@ type (
MetricsHandler metrics.Handler
Logger log.Logger
HealthSignals persistence.HealthSignalAggregator
DynamicRateLimitingParams DynamicRateLimitingParams
}

FactoryProviderFn func(NewFactoryParams) Factory
Expand Down Expand Up @@ -88,6 +92,9 @@ func FactoryProvider(
params.PersistenceMaxQPS,
params.PersistencePerShardNamespaceMaxQPS,
RequestPriorityFn,
params.HealthSignals,
params.DynamicRateLimitingParams,
params.Logger,
)
} else {
requestRatelimiter = NewNoopPriorityRateLimiter(params.PersistenceMaxQPS)
Expand All @@ -111,13 +118,14 @@ func HealthSignalAggregatorProvider(
metricsHandler metrics.Handler,
logger log.Logger,
) persistence.HealthSignalAggregator {
if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalCollectionEnabled, true)() {
if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalMetricsEnabled, true)() {
return persistence.NewHealthSignalAggregatorImpl(
dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 3*time.Second)(),
dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 500)(),
dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 10*time.Second)(),
dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 5000)(),
metricsHandler,
dynamicCollection.GetIntProperty(dynamicconfig.ShardRPSWarnLimit, 50),
logger,
dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalAggregationEnabled, true)(),
)
}

Expand Down
199 changes: 199 additions & 0 deletions common/persistence/client/health_request_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package client

import (
"context"
"encoding/json"
"math"
"sync/atomic"
"time"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
)

const (
DefaultRefreshInterval = 10 * time.Second
DefaultRateBurstRatio = 1.0
DefaultMinRateMultiplier = 0.1
DefaultMaxRateMultiplier = 1.0
)

type (
HealthRequestRateLimiterImpl struct {
enabled *atomic.Bool
params DynamicRateLimitingParams // dynamic config map
curOptions dynamicRateLimitingOptions // current dynamic config values (updated on refresh)

rateLimiter *quotas.RateLimiterImpl
healthSignals persistence.HealthSignalAggregator

refreshTimer *time.Ticker

rateFn quotas.RateFn
rateToBurstRatio float64

minRateMultiplier float64
maxRateMultiplier float64
curRateMultiplier float64

logger log.Logger
}

dynamicRateLimitingOptions struct {
Enabled bool

RefreshInterval string // string returned by json.Unmarshal will be parsed into a duration

// thresholds which should trigger backoff if exceeded
LatencyThreshold float64
ErrorThreshold float64

// if either threshold is exceeded, the current rate multiplier will be reduced by this amount
RateBackoffStepSize float64
// when the system is healthy and current rate < max rate, the current rate multiplier will be
// increased by this amount
RateIncreaseStepSize float64
}
)

var _ quotas.RequestRateLimiter = (*HealthRequestRateLimiterImpl)(nil)

func NewHealthRequestRateLimiterImpl(
healthSignals persistence.HealthSignalAggregator,
rateFn quotas.RateFn,
params DynamicRateLimitingParams,
logger log.Logger,
) *HealthRequestRateLimiterImpl {
limiter := &HealthRequestRateLimiterImpl{
enabled: &atomic.Bool{},
rateLimiter: quotas.NewRateLimiter(rateFn(), int(DefaultRateBurstRatio*rateFn())),
healthSignals: healthSignals,
rateFn: rateFn,
params: params,
refreshTimer: time.NewTicker(DefaultRefreshInterval),
rateToBurstRatio: DefaultRateBurstRatio,
minRateMultiplier: DefaultMinRateMultiplier,
maxRateMultiplier: DefaultMaxRateMultiplier,
curRateMultiplier: DefaultMaxRateMultiplier,
logger: logger,
}
limiter.refreshDynamicParams()
return limiter
}

func (rl *HealthRequestRateLimiterImpl) Allow(now time.Time, request quotas.Request) bool {
rl.maybeRefresh()
if !rl.enabled.Load() {
return true
}
return rl.rateLimiter.AllowN(now, request.Token)
}

func (rl *HealthRequestRateLimiterImpl) Reserve(now time.Time, request quotas.Request) quotas.Reservation {
rl.maybeRefresh()
if !rl.enabled.Load() {
return quotas.NoopReservation
}
return rl.rateLimiter.ReserveN(now, request.Token)
}

func (rl *HealthRequestRateLimiterImpl) Wait(ctx context.Context, request quotas.Request) error {
rl.maybeRefresh()
if !rl.enabled.Load() {
return nil
}
return rl.rateLimiter.WaitN(ctx, request.Token)
}

func (rl *HealthRequestRateLimiterImpl) maybeRefresh() {
select {
case <-rl.refreshTimer.C:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add a channel from persistence.HealthSignalAggregator to rate limiter for quicker reaction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not necessarily need the fastest reaction. I want to avoid the situation where the rate limit seesaws up and down too frequently.

rl.refreshDynamicParams()
if rl.enabled.Load() {
rl.refreshRate()
}
rl.updateRefreshTimer()

default:
// no-op
}
}

func (rl *HealthRequestRateLimiterImpl) refreshRate() {
if rl.latencyThresholdExceeded() || rl.errorThresholdExceeded() {
// limit exceeded, do backoff
rl.curRateMultiplier = math.Max(rl.minRateMultiplier, rl.curRateMultiplier-rl.curOptions.RateBackoffStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
Comment on lines +153 to +154
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should you maybe update rl.rateLimiter always here, in case rateFn returns something different?

} else if rl.curRateMultiplier < rl.maxRateMultiplier {
// already doing backoff and under thresholds, increase limit
rl.curRateMultiplier = math.Min(rl.maxRateMultiplier, rl.curRateMultiplier+rl.curOptions.RateIncreaseStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
}
}

func (rl *HealthRequestRateLimiterImpl) refreshDynamicParams() {
var options dynamicRateLimitingOptions
b, err := json.Marshal(rl.params())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. I thought the point of defining the keys explicitly was to avoid the json stuff? So, those constants are only used in the default value and nowhere else?

I would think if you're going to define string constants for the map keys, you should use them to read the map

if err != nil {
rl.logger.Warn("Error marshalling dynamic rate limiting params. Dynamic rate limiting is disabled.", tag.Error(err))
rl.enabled.Store(false)
return
}

err = json.Unmarshal(b, &options)
if err != nil {
rl.logger.Warn("Error unmarshalling dynamic rate limiting params. Dynamic rate limiting is disabled.", tag.Error(err))
rl.enabled.Store(false)
return
}

rl.enabled.Store(options.Enabled)
rl.curOptions = options
}

func (rl *HealthRequestRateLimiterImpl) updateRefreshTimer() {
if len(rl.curOptions.RefreshInterval) > 0 {
if refreshDuration, err := timestamp.ParseDurationDefaultSeconds(rl.curOptions.RefreshInterval); err != nil {
rl.logger.Warn("Error parsing dynamic rate limit refreshInterval timestamp. Using previous value.", tag.Error(err))
} else {
rl.refreshTimer.Reset(refreshDuration)
}
}
}

func (rl *HealthRequestRateLimiterImpl) latencyThresholdExceeded() bool {
return rl.curOptions.LatencyThreshold > 0 && rl.healthSignals.AverageLatency() > rl.curOptions.LatencyThreshold
}

func (rl *HealthRequestRateLimiterImpl) errorThresholdExceeded() bool {
return rl.curOptions.ErrorThreshold > 0 && rl.healthSignals.ErrorRatio() > rl.curOptions.ErrorThreshold
}
Loading