From ef9b3c4ca0ae2fa30fa5e0f6c2c46e9426b3be5b Mon Sep 17 00:00:00 2001
From: Zijian
Date: Fri, 22 Nov 2024 17:44:08 +0000
Subject: [PATCH] Change downscale detection

---
 common/dynamicconfig/config.go                 | 23 ++++++++++++++
 common/dynamicconfig/constants.go              | 15 ++++-----
 common/metrics/defs.go                         |  4 +++
 service/matching/config/config.go              |  6 ++--
 service/matching/config/config_test.go         |  4 ++-
 service/matching/tasklist/adaptive_scaler.go   | 11 +++----
 .../matching/tasklist/adaptive_scaler_test.go  | 31 ++++++++++++++++++-
 .../matching/tasklist/task_list_manager.go     |  4 +--
 8 files changed, 78 insertions(+), 20 deletions(-)

diff --git a/common/dynamicconfig/config.go b/common/dynamicconfig/config.go
index 6a43188fc30..21616559a8b 100644
--- a/common/dynamicconfig/config.go
+++ b/common/dynamicconfig/config.go
@@ -98,6 +98,9 @@ type FloatPropertyFn func(opts ...FilterOption) float64
 // FloatPropertyFnWithShardIDFilter is a wrapper to get float property from dynamic config with shardID as filter
 type FloatPropertyFnWithShardIDFilter func(shardID int) float64
 
+// FloatPropertyFnWithTaskListInfoFilters is a wrapper to get float property from dynamic config with three filters: domain, taskList, taskType
+type FloatPropertyFnWithTaskListInfoFilters func(domain string, taskList string, taskType int) float64
+
 // DurationPropertyFn is a wrapper to get duration property from dynamic config
 type DurationPropertyFn func(opts ...FilterOption) time.Duration
 
@@ -302,6 +305,26 @@ func (c *Collection) GetFloat64PropertyFilteredByShardID(key FloatKey) FloatProp
     }
 }
 
+// GetFloat64PropertyFilteredByTaskListInfo gets property with taskListInfo as filters and asserts that it's a float64
+func (c *Collection) GetFloat64PropertyFilteredByTaskListInfo(key FloatKey) FloatPropertyFnWithTaskListInfoFilters {
+    return func(domain string, taskList string, taskType int) float64 {
+        filters := c.toFilterMap(
+            DomainFilter(domain),
+            TaskListFilter(taskList),
+            TaskTypeFilter(taskType),
+        )
+        val, err := c.client.GetFloatValue(
+            key,
+            filters,
+        )
+        if err != nil {
+            c.logError(key, filters, err)
+            return key.DefaultFloat()
+        }
+        return val
+    }
+}
+
 // GetDurationProperty gets property and asserts that it's a duration
 func (c *Collection) GetDurationProperty(key DurationKey) DurationPropertyFn {
     return func(opts ...FilterOption) time.Duration {
diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index 3868734843c..b145d307c7c 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -814,7 +814,6 @@ const (
     MatchingForwarderMaxChildrenPerNode
     MatchingPartitionUpscaleRPS
-    MatchingPartitionDownscaleRPS
 
     // key for history
 
@@ -2249,6 +2248,8 @@ const (
     // Default value: 0.5
     HistoryGlobalRatelimiterNewDataWeight
 
+    MatchingPartitionDownscaleFactor
+
     // LastFloatKey must be the last one in this const group
     LastFloatKey
 )
@@ -3293,12 +3294,6 @@ var IntKeys = map[IntKey]DynamicInt{
         Description:  "MatchingPartitionUpscaleRPS is the threshold of adding tasks RPS per partition to trigger upscale",
         DefaultValue: 200,
     },
-    MatchingPartitionDownscaleRPS: {
-        KeyName:      "matching.partitionDownscaleRPS",
-        Filters:      []Filter{DomainName, TaskListName, TaskType},
-        Description:  "MatchingPartitionDownscaleRPS is the threshold of adding tasks RPS per partition to trigger downscale",
-        DefaultValue: 100,
-    },
     HistoryRPS: {
         KeyName:     "history.rps",
         Description: "HistoryRPS is request rate per second for each history host",
@@ -4552,6 +4547,12 @@ var FloatKeys = map[FloatKey]DynamicFloat{
         Description:  "HistoryGlobalRatelimiterNewDataWeight defines how much weight to give each host's newest data, per update. Must be between 0 and 1, higher values match new values more closely after a single update",
         DefaultValue: 0.5,
     },
+    MatchingPartitionDownscaleFactor: {
+        KeyName:      "matching.partitionDownscaleFactor",
+        Description:  "MatchingPartitionDownscaleFactor introduces hysteresis to prevent oscillation by setting a lower QPS threshold for downscaling, ensuring partitions are only removed when the load decreases significantly below the capacity of fewer partitions.",
+        Filters:      []Filter{DomainName, TaskListName, TaskType},
+        DefaultValue: 0.75,
+    },
 }
 
 var StringKeys = map[StringKey]DynamicString{
diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 0ff0931ed3f..f3c02aee76e 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -2632,6 +2632,8 @@ const (
     TaskListReadWritePartitionMismatchGauge
     TaskListPollerPartitionMismatchGauge
     EstimatedAddTaskQPSGauge
+    TaskListPartitionUpscaleThresholdGauge
+    TaskListPartitionDownscaleThresholdGauge
 
     NumMatchingMetrics
 )
@@ -3322,6 +3324,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
         TaskListReadWritePartitionMismatchGauge:  {metricName: "tasklist_read_write_partition_mismatch", metricType: Gauge},
         TaskListPollerPartitionMismatchGauge:     {metricName: "tasklist_poller_partition_mismatch", metricType: Gauge},
         EstimatedAddTaskQPSGauge:                 {metricName: "estimated_add_task_qps_per_tl", metricType: Gauge},
+        TaskListPartitionUpscaleThresholdGauge:   {metricName: "tasklist_partition_upscale_threshold", metricType: Gauge},
+        TaskListPartitionDownscaleThresholdGauge: {metricName: "tasklist_partition_downscale_threshold", metricType: Gauge},
     },
     Worker: {
         ReplicatorMessages: {metricName: "replicator_messages"},
diff --git a/service/matching/config/config.go b/service/matching/config/config.go
index 1931df4bc20..0e806e3d735 100644
--- a/service/matching/config/config.go
+++ b/service/matching/config/config.go
@@ -55,7 +55,7 @@ type (
         LocalTaskWaitTime                    dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
         EnableGetNumberOfPartitionsFromCache dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
         PartitionUpscaleRPS                  dynamicconfig.IntPropertyFnWithTaskListInfoFilters
-        PartitionDownscaleRPS                dynamicconfig.IntPropertyFnWithTaskListInfoFilters
+        PartitionDownscaleFactor             dynamicconfig.FloatPropertyFnWithTaskListInfoFilters
         PartitionUpscaleSustainedDuration    dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
         PartitionDownscaleSustainedDuration  dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
         AdaptiveScalerUpdateInterval         dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
@@ -116,7 +116,7 @@ type (
         LocalPollWaitTime                   func() time.Duration
         LocalTaskWaitTime                   func() time.Duration
         PartitionUpscaleRPS                 func() int
-        PartitionDownscaleRPS               func() int
+        PartitionDownscaleFactor            func() float64
         PartitionUpscaleSustainedDuration   func() time.Duration
         PartitionDownscaleSustainedDuration func() time.Duration
         AdaptiveScalerUpdateInterval        func() time.Duration
@@ -179,7 +179,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string, getIsolationGroups
         LocalPollWaitTime:                    dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime),
         LocalTaskWaitTime:                    dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime),
         PartitionUpscaleRPS:                  dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionUpscaleRPS),
-        PartitionDownscaleRPS:                dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionDownscaleRPS),
+        PartitionDownscaleFactor:             dc.GetFloat64PropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionDownscaleFactor),
         PartitionUpscaleSustainedDuration:    dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionUpscaleSustainedDuration),
         PartitionDownscaleSustainedDuration:  dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionDownscaleSustainedDuration),
         AdaptiveScalerUpdateInterval:         dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingAdaptiveScalerUpdateInterval),
diff --git a/service/matching/config/config_test.go b/service/matching/config/config_test.go
index 275a78d7ec2..1de9219dd43 100644
--- a/service/matching/config/config_test.go
+++ b/service/matching/config/config_test.go
@@ -82,7 +82,7 @@ func TestNewConfig(t *testing.T) {
         "EnableTasklistOwnershipGuard":         {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false},
         "EnableGetNumberOfPartitionsFromCache": {dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache, false},
         "PartitionUpscaleRPS":                  {dynamicconfig.MatchingPartitionUpscaleRPS, 30},
-        "PartitionDownscaleRPS":                {dynamicconfig.MatchingPartitionDownscaleRPS, 31},
+        "PartitionDownscaleFactor":             {dynamicconfig.MatchingPartitionDownscaleFactor, 31.0},
         "PartitionUpscaleSustainedDuration":    {dynamicconfig.MatchingPartitionUpscaleSustainedDuration, time.Duration(32)},
         "PartitionDownscaleSustainedDuration":  {dynamicconfig.MatchingPartitionDownscaleSustainedDuration, time.Duration(33)},
         "AdaptiveScalerUpdateInterval":         {dynamicconfig.MatchingAdaptiveScalerUpdateInterval, time.Duration(34)},
@@ -155,6 +155,8 @@ func getValue(f *reflect.Value) interface{} {
         return fn()
     case dynamicconfig.StringPropertyFn:
         return fn()
+    case dynamicconfig.FloatPropertyFnWithTaskListInfoFilters:
+        return fn("domain", "tasklist", int(types.TaskListTypeDecision))
     case func() []string:
         return fn()
     default:
diff --git a/service/matching/tasklist/adaptive_scaler.go b/service/matching/tasklist/adaptive_scaler.go
index dcc4f0ff336..67aeda16b00 100644
--- a/service/matching/tasklist/adaptive_scaler.go
+++ b/service/matching/tasklist/adaptive_scaler.go
@@ -133,7 +133,6 @@ func (a *adaptiveScalerImpl) run() {
         return
     }
     qps := a.qpsTracker.QPS()
-    a.scope.UpdateGauge(metrics.EstimatedAddTaskQPSGauge, qps)
     partitionConfig := a.getPartitionConfig()
     // adjust the number of write partitions based on qps
     numWritePartitions := a.adjustWritePartitions(qps, partitionConfig.NumWritePartitions)
@@ -168,11 +167,11 @@ func (a *adaptiveScalerImpl) getPartitionConfig() *types.TaskListPartitionConfig
 
 func (a *adaptiveScalerImpl) adjustWritePartitions(qps float64, numWritePartitions int32) int32 {
     upscaleThreshold := float64(a.config.PartitionUpscaleRPS())
-    downscaleThreshold := float64(a.config.PartitionDownscaleRPS())
-    if downscaleThreshold > upscaleThreshold {
-        downscaleThreshold = upscaleThreshold
-        a.logger.Warn("downscale threshold is larger than upscale threshold, use upscale threshold for downscale threshold instead")
-    }
+    downscaleFactor := a.config.PartitionDownscaleFactor()
+    downscaleThreshold := float64(numWritePartitions-1) * upscaleThreshold * downscaleFactor / float64(numWritePartitions)
+    a.scope.UpdateGauge(metrics.EstimatedAddTaskQPSGauge, qps)
+    a.scope.UpdateGauge(metrics.TaskListPartitionUpscaleThresholdGauge, upscaleThreshold)
+    a.scope.UpdateGauge(metrics.TaskListPartitionDownscaleThresholdGauge, downscaleThreshold)
     result := numWritePartitions
     if qps > upscaleThreshold {
diff --git a/service/matching/tasklist/adaptive_scaler_test.go b/service/matching/tasklist/adaptive_scaler_test.go
index e04a8103a74..9be3f2049cc 100644
--- a/service/matching/tasklist/adaptive_scaler_test.go
+++ b/service/matching/tasklist/adaptive_scaler_test.go
@@ -209,6 +209,35 @@ func TestAdaptiveScalerRun(t *testing.T) {
             },
             cycles: 2,
         },
+        {
+            name: "overload but no fluctuation",
+            mockSetup: func(deps *mockAdaptiveScalerDeps) {
+                // overload start
+                deps.mockQPSTracker.EXPECT().QPS().Return(210.0)
+                deps.mockManager.EXPECT().TaskListPartitionConfig().Return(nil)
+
+                // overload persists past the sustained period, so scale up to 2 partitions
+                deps.mockQPSTracker.EXPECT().QPS().Return(210.0)
+                deps.mockManager.EXPECT().TaskListPartitionConfig().Return(nil)
+                deps.mockManager.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), &types.TaskListPartitionConfig{
+                    NumReadPartitions:  2,
+                    NumWritePartitions: 2,
+                }).Return(nil)
+
+                // 190 QPS is below the upscale threshold but still above the downscale threshold,
+                // so stay at 2 partitions to avoid fluctuation
+                deps.mockQPSTracker.EXPECT().QPS().Return(190.0)
+                deps.mockManager.EXPECT().TaskListPartitionConfig().Return(&types.TaskListPartitionConfig{
+                    NumReadPartitions:  2,
+                    NumWritePartitions: 2,
+                })
+                deps.mockQPSTracker.EXPECT().QPS().Return(190.0)
+                deps.mockManager.EXPECT().TaskListPartitionConfig().Return(&types.TaskListPartitionConfig{
+                    NumReadPartitions:  2,
+                    NumWritePartitions: 2,
+                })
+            },
+            cycles: 4,
+        },
     }
 
     for _, tc := range testCases {
@@ -219,7 +248,7 @@ func TestAdaptiveScalerRun(t *testing.T) {
             require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingEnableAdaptiveScaler, true))
             require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache, true))
             require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionUpscaleRPS, 200))
-            require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionDownscaleRPS, 100))
+            require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionDownscaleFactor, 0.75))
            require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionUpscaleSustainedDuration, time.Second))
             require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionDownscaleSustainedDuration, time.Second))
             tc.mockSetup(deps)
diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go
index 9a788cb4cef..292fe479d39 100644
--- a/service/matching/tasklist/task_list_manager.go
+++ b/service/matching/tasklist/task_list_manager.go
@@ -975,8 +975,8 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c
         PartitionUpscaleRPS: func() int {
             return cfg.PartitionUpscaleRPS(domainName, taskListName, taskType)
         },
-        PartitionDownscaleRPS: func() int {
-            return cfg.PartitionDownscaleRPS(domainName, taskListName, taskType)
+        PartitionDownscaleFactor: func() float64 {
+            return cfg.PartitionDownscaleFactor(domainName, taskListName, taskType)
         },
         PartitionUpscaleSustainedDuration: func() time.Duration {
             return cfg.PartitionUpscaleSustainedDuration(domainName, taskListName, taskType)
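
For reference, the downscale rule added to adjustWritePartitions is easiest to see with the defaults introduced in this patch (matching.partitionUpscaleRPS = 200, matching.partitionDownscaleFactor = 0.75). The standalone sketch below simply re-evaluates the patched formula outside the scaler; the helper name downscaleThreshold and the demo program are illustrative only, not part of the change.

package main

import "fmt"

// downscaleThreshold mirrors the formula this patch adds to adjustWritePartitions:
// the QPS below which the scaler is willing to drop to one fewer write partition.
// The scaler always runs with at least one write partition, so the division is safe.
func downscaleThreshold(numWritePartitions int32, upscaleRPS, factor float64) float64 {
    return float64(numWritePartitions-1) * upscaleRPS * factor / float64(numWritePartitions)
}

func main() {
    // With the defaults above (upscale RPS 200, downscale factor 0.75):
    //   1 partition  -> 0   (a single partition is never removed)
    //   2 partitions -> 75
    //   3 partitions -> 100
    for _, n := range []int32{1, 2, 3} {
        fmt.Printf("partitions=%d: downscale below %.1f QPS\n", n, downscaleThreshold(n, 200, 0.75))
    }
}

This is also why the new "overload but no fluctuation" test case holds at 2 partitions: 190 QPS is under the 200 QPS upscale threshold but well above the 75 QPS downscale threshold, so the partition config is left unchanged.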