diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 89acf6ff3a2e..e66c91e2b6ec 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -217,6 +217,10 @@ const ( FrontendHistoryMaxPageSize = "frontend.historyMaxPageSize" // FrontendRPS is workflow rate limit per second FrontendRPS = "frontend.rps" + // FrontendNamespaceReplicationInducingAPIsRPS limits the per second request rate for namespace replication inducing + // APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility). + // This config is EXPERIMENTAL and may be changed or removed in a later release. + FrontendNamespaceReplicationInducingAPIsRPS = "frontend.rps.namespaceReplicationInducingAPIs" // FrontendMaxNamespaceRPSPerInstance is workflow namespace rate limit per second FrontendMaxNamespaceRPSPerInstance = "frontend.namespaceRPS" // FrontendMaxNamespaceBurstPerInstance is workflow namespace burst limit @@ -226,9 +230,17 @@ const ( // FrontendMaxNamespaceVisibilityRPSPerInstance is namespace rate limit per second for visibility APIs. // This config is EXPERIMENTAL and may be changed or removed in a later release. FrontendMaxNamespaceVisibilityRPSPerInstance = "frontend.namespaceRPS.visibility" + // FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance is a per host/per namespace RPS limit for + // namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility). + // This config is EXPERIMENTAL and may be changed or removed in a later release. + FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance = "frontend.namespaceRPS.namespaceReplicationInducingAPIs" // FrontendMaxNamespaceVisibilityBurstPerInstance is namespace burst limit for visibility APIs. // This config is EXPERIMENTAL and may be changed or removed in a later release. FrontendMaxNamespaceVisibilityBurstPerInstance = "frontend.namespaceBurst.visibility" + // FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance is a per host/per namespace burst limit for + // namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility). + // This config is EXPERIMENTAL and may be changed or removed in a later release. + FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance = "frontend.namespaceBurst.namespaceReplicationInducingAPIs" // FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster. // The limit is evenly distributed among available frontend service instances. // If this is set, it overwrites per instance limit "frontend.namespaceRPS". @@ -241,6 +253,13 @@ const ( // If this is set, it overwrites per instance limit "frontend.namespaceRPS.visibility". // This config is EXPERIMENTAL and may be changed or removed in a later release. FrontendGlobalNamespaceVisibilityRPS = "frontend.globalNamespaceRPS.visibility" + // FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS is a cluster global, per namespace RPS limit for + // namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility). + // The limit is evenly distributed among available frontend service instances. + // If this is set, it overwrites the per instance limit configured with + // "frontend.namespaceRPS.namespaceReplicationInducingAPIs". + // This config is EXPERIMENTAL and may be changed or removed in a later release. + FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS = "frontend.globalNamespaceRPS.namespaceReplicationInducingAPIs" // InternalFrontendGlobalNamespaceVisibilityRPS is workflow namespace rate limit per second // across all internal-frontends. // This config is EXPERIMENTAL and may be changed or removed in a later release. diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index c209e36d0781..2294b6d88759 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -72,7 +72,6 @@ var ( "PollActivityTaskQueue": 2, "GetWorkflowExecutionHistoryReverse": 2, "GetWorkerBuildIdCompatibility": 2, - "UpdateWorkerBuildIdCompatibility": 2, "GetWorkerTaskReachability": 2, "DeleteWorkflowExecution": 2, @@ -95,13 +94,22 @@ var ( VisibilityAPIPrioritiesOrdered = []int{0} + // Special rate limiting for APIs that may insert replication tasks into a namespace replication queue. + // The replication queue is used to propagate critical failover messages and this mapping prevents flooding the + // queue and delaying failover. + NamespaceReplicationInducingAPIToPriority = map[string]int{ + "UpdateNamespace": 0, + "UpdateWorkerBuildIdCompatibility": 1, + } + + NamespaceReplicationInducingAPIPrioritiesOrdered = []int{0, 1} + OtherAPIToPriority = map[string]int{ "GetClusterInfo": 0, "GetSystemInfo": 0, "GetSearchAttributes": 0, "RegisterNamespace": 0, - "UpdateNamespace": 0, "DescribeNamespace": 0, "ListNamespaces": 0, "DeprecateNamespace": 0, @@ -157,12 +165,14 @@ func (c *NamespaceRateBurstImpl) Burst() int { func NewRequestToRateLimiter( executionRateBurstFn quotas.RateBurst, visibilityRateBurstFn quotas.RateBurst, + namespaceReplicationInducingRateBurstFn quotas.RateBurst, otherRateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { mapping := make(map[string]quotas.RequestRateLimiter) executionRateLimiter := NewExecutionPriorityRateLimiter(executionRateBurstFn) visibilityRateLimiter := NewVisibilityPriorityRateLimiter(visibilityRateBurstFn) + namespaceReplicationInducingRateLimiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(namespaceReplicationInducingRateBurstFn) otherRateLimiter := NewOtherAPIPriorityRateLimiter(otherRateBurstFn) for api := range ExecutionAPIToPriority { @@ -171,6 +181,9 @@ func NewRequestToRateLimiter( for api := range VisibilityAPIToPriority { mapping[api] = visibilityRateLimiter } + for api := range NamespaceReplicationInducingAPIToPriority { + mapping[api] = namespaceReplicationInducingRateLimiter + } for api := range OtherAPIToPriority { mapping[api] = otherRateLimiter } @@ -208,6 +221,21 @@ func NewVisibilityPriorityRateLimiter( }, rateLimiters) } +func NewNamespaceReplicationInducingAPIPriorityRateLimiter( + rateBurstFn quotas.RateBurst, +) quotas.RequestRateLimiter { + rateLimiters := make(map[int]quotas.RequestRateLimiter) + for priority := range NamespaceReplicationInducingAPIPrioritiesOrdered { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + } + return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if priority, ok := NamespaceReplicationInducingAPIToPriority[req.API]; ok { + return priority + } + return NamespaceReplicationInducingAPIPrioritiesOrdered[len(NamespaceReplicationInducingAPIPrioritiesOrdered)-1] + }, rateLimiters) +} + func NewOtherAPIPriorityRateLimiter( rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { diff --git a/service/frontend/configs/quotas_test.go b/service/frontend/configs/quotas_test.go index 64520ef6859f..d48027cf2a51 100644 --- a/service/frontend/configs/quotas_test.go +++ b/service/frontend/configs/quotas_test.go @@ -73,6 +73,13 @@ func (s *quotasSuite) TestVisibilityAPIToPriorityMapping() { } } +func (s *quotasSuite) TestNamespaceReplicationInducingAPIToPriorityMapping() { + for _, priority := range NamespaceReplicationInducingAPIToPriority { + index := slices.Index(NamespaceReplicationInducingAPIPrioritiesOrdered, priority) + s.NotEqual(-1, index) + } +} + func (s *quotasSuite) TestOtherAPIToPriorityMapping() { for _, priority := range OtherAPIToPriority { index := slices.Index(OtherAPIPrioritiesOrdered, priority) @@ -92,6 +99,12 @@ func (s *quotasSuite) TestVisibilityAPIPrioritiesOrdered() { } } +func (s *quotasSuite) TestNamespaceReplicationInducingAPIPrioritiesOrdered() { + for idx := range NamespaceReplicationInducingAPIPrioritiesOrdered[1:] { + s.True(NamespaceReplicationInducingAPIPrioritiesOrdered[idx] < NamespaceReplicationInducingAPIPrioritiesOrdered[idx+1]) + } +} + func (s *quotasSuite) TestOtherAPIPrioritiesOrdered() { for idx := range OtherAPIPrioritiesOrdered[1:] { s.True(OtherAPIPrioritiesOrdered[idx] < OtherAPIPrioritiesOrdered[idx+1]) @@ -120,17 +133,16 @@ func (s *quotasSuite) TestExecutionAPIs() { "RespondActivityTaskCompletedById": {}, "RespondWorkflowTaskCompleted": {}, - "ResetWorkflowExecution": {}, - "DescribeWorkflowExecution": {}, - "RespondWorkflowTaskFailed": {}, - "QueryWorkflow": {}, - "RespondQueryTaskCompleted": {}, - "PollWorkflowTaskQueue": {}, - "PollActivityTaskQueue": {}, - "GetWorkerBuildIdCompatibility": {}, - "UpdateWorkerBuildIdCompatibility": {}, - "GetWorkerTaskReachability": {}, - "DeleteWorkflowExecution": {}, + "ResetWorkflowExecution": {}, + "DescribeWorkflowExecution": {}, + "RespondWorkflowTaskFailed": {}, + "QueryWorkflow": {}, + "RespondQueryTaskCompleted": {}, + "PollWorkflowTaskQueue": {}, + "PollActivityTaskQueue": {}, + "GetWorkerBuildIdCompatibility": {}, + "GetWorkerTaskReachability": {}, + "DeleteWorkflowExecution": {}, "ResetStickyTaskQueue": {}, "DescribeTaskQueue": {}, @@ -172,6 +184,24 @@ func (s *quotasSuite) TestVisibilityAPIs() { s.Equal(apiToPriority, VisibilityAPIToPriority) } +func (s *quotasSuite) TestNamespaceReplicationInducingAPIs() { + apis := map[string]struct{}{ + "UpdateWorkerBuildIdCompatibility": {}, + "UpdateNamespace": {}, + } + + var service workflowservice.WorkflowServiceServer + t := reflect.TypeOf(&service).Elem() + apiToPriority := make(map[string]int, t.NumMethod()) + for i := 0; i < t.NumMethod(); i++ { + apiName := t.Method(i).Name + if _, ok := apis[apiName]; ok { + apiToPriority[apiName] = NamespaceReplicationInducingAPIToPriority[apiName] + } + } + s.Equal(apiToPriority, NamespaceReplicationInducingAPIToPriority) +} + func (s *quotasSuite) TestOtherAPIs() { apis := map[string]struct{}{ "GetClusterInfo": {}, @@ -179,7 +209,6 @@ func (s *quotasSuite) TestOtherAPIs() { "GetSearchAttributes": {}, "RegisterNamespace": {}, - "UpdateNamespace": {}, "DescribeNamespace": {}, "ListNamespaces": {}, "DeprecateNamespace": {}, @@ -225,6 +254,9 @@ func (s *quotasSuite) TestAllAPIs() { for api := range VisibilityAPIToPriority { actualAPIs[api] = struct{}{} } + for api := range NamespaceReplicationInducingAPIToPriority { + actualAPIs[api] = struct{}{} + } for api := range OtherAPIToPriority { actualAPIs[api] = struct{}{} } diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 5b627bd95af1..ff4d8671baa9 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -288,10 +288,13 @@ func RateLimitInterceptorProvider( serviceConfig *Config, ) *interceptor.RateLimitInterceptor { rateFn := func() float64 { return float64(serviceConfig.RPS()) } + namespaceReplicationInducingRateFn := func() float64 { return float64(serviceConfig.NamespaceReplicationInducingAPIsRPS()) } + return interceptor.NewRateLimitInterceptor( configs.NewRequestToRateLimiter( quotas.NewDefaultIncomingRateLimiter(rateFn), quotas.NewDefaultIncomingRateLimiter(rateFn), + quotas.NewDefaultIncomingRateLimiter(namespaceReplicationInducingRateFn), quotas.NewDefaultIncomingRateLimiter(rateFn), ), map[string]int{}, @@ -304,15 +307,18 @@ func NamespaceRateLimitInterceptorProvider( namespaceRegistry namespace.Registry, frontendServiceResolver membership.ServiceResolver, ) *interceptor.NamespaceRateLimitInterceptor { - var globalNamespaceRPS, globalNamespaceVisibilityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter + var globalNamespaceRPS, globalNamespaceVisibilityRPS, globalNamespaceNamespaceReplicationInducingAPIsRPS dynamicconfig.IntPropertyFnWithNamespaceFilter switch serviceName { case primitives.FrontendService: globalNamespaceRPS = serviceConfig.GlobalNamespaceRPS globalNamespaceVisibilityRPS = serviceConfig.GlobalNamespaceVisibilityRPS + globalNamespaceNamespaceReplicationInducingAPIsRPS = serviceConfig.GlobalNamespaceNamespaceReplicationInducingAPIsRPS case primitives.InternalFrontendService: globalNamespaceRPS = serviceConfig.InternalFEGlobalNamespaceRPS globalNamespaceVisibilityRPS = serviceConfig.InternalFEGlobalNamespaceVisibilityRPS + // Internal frontend has no special limit for this set of APIs + globalNamespaceNamespaceReplicationInducingAPIsRPS = serviceConfig.InternalFEGlobalNamespaceRPS default: panic("invalid service name") } @@ -334,11 +340,20 @@ func NamespaceRateLimitInterceptorProvider( namespace, ) } + namespaceReplicationInducingRateFn := func(ns string) float64 { + return namespaceRPS( + serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance, + globalNamespaceNamespaceReplicationInducingAPIsRPS, + frontendServiceResolver, + ns, + ) + } namespaceRateLimiter := quotas.NewNamespaceRequestRateLimiter( func(req quotas.Request) quotas.RequestRateLimiter { return configs.NewRequestToRateLimiter( configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance), configs.NewNamespaceRateBurst(req.Caller, visibilityRateFn, serviceConfig.MaxNamespaceVisibilityBurstPerInstance), + configs.NewNamespaceRateBurst(req.Caller, namespaceReplicationInducingRateFn, serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance), configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance), ) }, diff --git a/service/frontend/service.go b/service/frontend/service.go index c520d914e855..b23927860026 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -69,24 +69,28 @@ type Config struct { VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter VisibilityEnableManualPagination dynamicconfig.BoolPropertyFnWithNamespaceFilter - HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter - RPS dynamicconfig.IntPropertyFn - MaxNamespaceRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter - MaxNamespaceBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter - MaxNamespaceCountPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter - MaxNamespaceVisibilityRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter - MaxNamespaceVisibilityBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter - GlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter - InternalFEGlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter - GlobalNamespaceVisibilityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter - InternalFEGlobalNamespaceVisibilityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter - MaxIDLengthLimit dynamicconfig.IntPropertyFn - WorkerBuildIdSizeLimit dynamicconfig.IntPropertyFn - ReachabilityTaskQueueScanLimit dynamicconfig.IntPropertyFn - ReachabilityQueryBuildIdLimit dynamicconfig.IntPropertyFn - DisallowQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter - ShutdownDrainDuration dynamicconfig.DurationPropertyFn - ShutdownFailHealthCheckDuration dynamicconfig.DurationPropertyFn + HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter + RPS dynamicconfig.IntPropertyFn + NamespaceReplicationInducingAPIsRPS dynamicconfig.IntPropertyFn + MaxNamespaceRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxNamespaceBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxNamespaceCountPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxNamespaceVisibilityRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxNamespaceVisibilityBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + GlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter + InternalFEGlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter + GlobalNamespaceVisibilityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter + InternalFEGlobalNamespaceVisibilityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter + GlobalNamespaceNamespaceReplicationInducingAPIsRPS dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxIDLengthLimit dynamicconfig.IntPropertyFn + WorkerBuildIdSizeLimit dynamicconfig.IntPropertyFn + ReachabilityTaskQueueScanLimit dynamicconfig.IntPropertyFn + ReachabilityQueryBuildIdLimit dynamicconfig.IntPropertyFn + DisallowQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter + ShutdownDrainDuration dynamicconfig.DurationPropertyFn + ShutdownFailHealthCheckDuration dynamicconfig.DurationPropertyFn MaxBadBinaries dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -198,17 +202,24 @@ func NewConfig( VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, true), VisibilityEnableManualPagination: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityEnableManualPagination, true), - HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), - RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 2400), - MaxNamespaceRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceRPSPerInstance, 2400), - MaxNamespaceBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceBurstPerInstance, 4800), - MaxNamespaceCountPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceCountPerInstance, 1200), - MaxNamespaceVisibilityRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance, 10), - MaxNamespaceVisibilityBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceVisibilityBurstPerInstance, 10), + HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), + RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 2400), + NamespaceReplicationInducingAPIsRPS: dc.GetIntProperty(dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS, 20), + + MaxNamespaceRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceRPSPerInstance, 2400), + MaxNamespaceBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceBurstPerInstance, 4800), + MaxNamespaceCountPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceCountPerInstance, 1200), + MaxNamespaceVisibilityRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance, 10), + MaxNamespaceVisibilityBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceVisibilityBurstPerInstance, 10), + MaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance, 1), + MaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance, 10), + GlobalNamespaceRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendGlobalNamespaceRPS, 0), InternalFEGlobalNamespaceRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.InternalFrontendGlobalNamespaceRPS, 0), GlobalNamespaceVisibilityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendGlobalNamespaceVisibilityRPS, 0), InternalFEGlobalNamespaceVisibilityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.InternalFrontendGlobalNamespaceVisibilityRPS, 0), + // Overshoot since these low rate limits don't work well in an uncoordinated global limiter. + GlobalNamespaceNamespaceReplicationInducingAPIsRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS, 10), MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000), WorkerBuildIdSizeLimit: dc.GetIntProperty(dynamicconfig.WorkerBuildIdSizeLimit, 255), ReachabilityTaskQueueScanLimit: dc.GetIntProperty(dynamicconfig.ReachabilityTaskQueueScanLimit, 20), diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index 028cf7bad03b..df30b8046ba3 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -456,8 +456,8 @@ func lookupVersionSetForAdd(data *persistencespb.VersioningData, buildId string) // not, then we'll guess wrong, but when we get the replication event, we'll merge // the sets and use both ids. // TODO: add metric and log to make this situation visible - guessedSetId := hashBuildId(buildId) - return guessedSetId, nil + // guessedSetId := hashBuildId(buildId) + // return guessedSetId, nil } set = data.VersionSets[setIdx] } diff --git a/tests/versioning_test.go b/tests/versioning_test.go index 2d454743d915..de9519774fe1 100644 --- a/tests/versioning_test.go +++ b/tests/versioning_test.go @@ -75,6 +75,11 @@ func (s *versioningIntegSuite) SetupSuite() { dynamicconfig.MatchingForwarderMaxChildrenPerNode: partitionTreeDegree, dynamicconfig.TaskQueuesPerBuildIdLimit: 3, + // Make sure we don't hit the rate limiter in tests + dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance: 1000, + dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance: 1000, + dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS: 1000, + // The dispatch tests below rely on being able to see the effects of changing // versioning data relatively quickly. In general we only promise to act on new // versioning data "soon", i.e. after a long poll interval. We can reduce the long poll diff --git a/tests/xdc/user_data_replication_test.go b/tests/xdc/user_data_replication_test.go index 8a1e89ba0d67..a76133f08f8a 100644 --- a/tests/xdc/user_data_replication_test.go +++ b/tests/xdc/user_data_replication_test.go @@ -74,9 +74,13 @@ func TestUserDataReplicationTestSuite(t *testing.T) { func (s *userDataReplicationTestSuite) SetupSuite() { s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{ - dynamicconfig.FrontendEnableWorkerVersioningDataAPIs: true, - dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs: true, - dynamicconfig.BuildIdScavengerEnabled: true, + // Make sure we don't hit the rate limiter in tests + dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance: 1000, + dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance: 1000, + dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS: 1000, + dynamicconfig.FrontendEnableWorkerVersioningDataAPIs: true, + dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs: true, + dynamicconfig.BuildIdScavengerEnabled: true, } s.setupSuite([]string{"task_queue_repl_active", "task_queue_repl_standby"}) }