diff --git a/common/aggregate/bench_moving_window_avg_test.go b/common/aggregate/bench_test.go similarity index 100% rename from common/aggregate/bench_moving_window_avg_test.go rename to common/aggregate/bench_test.go diff --git a/common/aggregate/noop_moving_window_average.go b/common/aggregate/noop_moving_window_average.go new file mode 100644 index 00000000000..060c3d922e3 --- /dev/null +++ b/common/aggregate/noop_moving_window_average.go @@ -0,0 +1,37 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package aggregate + +var NoopMovingWindowAverage MovingWindowAverage = newNoopMovingWindowAverage() + +type ( + noopMovingWindowAverage struct{} +) + +func newNoopMovingWindowAverage() *noopMovingWindowAverage { return &noopMovingWindowAverage{} } + +func (a *noopMovingWindowAverage) Record(_ int64) {} + +func (a *noopMovingWindowAverage) Average() float64 { return 0 } diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index bad2a0b37ff..a3a22c7c212 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -106,8 +106,10 @@ const ( EnableEagerWorkflowStart = "system.enableEagerWorkflowStart" // NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval" - // PersistenceHealthSignalCollectionEnabled determines whether persistence health signal collection/aggregation is enabled - PersistenceHealthSignalCollectionEnabled = "system.persistenceHealthSignalCollectionEnabled" + // PersistenceHealthSignalMetricsEnabled determines whether persistence shard RPS metrics are emitted + PersistenceHealthSignalMetricsEnabled = "system.persistenceHealthSignalMetricsEnabled" + // PersistenceHealthSignalAggregationEnabled determines whether persistence latency and error averages are tracked + PersistenceHealthSignalAggregationEnabled = "system.persistenceHealthSignalAggregationEnabled" // PersistenceHealthSignalWindowSize is the time window size in seconds for aggregating persistence signals PersistenceHealthSignalWindowSize = "system.persistenceHealthSignalWindowSize" // PersistenceHealthSignalBufferSize is the maximum number of persistence signals to buffer in memory per signal key @@ -206,6 +208,9 @@ const ( FrontendPersistenceNamespaceMaxQPS = "frontend.persistenceNamespaceMaxQPS" // FrontendEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in frontend persistence client 
FrontendEnablePersistencePriorityRateLimiting = "frontend.enablePersistencePriorityRateLimiting" + // FrontendPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params + // see DefaultDynamicRateLimitingParams for available options and defaults + FrontendPersistenceDynamicRateLimitingParams = "frontend.persistenceDynamicRateLimitingParams" // FrontendVisibilityMaxPageSize is default max size for ListWorkflowExecutions in one page FrontendVisibilityMaxPageSize = "frontend.visibilityMaxPageSize" // FrontendHistoryMaxPageSize is default max size for GetWorkflowExecutionHistory in one page @@ -349,6 +354,9 @@ const ( MatchingPersistenceNamespaceMaxQPS = "matching.persistenceNamespaceMaxQPS" // MatchingEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in matching persistence client MatchingEnablePersistencePriorityRateLimiting = "matching.enablePersistencePriorityRateLimiting" + // MatchingPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params + // see DefaultDynamicRateLimitingParams for available options and defaults + MatchingPersistenceDynamicRateLimitingParams = "matching.persistenceDynamicRateLimitingParams" // MatchingMinTaskThrottlingBurstSize is the minimum burst size for task queue throttling MatchingMinTaskThrottlingBurstSize = "matching.minTaskThrottlingBurstSize" // MatchingGetTasksBatchSize is the maximum batch size to fetch from the task buffer @@ -413,6 +421,9 @@ const ( HistoryPersistencePerShardNamespaceMaxQPS = "history.persistencePerShardNamespaceMaxQPS" // HistoryEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in history persistence client HistoryEnablePersistencePriorityRateLimiting = "history.enablePersistencePriorityRateLimiting" + // HistoryPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params + // see 
DefaultDynamicRateLimitingParams for available options and defaults + HistoryPersistenceDynamicRateLimitingParams = "history.persistenceDynamicRateLimitingParams" // HistoryLongPollExpirationInterval is the long poll expiration interval in the history service HistoryLongPollExpirationInterval = "history.longPollExpirationInterval" // HistoryCacheInitialSize is initial size of history cache @@ -748,6 +759,9 @@ const ( WorkerPersistenceNamespaceMaxQPS = "worker.persistenceNamespaceMaxQPS" // WorkerEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in worker persistence client WorkerEnablePersistencePriorityRateLimiting = "worker.enablePersistencePriorityRateLimiting" + // WorkerPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params + // see DefaultDynamicRateLimitingParams for available options and defaults + WorkerPersistenceDynamicRateLimitingParams = "worker.persistenceDynamicRateLimitingParams" // WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time WorkerIndexerConcurrency = "worker.indexerConcurrency" // WorkerESProcessorNumOfWorkers is num of workers for esProcessor diff --git a/common/dynamicconfig/shared_constants.go b/common/dynamicconfig/shared_constants.go index d054b8f5d84..6920db9caf8 100644 --- a/common/dynamicconfig/shared_constants.go +++ b/common/dynamicconfig/shared_constants.go @@ -45,4 +45,36 @@ var defaultNumTaskQueuePartitions = []ConstrainedValue{ }, } -var DefaultPerShardNamespaceRPSMax = func(namespace string) int { return 0 } +var DefaultPerShardNamespaceRPSMax = GetIntPropertyFilteredByNamespace(0) + +const ( + // dynamic config map keys and defaults for client.DynamicRateLimitingParams for controlling dynamic rate limiting options + // dynamicRateLimitEnabledKey toggles whether dynamic rate limiting is enabled + dynamicRateLimitEnabledKey = "enabled" + dynamicRateLimitEnabledDefault = false + // 
dynamicRateLimitRefreshIntervalKey is how often the rate limit and dynamic properties are refreshed. should be a duration string, e.g. 10s + // even if the rate limiter is disabled, this property will still determine how often the dynamic config is reevaluated + dynamicRateLimitRefreshIntervalKey = "refreshInterval" + dynamicRateLimitRefreshIntervalDefault = "10s" + // dynamicRateLimitLatencyThresholdKey is the maximum average latency in ms before the rate limiter should back off + dynamicRateLimitLatencyThresholdKey = "latencyThreshold" + dynamicRateLimitLatencyThresholdDefault = 0.0 // will not do backoff based on latency + // dynamicRateLimitErrorThresholdKey is the maximum ratio of errors:total_requests before the rate limiter should back off. should be between 0 and 1 + dynamicRateLimitErrorThresholdKey = "errorThreshold" + dynamicRateLimitErrorThresholdDefault = 0.0 // will not do backoff based on errors + // dynamicRateLimitBackoffStepSizeKey is the amount the rate limit multiplier is reduced when backing off. should be between 0 and 1 + dynamicRateLimitBackoffStepSizeKey = "rateBackoffStepSize" + dynamicRateLimitBackoffStepSizeDefault = 0.3 + // dynamicRateLimitIncreaseStepSizeKey is the amount the rate limit multiplier is increased when the system is healthy. 
should be between 0 and 1 + dynamicRateLimitIncreaseStepSizeKey = "rateIncreaseStepSize" + dynamicRateLimitIncreaseStepSizeDefault = 0.1 +) + +var DefaultDynamicRateLimitingParams = map[string]interface{}{ + dynamicRateLimitEnabledKey: dynamicRateLimitEnabledDefault, + dynamicRateLimitRefreshIntervalKey: dynamicRateLimitRefreshIntervalDefault, + dynamicRateLimitLatencyThresholdKey: dynamicRateLimitLatencyThresholdDefault, + dynamicRateLimitErrorThresholdKey: dynamicRateLimitErrorThresholdDefault, + dynamicRateLimitBackoffStepSizeKey: dynamicRateLimitBackoffStepSizeDefault, + dynamicRateLimitIncreaseStepSizeKey: dynamicRateLimitIncreaseStepSizeDefault, +} diff --git a/common/persistence/client/fx.go b/common/persistence/client/fx.go index 2a238b936b3..69ebc06c1b7 100644 --- a/common/persistence/client/fx.go +++ b/common/persistence/client/fx.go @@ -45,7 +45,10 @@ type ( PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePriorityRateLimiting dynamicconfig.BoolPropertyFn - ClusterName string + + DynamicRateLimitingParams dynamicconfig.MapPropertyFn + + ClusterName string NewFactoryParams struct { fx.In @@ -61,6 +64,7 @@ type ( MetricsHandler metrics.Handler Logger log.Logger HealthSignals persistence.HealthSignalAggregator + DynamicRateLimitingParams DynamicRateLimitingParams } FactoryProviderFn func(NewFactoryParams) Factory @@ -88,6 +92,9 @@ func FactoryProvider( params.PersistenceMaxQPS, params.PersistencePerShardNamespaceMaxQPS, RequestPriorityFn, + params.HealthSignals, + params.DynamicRateLimitingParams, + params.Logger, ) } else { requestRatelimiter = NewNoopPriorityRateLimiter(params.PersistenceMaxQPS) @@ -111,10 +118,11 @@ func HealthSignalAggregatorProvider( metricsHandler metrics.Handler, logger log.Logger, ) persistence.HealthSignalAggregator { - if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalCollectionEnabled, true)() { 
+ if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalMetricsEnabled, true)() { return persistence.NewHealthSignalAggregatorImpl( - dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 3*time.Second)(), - dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 500)(), + dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalAggregationEnabled, true)(), + dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 10*time.Second)(), + dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 5000)(), metricsHandler, dynamicCollection.GetIntProperty(dynamicconfig.ShardRPSWarnLimit, 50), logger, diff --git a/common/persistence/client/health_request_rate_limiter.go b/common/persistence/client/health_request_rate_limiter.go new file mode 100644 index 00000000000..ef3c7d6f649 --- /dev/null +++ b/common/persistence/client/health_request_rate_limiter.go @@ -0,0 +1,199 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package client + +import ( + "context" + "encoding/json" + "math" + "sync/atomic" + "time" + + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/quotas" +) + +const ( + DefaultRefreshInterval = 10 * time.Second + DefaultRateBurstRatio = 1.0 + DefaultMinRateMultiplier = 0.1 + DefaultMaxRateMultiplier = 1.0 +) + +type ( + HealthRequestRateLimiterImpl struct { + enabled *atomic.Bool + params DynamicRateLimitingParams // dynamic config map + curOptions dynamicRateLimitingOptions // current dynamic config values (updated on refresh) + + rateLimiter *quotas.RateLimiterImpl + healthSignals persistence.HealthSignalAggregator + + refreshTimer *time.Ticker + + rateFn quotas.RateFn + rateToBurstRatio float64 + + minRateMultiplier float64 + maxRateMultiplier float64 + curRateMultiplier float64 + + logger log.Logger + } + + dynamicRateLimitingOptions struct { + Enabled bool + + RefreshInterval string // string returned by json.Unmarshal will be parsed into a duration + + // thresholds which should trigger backoff if exceeded + LatencyThreshold float64 + ErrorThreshold float64 + + // if either threshold is exceeded, the current rate multiplier will be reduced by this amount + RateBackoffStepSize float64 + // when the system is healthy and current rate < max rate, the current rate multiplier will be + // increased by this amount + RateIncreaseStepSize float64 + } +) + +var _ quotas.RequestRateLimiter = (*HealthRequestRateLimiterImpl)(nil) + +func NewHealthRequestRateLimiterImpl( + healthSignals persistence.HealthSignalAggregator, + rateFn quotas.RateFn, 
+ params DynamicRateLimitingParams, + logger log.Logger, +) *HealthRequestRateLimiterImpl { + limiter := &HealthRequestRateLimiterImpl{ + enabled: &atomic.Bool{}, + rateLimiter: quotas.NewRateLimiter(rateFn(), int(DefaultRateBurstRatio*rateFn())), + healthSignals: healthSignals, + rateFn: rateFn, + params: params, + refreshTimer: time.NewTicker(DefaultRefreshInterval), + rateToBurstRatio: DefaultRateBurstRatio, + minRateMultiplier: DefaultMinRateMultiplier, + maxRateMultiplier: DefaultMaxRateMultiplier, + curRateMultiplier: DefaultMaxRateMultiplier, + logger: logger, + } + limiter.refreshDynamicParams() + return limiter +} + +func (rl *HealthRequestRateLimiterImpl) Allow(now time.Time, request quotas.Request) bool { + rl.maybeRefresh() + if !rl.enabled.Load() { + return true + } + return rl.rateLimiter.AllowN(now, request.Token) +} + +func (rl *HealthRequestRateLimiterImpl) Reserve(now time.Time, request quotas.Request) quotas.Reservation { + rl.maybeRefresh() + if !rl.enabled.Load() { + return quotas.NoopReservation + } + return rl.rateLimiter.ReserveN(now, request.Token) +} + +func (rl *HealthRequestRateLimiterImpl) Wait(ctx context.Context, request quotas.Request) error { + rl.maybeRefresh() + if !rl.enabled.Load() { + return nil + } + return rl.rateLimiter.WaitN(ctx, request.Token) +} + +func (rl *HealthRequestRateLimiterImpl) maybeRefresh() { + select { + case <-rl.refreshTimer.C: + rl.refreshDynamicParams() + if rl.enabled.Load() { + rl.refreshRate() + } + rl.updateRefreshTimer() + + default: + // no-op + } +} + +func (rl *HealthRequestRateLimiterImpl) refreshRate() { + if rl.latencyThresholdExceeded() || rl.errorThresholdExceeded() { + // limit exceeded, do backoff + rl.curRateMultiplier = math.Max(rl.minRateMultiplier, rl.curRateMultiplier-rl.curOptions.RateBackoffStepSize) + rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn()) + rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn())) + } else if rl.curRateMultiplier < rl.maxRateMultiplier { + 
// already doing backoff and under thresholds, increase limit + rl.curRateMultiplier = math.Min(rl.maxRateMultiplier, rl.curRateMultiplier+rl.curOptions.RateIncreaseStepSize) + rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn()) + rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn())) + } +} + +func (rl *HealthRequestRateLimiterImpl) refreshDynamicParams() { + var options dynamicRateLimitingOptions + b, err := json.Marshal(rl.params()) + if err != nil { + rl.logger.Warn("Error marshalling dynamic rate limiting params. Dynamic rate limiting is disabled.", tag.Error(err)) + rl.enabled.Store(false) + return + } + + err = json.Unmarshal(b, &options) + if err != nil { + rl.logger.Warn("Error unmarshalling dynamic rate limiting params. Dynamic rate limiting is disabled.", tag.Error(err)) + rl.enabled.Store(false) + return + } + + rl.enabled.Store(options.Enabled) + rl.curOptions = options +} + +func (rl *HealthRequestRateLimiterImpl) updateRefreshTimer() { + if len(rl.curOptions.RefreshInterval) > 0 { + if refreshDuration, err := timestamp.ParseDurationDefaultSeconds(rl.curOptions.RefreshInterval); err != nil { + rl.logger.Warn("Error parsing dynamic rate limit refreshInterval timestamp. 
Using previous value.", tag.Error(err)) + } else if refreshDuration > 0 { // time.Ticker.Reset panics on a non-positive duration; keep the previous interval + rl.refreshTimer.Reset(refreshDuration) + } + } +} + +func (rl *HealthRequestRateLimiterImpl) latencyThresholdExceeded() bool { + return rl.curOptions.LatencyThreshold > 0 && rl.healthSignals.AverageLatency() > rl.curOptions.LatencyThreshold +} + +func (rl *HealthRequestRateLimiterImpl) errorThresholdExceeded() bool { + return rl.curOptions.ErrorThreshold > 0 && rl.healthSignals.ErrorRatio() > rl.curOptions.ErrorThreshold +} diff --git a/common/persistence/client/quotas.go b/common/persistence/client/quotas.go index 24bf57fea33..b2466be540d 100644 --- a/common/persistence/client/quotas.go +++ b/common/persistence/client/quotas.go @@ -26,6 +26,7 @@ package client import ( "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/log" p "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/quotas" "go.temporal.io/server/service/history/tasks" @@ -85,16 +86,21 @@ func NewPriorityRateLimiter( hostMaxQPS PersistenceMaxQps, perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS, requestPriorityFn quotas.RequestPriorityFn, + healthSignals p.HealthSignalAggregator, + dynamicParams DynamicRateLimitingParams, + logger log.Logger, ) quotas.RequestRateLimiter { - hostRequestRateLimiter := newPriorityRateLimiter( - func() float64 { return float64(hostMaxQPS()) }, - requestPriorityFn, - ) + hostRateFn := func() float64 { return float64(hostMaxQPS()) } return quotas.NewMultiRequestRateLimiter( + // per shardID+namespaceID rate limiters newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxQPS, hostMaxQPS, requestPriorityFn), + // per namespaceID rate limiters newPriorityNamespaceRateLimiter(namespaceMaxQPS, hostMaxQPS, requestPriorityFn), - hostRequestRateLimiter, + // host-level dynamic rate limiter + newPriorityDynamicRateLimiter(hostRateFn, requestPriorityFn, healthSignals, dynamicParams, logger), + // basic host-level rate limiter + newPriorityRateLimiter(hostRateFn, 
requestPriorityFn), ) } @@ -169,6 +175,25 @@ func newPriorityRateLimiter( ) } +func newPriorityDynamicRateLimiter( + rateFn quotas.RateFn, + requestPriorityFn quotas.RequestPriorityFn, + healthSignals p.HealthSignalAggregator, + dynamicParams DynamicRateLimitingParams, + logger log.Logger, +) quotas.RequestRateLimiter { + rateLimiters := make(map[int]quotas.RequestRateLimiter) + for priority := range RequestPrioritiesOrdered { + // TODO: refactor this so dynamic rate adjustment is global for all priorities + rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger) + } + + return quotas.NewPriorityRateLimiter( + requestPriorityFn, + rateLimiters, + ) +} + func NewNoopPriorityRateLimiter( maxQPS PersistenceMaxQps, ) quotas.RequestRateLimiter { diff --git a/common/persistence/client/store.go b/common/persistence/client/store.go index 608f53e0b87..2e4162e93a9 100644 --- a/common/persistence/client/store.go +++ b/common/persistence/client/store.go @@ -90,7 +90,8 @@ func DataStoreFactoryProvider( var faultInjection *FaultInjectionDataStoreFactory if defaultCfg.FaultInjection != nil { - dataStoreFactory = NewFaultInjectionDatastoreFactory(defaultCfg.FaultInjection, dataStoreFactory) + faultInjection = NewFaultInjectionDatastoreFactory(defaultCfg.FaultInjection, dataStoreFactory) + dataStoreFactory = faultInjection } return dataStoreFactory, faultInjection diff --git a/common/persistence/health_signal_aggregator.go b/common/persistence/health_signal_aggregator.go index b9ce009b56c..2a63d0f7edc 100644 --- a/common/persistence/health_signal_aggregator.go +++ b/common/persistence/health_signal_aggregator.go @@ -56,8 +56,9 @@ type ( requestsPerShard map[int32]int64 requestsLock sync.Mutex - latencyAverage aggregate.MovingWindowAverage - errorRatio aggregate.MovingWindowAverage + aggregationEnabled bool + latencyAverage aggregate.MovingWindowAverage + errorRatio aggregate.MovingWindowAverage metricsHandler metrics.Handler 
emitMetricsTimer *time.Ticker @@ -68,23 +69,33 @@ type ( ) func NewHealthSignalAggregatorImpl( + aggregationEnabled bool, windowSize time.Duration, maxBufferSize int, metricsHandler metrics.Handler, perShardRPSWarnLimit dynamicconfig.IntPropertyFn, logger log.Logger, ) *HealthSignalAggregatorImpl { - return &HealthSignalAggregatorImpl{ + ret := &HealthSignalAggregatorImpl{ status: common.DaemonStatusInitialized, shutdownCh: make(chan struct{}), requestsPerShard: make(map[int32]int64), - latencyAverage: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize), - errorRatio: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize), metricsHandler: metricsHandler, emitMetricsTimer: time.NewTicker(emitMetricsInterval), perShardRPSWarnLimit: perShardRPSWarnLimit, logger: logger, + aggregationEnabled: aggregationEnabled, } + + if aggregationEnabled { + ret.latencyAverage = aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize) + ret.errorRatio = aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize) + } else { + ret.latencyAverage = aggregate.NoopMovingWindowAverage + ret.errorRatio = aggregate.NoopMovingWindowAverage + } + + return ret } func (s *HealthSignalAggregatorImpl) Start() { @@ -103,14 +114,15 @@ func (s *HealthSignalAggregatorImpl) Stop() { } func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, latency time.Duration, err error) { - // TODO: uncomment when adding dynamic rate limiter - //s.latencyAverage.Record(latency.Milliseconds()) - // - //if isUnhealthyError(err) { - // s.errorRatio.Record(1) - //} else { - // s.errorRatio.Record(0) - //} + if s.aggregationEnabled { + s.latencyAverage.Record(latency.Milliseconds()) + + if isUnhealthyError(err) { + s.errorRatio.Record(1) + } else { + s.errorRatio.Record(0) + } + } if callerSegment != CallerSegmentMissing { s.incrementShardRequestCount(callerSegment) @@ -153,18 +165,20 @@ func (s *HealthSignalAggregatorImpl) emitMetricsLoop() { } } -// TODO: uncomment when adding dynamic rate 
limiter -//func isUnhealthyError(err error) bool { -// if err == nil { -// return false -// } -// switch err.(type) { -// case *ShardOwnershipLostError, -// *AppendHistoryTimeoutError, -// *TimeoutError: -// return true -// -// default: -// return false -// } -//} +func isUnhealthyError(err error) bool { + if err == nil { + return false + } + if common.IsContextCanceledErr(err) { + return true + } + + switch err.(type) { + case *AppendHistoryTimeoutError, + *TimeoutError: + return true + + default: + return false + } +} diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index 02a88204a09..1a5c942fd7a 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -52,6 +52,7 @@ import ( "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" "go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql" "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" + "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/resolver" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/environment" @@ -97,6 +98,7 @@ type ( TaskIDGenerator TransferTaskIDGenerator ClusterMetadata cluster.Metadata SearchAttributesManager searchattribute.Manager + PersistenceRateLimiter quotas.RequestRateLimiter PersistenceHealthSignals persistence.HealthSignalAggregator ReadLevel int64 ReplicationReadLevel int64 @@ -188,6 +190,9 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) { if clusterMetadataConfig == nil { clusterMetadataConfig = cluster.NewTestClusterMetadataConfig(false, false) } + if s.PersistenceHealthSignals == nil { + s.PersistenceHealthSignals = persistence.NoopHealthSignalAggregator + } clusterName := clusterMetadataConfig.CurrentClusterName @@ -202,7 +207,7 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) { s.Logger, 
metrics.NoopMetricsHandler, ) - factory := client.NewFactory(dataStoreFactory, &cfg, nil, serialization.NewSerializer(), clusterName, metrics.NoopMetricsHandler, s.Logger, persistence.NoopHealthSignalAggregator) + factory := client.NewFactory(dataStoreFactory, &cfg, s.PersistenceRateLimiter, serialization.NewSerializer(), clusterName, metrics.NoopMetricsHandler, s.Logger, s.PersistenceHealthSignals) s.TaskMgr, err = factory.NewTaskManager() s.fatalOnError("NewTaskManager", err) diff --git a/common/persistence/tests/cassandra_test.go b/common/persistence/tests/cassandra_test.go index b073430f542..9586da3b783 100644 --- a/common/persistence/tests/cassandra_test.go +++ b/common/persistence/tests/cassandra_test.go @@ -28,7 +28,6 @@ import ( "testing" "github.com/stretchr/testify/suite" - persistencetests "go.temporal.io/server/common/persistence/persistence-tests" "go.temporal.io/server/common/persistence/serialization" _ "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 1beb7ff9277..5b627bd95af 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -389,6 +389,7 @@ func PersistenceRateLimitingParamsProvider( serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.PersistenceDynamicRateLimitingParams, ) } diff --git a/service/frontend/service.go b/service/frontend/service.go index 7a91619bf5a..c520d914e85 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -60,6 +60,7 @@ type Config struct { PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn + PersistenceDynamicRateLimitingParams dynamicconfig.MapPropertyFn VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn 
VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn @@ -188,6 +189,7 @@ func NewConfig( PersistenceNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendPersistenceNamespaceMaxQPS, 0), PersistencePerShardNamespaceMaxQPS: dynamicconfig.DefaultPerShardNamespaceRPSMax, EnablePersistencePriorityRateLimiting: dc.GetBoolProperty(dynamicconfig.FrontendEnablePersistencePriorityRateLimiting, true), + PersistenceDynamicRateLimitingParams: dc.GetMapProperty(dynamicconfig.FrontendPersistenceDynamicRateLimitingParams, dynamicconfig.DefaultDynamicRateLimitingParams), VisibilityPersistenceMaxReadQPS: visibility.GetVisibilityPersistenceMaxReadQPS(dc, enableReadFromES), VisibilityPersistenceMaxWriteQPS: visibility.GetVisibilityPersistenceMaxWriteQPS(dc, enableReadFromES), diff --git a/service/fx.go b/service/fx.go index c2049dc7609..4ab67d281a1 100644 --- a/service/fx.go +++ b/service/fx.go @@ -47,6 +47,7 @@ type ( PersistenceNamespaceMaxQps persistenceClient.PersistenceNamespaceMaxQps PersistencePerShardNamespaceMaxQPS persistenceClient.PersistencePerShardNamespaceMaxQPS EnablePriorityRateLimiting persistenceClient.EnablePriorityRateLimiting + DynamicRateLimitingParams persistenceClient.DynamicRateLimitingParams } ) @@ -56,12 +57,14 @@ func NewPersistenceRateLimitingParams( namespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter, perShardNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter, enablePriorityRateLimiting dynamicconfig.BoolPropertyFn, + dynamicRateLimitingParams dynamicconfig.MapPropertyFn, ) PersistenceRateLimitingParams { return PersistenceRateLimitingParams{ PersistenceMaxQps: PersistenceMaxQpsFn(maxQps, globalMaxQps), PersistenceNamespaceMaxQps: persistenceClient.PersistenceNamespaceMaxQps(namespaceMaxQps), PersistencePerShardNamespaceMaxQPS: persistenceClient.PersistencePerShardNamespaceMaxQPS(perShardNamespaceMaxQps), EnablePriorityRateLimiting: 
persistenceClient.EnablePriorityRateLimiting(enablePriorityRateLimiting), + DynamicRateLimitingParams: persistenceClient.DynamicRateLimitingParams(dynamicRateLimitingParams), } } diff --git a/service/history/configs/config.go b/service/history/configs/config.go index d3a706ec8fb..51402fbfa8a 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -48,6 +48,7 @@ type Config struct { PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn + PersistenceDynamicRateLimitingParams dynamicconfig.MapPropertyFn VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn @@ -333,6 +334,7 @@ func NewConfig( PersistenceNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryPersistenceNamespaceMaxQPS, 0), PersistencePerShardNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryPersistencePerShardNamespaceMaxQPS, 0), EnablePersistencePriorityRateLimiting: dc.GetBoolProperty(dynamicconfig.HistoryEnablePersistencePriorityRateLimiting, true), + PersistenceDynamicRateLimitingParams: dc.GetMapProperty(dynamicconfig.HistoryPersistenceDynamicRateLimitingParams, dynamicconfig.DefaultDynamicRateLimitingParams), ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.HistoryShutdownDrainDuration, 0*time.Second), MaxAutoResetPoints: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryMaxAutoResetPoints, DefaultHistoryMaxAutoResetPoints), MaxTrackedBuildIds: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryMaxTrackedBuildIds, DefaultHistoryMaxTrackedBuildIds), diff --git a/service/history/fx.go b/service/history/fx.go index 3fbce8b1b1f..5a84334dcd8 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -226,6 +226,7 @@ func PersistenceRateLimitingParamsProvider( 
serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.PersistenceDynamicRateLimitingParams, ) } diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go index 39037d4839c..8333a7ef957 100644 --- a/service/history/shard/context_test.go +++ b/service/history/shard/context_test.go @@ -173,7 +173,7 @@ func (s *contextSuite) TestOverwriteScheduledTaskTimestamp() { } func (s *contextSuite) TestAddTasks_Success() { - tasks := map[tasks.Category][]tasks.Task{ + testTasks := map[tasks.Category][]tasks.Task{ tasks.CategoryTransfer: {&tasks.ActivityTask{}}, // Just for testing purpose. In the real code ActivityTask can't be passed to shardContext.AddTasks. tasks.CategoryTimer: {&tasks.ActivityRetryTimerTask{}}, // Just for testing purpose. In the real code ActivityRetryTimerTask can't be passed to shardContext.AddTasks. tasks.CategoryReplication: {&tasks.HistoryReplicationTask{}}, // Just for testing purpose. In the real code HistoryReplicationTask can't be passed to shardContext.AddTasks. 
@@ -186,11 +186,11 @@ func (s *contextSuite) TestAddTasks_Success() { WorkflowID: tests.WorkflowID, RunID: tests.RunID, - Tasks: tasks, + Tasks: testTasks, } s.mockExecutionManager.EXPECT().AddHistoryTasks(gomock.Any(), addTasksRequest).Return(nil) - s.mockHistoryEngine.EXPECT().NotifyNewTasks(tasks) + s.mockHistoryEngine.EXPECT().NotifyNewTasks(testTasks) err := s.mockShard.AddTasks(context.Background(), addTasksRequest) s.NoError(err) diff --git a/service/matching/config.go b/service/matching/config.go index 5314cb1104e..132b3c209ad 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -41,6 +41,7 @@ type ( PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn + PersistenceDynamicRateLimitingParams dynamicconfig.MapPropertyFn SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters TestDisableSyncMatch dynamicconfig.BoolPropertyFn RPS dynamicconfig.IntPropertyFn @@ -135,6 +136,7 @@ func NewConfig(dc *dynamicconfig.Collection) *Config { PersistenceNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MatchingPersistenceNamespaceMaxQPS, 0), PersistencePerShardNamespaceMaxQPS: dynamicconfig.DefaultPerShardNamespaceRPSMax, EnablePersistencePriorityRateLimiting: dc.GetBoolProperty(dynamicconfig.MatchingEnablePersistencePriorityRateLimiting, true), + PersistenceDynamicRateLimitingParams: dc.GetMapProperty(dynamicconfig.MatchingPersistenceDynamicRateLimitingParams, dynamicconfig.DefaultDynamicRateLimitingParams), SyncMatchWaitDuration: dc.GetDurationPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingSyncMatchWaitDuration, 200*time.Millisecond), TestDisableSyncMatch: dc.GetBoolProperty(dynamicconfig.TestMatchingDisableSyncMatch, false), RPS: dc.GetIntProperty(dynamicconfig.MatchingRPS, 1200), diff --git a/service/matching/fx.go 
b/service/matching/fx.go index 707694a9195..e6254ba3535 100644 --- a/service/matching/fx.go +++ b/service/matching/fx.go @@ -105,6 +105,7 @@ func PersistenceRateLimitingParamsProvider( serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.PersistenceDynamicRateLimitingParams, ) } diff --git a/service/worker/fx.go b/service/worker/fx.go index 7454a448583..c9f3059b842 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -79,6 +79,7 @@ func PersistenceRateLimitingParamsProvider( serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.PersistenceDynamicRateLimitingParams, ) } diff --git a/service/worker/service.go b/service/worker/service.go index 3ff30d3d202..512da2a0c81 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -113,6 +113,7 @@ type ( PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn + PersistenceDynamicRateLimitingParams dynamicconfig.MapPropertyFn EnableBatcher dynamicconfig.BoolPropertyFn BatcherRPS dynamicconfig.IntPropertyFnWithNamespaceFilter BatcherConcurrency dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -353,6 +354,7 @@ func NewConfig( dynamicconfig.WorkerEnablePersistencePriorityRateLimiting, true, ), + PersistenceDynamicRateLimitingParams: dc.GetMapProperty(dynamicconfig.WorkerPersistenceDynamicRateLimitingParams, dynamicconfig.DefaultDynamicRateLimitingParams), VisibilityPersistenceMaxReadQPS: visibility.GetVisibilityPersistenceMaxReadQPS(dc, enableReadFromES), VisibilityPersistenceMaxWriteQPS: visibility.GetVisibilityPersistenceMaxWriteQPS(dc, enableReadFromES), diff --git a/tests/test_cluster.go b/tests/test_cluster.go index 
216b4fae6a1..baa8fcf4e35 100644 --- a/tests/test_cluster.go +++ b/tests/test_cluster.go @@ -400,7 +400,6 @@ func (tc *TestCluster) SetFaultInjectionRate(rate float64) { // TearDownCluster tears down the test cluster func (tc *TestCluster) TearDownCluster() error { - tc.SetFaultInjectionRate(0) errs := tc.host.Stop() tc.testBase.TearDownWorkflowStore() if tc.host.esConfig != nil {