Add special rate limiter for namespace replication inducing APIs #4455

Merged
19 changes: 19 additions & 0 deletions common/dynamicconfig/constants.go
@@ -217,6 +217,10 @@ const (
     FrontendHistoryMaxPageSize = "frontend.historyMaxPageSize"
     // FrontendRPS is workflow rate limit per second
     FrontendRPS = "frontend.rps"
+    // FrontendNamespaceReplicationInducingAPIsRPS limits the per second request rate for namespace replication inducing
+    // APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
+    // This config is EXPERIMENTAL and may be changed or removed in a later release.
+    FrontendNamespaceReplicationInducingAPIsRPS = "frontend.rps.namespaceReplicationInducingAPIs"

Member:
This doesn't seem that useful. The point is that the replication queue is a global resource, so we really want one "global" total limit (automatically divided across frontends) and one global per-namespace limit that's set lower (to prevent any one namespace from hogging the entire global limit).

Member Author:
Agreed, I'll remove this.

Member:
But now there's no overall limit, only per-namespace limits. We need both if we want this to be effective.

(The naming is bad: in this context "global" means "across all frontends", not "across all namespaces". We want an "overall global" limit, i.e. one that applies across all namespaces and across all frontends.)

     // FrontendMaxNamespaceRPSPerInstance is workflow namespace rate limit per second
     FrontendMaxNamespaceRPSPerInstance = "frontend.namespaceRPS"
     // FrontendMaxNamespaceBurstPerInstance is workflow namespace burst limit
@@ -226,9 +230,17 @@ const (
     // FrontendMaxNamespaceVisibilityRPSPerInstance is namespace rate limit per second for visibility APIs.
     // This config is EXPERIMENTAL and may be changed or removed in a later release.
     FrontendMaxNamespaceVisibilityRPSPerInstance = "frontend.namespaceRPS.visibility"
+    // FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance is a per host/per namespace RPS limit for
+    // namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
+    // This config is EXPERIMENTAL and may be changed or removed in a later release.
+    FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance = "frontend.namespaceRPS.namespaceReplicationInducingAPIs"

Member:
I think the overall plan is to deprecate the non-global keys and keep only the global ones, since they're easier to configure: you don't have to think about how many instances you have, because the rate is divided among instances as the cluster scales. So for new keys, we could just add a global one.

Member Author:
Sure, I thought the global one was still experimental.
Let's leave it for now and remove it once global becomes the default.

Member:
Or we could do it now and have less work to do later? There's no backwards compatibility to worry about.
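
To make the per-instance versus global distinction in this thread concrete, here is a rough sketch of how a global limit might translate into a per-instance rate. `effectivePerInstanceRPS` is a hypothetical helper, and treating a global value of 0 as "unset" is an assumption made for the example; the doc comments in the diff only say that a global limit is evenly distributed among frontend instances and overrides the per-instance key when set.

```go
package main

import "fmt"

// effectivePerInstanceRPS returns the rate a single frontend should enforce.
// Assumption: globalRPS <= 0 means the global key is not set.
func effectivePerInstanceRPS(perInstanceRPS, globalRPS float64, instances int) float64 {
	if globalRPS > 0 && instances > 0 {
		// The global limit is split evenly across the available instances.
		return globalRPS / float64(instances)
	}
	// Fall back to the statically configured per-instance limit.
	return perInstanceRPS
}

func main() {
	fmt.Println(effectivePerInstanceRPS(10, 0, 4))   // global unset
	fmt.Println(effectivePerInstanceRPS(10, 100, 4)) // global split across 4 instances
	fmt.Println(effectivePerInstanceRPS(10, 100, 5)) // same global limit after scaling out
}
```

With a global key the operator sets one number and the per-instance share shrinks automatically as instances are added, which is the configuration convenience the reviewer describes.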

     // FrontendMaxNamespaceVisibilityBurstPerInstance is namespace burst limit for visibility APIs.
     // This config is EXPERIMENTAL and may be changed or removed in a later release.
     FrontendMaxNamespaceVisibilityBurstPerInstance = "frontend.namespaceBurst.visibility"
+    // FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance is a per host/per namespace burst limit for
+    // namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
+    // This config is EXPERIMENTAL and may be changed or removed in a later release.
+    FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance = "frontend.namespaceBurst.namespaceReplicationInducingAPIs"
     // FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster.
     // The limit is evenly distributed among available frontend service instances.
     // If this is set, it overwrites per instance limit "frontend.namespaceRPS".
@@ -241,6 +253,13 @@ const (
     // If this is set, it overwrites per instance limit "frontend.namespaceRPS.visibility".
     // This config is EXPERIMENTAL and may be changed or removed in a later release.
     FrontendGlobalNamespaceVisibilityRPS = "frontend.globalNamespaceRPS.visibility"
+    // FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS is a cluster global, per namespace RPS limit for
+    // namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
+    // The limit is evenly distributed among available frontend service instances.
+    // If this is set, it overwrites the per instance limit configured with
+    // "frontend.namespaceRPS.namespaceReplicationInducingAPIs".
+    // This config is EXPERIMENTAL and may be changed or removed in a later release.
+    FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS = "frontend.globalNamespaceRPS.namespaceReplicationInducingAPIs"
     // InternalFrontendGlobalNamespaceVisibilityRPS is workflow namespace rate limit per second
     // across all internal-frontends.
     // This config is EXPERIMENTAL and may be changed or removed in a later release.
32 changes: 30 additions & 2 deletions service/frontend/configs/quotas.go
@@ -72,7 +72,6 @@ var (
         "PollActivityTaskQueue": 2,
         "GetWorkflowExecutionHistoryReverse": 2,
         "GetWorkerBuildIdCompatibility": 2,
-        "UpdateWorkerBuildIdCompatibility": 2,
         "GetWorkerTaskReachability": 2,
         "DeleteWorkflowExecution": 2,
 
@@ -95,13 +94,22 @@ var (
 
     VisibilityAPIPrioritiesOrdered = []int{0}
 
+    // Special rate limiting for APIs that may insert replication tasks into a namespace replication queue.
+    // The replication queue is used to propagate critical failover messages and this mapping prevents flooding the
+    // queue and delaying failover.
+    NamespaceReplicationInducingAPIToPriority = map[string]int{
+        "UpdateNamespace": 0,

Member:
What about RegisterNamespace?

Member Author:
We don't care about that one: it's called once per namespace, so it can't flood the per-namespace replication queue.

Member:
? There is no per-namespace queue; there's only one queue.

Member Author:
Yes, I was wrong.

+        "UpdateWorkerBuildIdCompatibility": 1,
+    }
+
+    NamespaceReplicationInducingAPIPrioritiesOrdered = []int{0, 1}
+
     OtherAPIToPriority = map[string]int{
         "GetClusterInfo": 0,
         "GetSystemInfo": 0,
         "GetSearchAttributes": 0,
 
         "RegisterNamespace": 0,
-        "UpdateNamespace": 0,
         "DescribeNamespace": 0,
         "ListNamespaces": 0,
         "DeprecateNamespace": 0,
@@ -157,12 +165,14 @@ func (c *NamespaceRateBurstImpl) Burst() int {
 func NewRequestToRateLimiter(
     executionRateBurstFn quotas.RateBurst,
     visibilityRateBurstFn quotas.RateBurst,
+    namespaceReplicationInducingRateBurstFn quotas.RateBurst,
     otherRateBurstFn quotas.RateBurst,
 ) quotas.RequestRateLimiter {
     mapping := make(map[string]quotas.RequestRateLimiter)
 
     executionRateLimiter := NewExecutionPriorityRateLimiter(executionRateBurstFn)
     visibilityRateLimiter := NewVisibilityPriorityRateLimiter(visibilityRateBurstFn)
+    namespaceReplicationInducingRateLimiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(namespaceReplicationInducingRateBurstFn)
     otherRateLimiter := NewOtherAPIPriorityRateLimiter(otherRateBurstFn)
 
     for api := range ExecutionAPIToPriority {
@@ -171,6 +181,9 @@ func NewRequestToRateLimiter(
     for api := range VisibilityAPIToPriority {
         mapping[api] = visibilityRateLimiter
     }
+    for api := range NamespaceReplicationInducingAPIToPriority {
+        mapping[api] = namespaceReplicationInducingRateLimiter
+    }
     for api := range OtherAPIToPriority {
         mapping[api] = otherRateLimiter
     }
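
Because the routing table above is built by assigning into a single map, an API listed in two priority maps would silently end up with whichever limiter is assigned last. That is presumably why this change removes `UpdateNamespace` and `UpdateWorkerBuildIdCompatibility` from their previous maps. The sketch below shows one way to make such overlaps explicit; `group` and `buildRouting` are hypothetical names and plain strings stand in for the limiter objects.

```go
package main

import "fmt"

// group pairs a limiter name with the APIs routed to it.
type group struct {
	limiter string
	apis    []string
}

// buildRouting maps each API to exactly one limiter and reports an error
// if an API is listed in more than one group.
func buildRouting(groups []group) (map[string]string, error) {
	routing := make(map[string]string)
	for _, g := range groups {
		for _, api := range g.apis {
			if prev, ok := routing[api]; ok {
				return nil, fmt.Errorf("%s is in both %s and %s", api, prev, g.limiter)
			}
			routing[api] = g.limiter
		}
	}
	return routing, nil
}

func main() {
	routing, err := buildRouting([]group{
		{"namespaceReplicationInducing", []string{"UpdateNamespace", "UpdateWorkerBuildIdCompatibility"}},
		{"other", []string{"RegisterNamespace", "DescribeNamespace"}},
	})
	fmt.Println(routing["UpdateNamespace"], err)

	// Listing UpdateNamespace twice is reported instead of silently overwritten.
	_, err = buildRouting([]group{
		{"namespaceReplicationInducing", []string{"UpdateNamespace"}},
		{"other", []string{"UpdateNamespace"}},
	})
	fmt.Println(err)
}
```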
@@ -208,6 +221,21 @@ func NewVisibilityPriorityRateLimiter(
     }, rateLimiters)
 }
 
+func NewNamespaceReplicationInducingAPIPriorityRateLimiter(
+    rateBurstFn quotas.RateBurst,
+) quotas.RequestRateLimiter {
+    rateLimiters := make(map[int]quotas.RequestRateLimiter)
+    for priority := range NamespaceReplicationInducingAPIPrioritiesOrdered {
+        rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute))
+    }
+    return quotas.NewPriorityRateLimiter(func(req quotas.Request) int {
+        if priority, ok := NamespaceReplicationInducingAPIToPriority[req.API]; ok {
+            return priority
+        }
+        return NamespaceReplicationInducingAPIPrioritiesOrdered[len(NamespaceReplicationInducingAPIPrioritiesOrdered)-1]
+    }, rateLimiters)
+}
+
 func NewOtherAPIPriorityRateLimiter(
     rateBurstFn quotas.RateBurst,
 ) quotas.RequestRateLimiter {
56 changes: 44 additions & 12 deletions service/frontend/configs/quotas_test.go
@@ -73,6 +73,13 @@ func (s *quotasSuite) TestVisibilityAPIToPriorityMapping() {
     }
 }
 
+func (s *quotasSuite) TestNamespaceReplicationInducingAPIToPriorityMapping() {
+    for _, priority := range NamespaceReplicationInducingAPIToPriority {
+        index := slices.Index(NamespaceReplicationInducingAPIPrioritiesOrdered, priority)
+        s.NotEqual(-1, index)
+    }
+}
+
 func (s *quotasSuite) TestOtherAPIToPriorityMapping() {
     for _, priority := range OtherAPIToPriority {
         index := slices.Index(OtherAPIPrioritiesOrdered, priority)
@@ -92,6 +99,12 @@ func (s *quotasSuite) TestVisibilityAPIPrioritiesOrdered() {
     }
 }
 
+func (s *quotasSuite) TestNamespaceReplicationInducingAPIPrioritiesOrdered() {
+    for idx := range NamespaceReplicationInducingAPIPrioritiesOrdered[1:] {
+        s.True(NamespaceReplicationInducingAPIPrioritiesOrdered[idx] < NamespaceReplicationInducingAPIPrioritiesOrdered[idx+1])
+    }
+}
+
 func (s *quotasSuite) TestOtherAPIPrioritiesOrdered() {
     for idx := range OtherAPIPrioritiesOrdered[1:] {
         s.True(OtherAPIPrioritiesOrdered[idx] < OtherAPIPrioritiesOrdered[idx+1])
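
The ordering tests above iterate over `s[1:]` so that `idx` runs from 0 to `len(s)-2`, which lets each element be compared with its successor without an explicit bound. A stand-alone version of the same check might look like the sketch below; `strictlyIncreasing` is a hypothetical helper, and the length guard is an addition for the example, since slicing an empty slice with `[1:]` panics.

```go
package main

import "fmt"

// strictlyIncreasing reports whether every element is smaller than the next.
func strictlyIncreasing(s []int) bool {
	if len(s) < 2 {
		return true // nothing to compare; also avoids s[1:] on an empty slice
	}
	for idx := range s[1:] {
		// idx indexes into s[1:], so s[idx] and s[idx+1] are neighbours in s.
		if s[idx] >= s[idx+1] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(strictlyIncreasing([]int{0, 1}))
	fmt.Println(strictlyIncreasing([]int{0, 0}))
	fmt.Println(strictlyIncreasing([]int{0, 2, 1}))
}
```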
@@ -120,17 +133,16 @@ func (s *quotasSuite) TestExecutionAPIs() {
         "RespondActivityTaskCompletedById": {},
         "RespondWorkflowTaskCompleted": {},
 
-        "ResetWorkflowExecution":           {},
-        "DescribeWorkflowExecution":        {},
-        "RespondWorkflowTaskFailed":        {},
-        "QueryWorkflow":                    {},
-        "RespondQueryTaskCompleted":        {},
-        "PollWorkflowTaskQueue":            {},
-        "PollActivityTaskQueue":            {},
-        "GetWorkerBuildIdCompatibility":    {},
-        "UpdateWorkerBuildIdCompatibility": {},
-        "GetWorkerTaskReachability":        {},
-        "DeleteWorkflowExecution":          {},
+        "ResetWorkflowExecution":        {},
+        "DescribeWorkflowExecution":     {},
+        "RespondWorkflowTaskFailed":     {},
+        "QueryWorkflow":                 {},
+        "RespondQueryTaskCompleted":     {},
+        "PollWorkflowTaskQueue":         {},
+        "PollActivityTaskQueue":         {},
+        "GetWorkerBuildIdCompatibility": {},
+        "GetWorkerTaskReachability":     {},
+        "DeleteWorkflowExecution":       {},
 
         "ResetStickyTaskQueue": {},
         "DescribeTaskQueue": {},
@@ -172,14 +184,31 @@ func (s *quotasSuite) TestVisibilityAPIs() {
     s.Equal(apiToPriority, VisibilityAPIToPriority)
 }
 
+func (s *quotasSuite) TestNamespaceReplicationInducingAPIs() {
+    apis := map[string]struct{}{
+        "UpdateWorkerBuildIdCompatibility": {},
+        "UpdateNamespace": {},
+    }
+
+    var service workflowservice.WorkflowServiceServer
+    t := reflect.TypeOf(&service).Elem()
+    apiToPriority := make(map[string]int, t.NumMethod())
+    for i := 0; i < t.NumMethod(); i++ {
+        apiName := t.Method(i).Name
+        if _, ok := apis[apiName]; ok {
+            apiToPriority[apiName] = NamespaceReplicationInducingAPIToPriority[apiName]
+        }
+    }
+    s.Equal(apiToPriority, NamespaceReplicationInducingAPIToPriority)
+}
+
 func (s *quotasSuite) TestOtherAPIs() {
     apis := map[string]struct{}{
         "GetClusterInfo": {},
         "GetSystemInfo": {},
         "GetSearchAttributes": {},
 
         "RegisterNamespace": {},
-        "UpdateNamespace": {},
         "DescribeNamespace": {},
         "ListNamespaces": {},
         "DeprecateNamespace": {},
@@ -225,6 +254,9 @@ func (s *quotasSuite) TestAllAPIs() {
     for api := range VisibilityAPIToPriority {
         actualAPIs[api] = struct{}{}
     }
+    for api := range NamespaceReplicationInducingAPIToPriority {
+        actualAPIs[api] = struct{}{}
+    }
     for api := range OtherAPIToPriority {
         actualAPIs[api] = struct{}{}
     }
17 changes: 16 additions & 1 deletion service/frontend/fx.go
@@ -288,10 +288,13 @@ func RateLimitInterceptorProvider(
     serviceConfig *Config,
 ) *interceptor.RateLimitInterceptor {
     rateFn := func() float64 { return float64(serviceConfig.RPS()) }
+    namespaceReplicationInducingRateFn := func() float64 { return float64(serviceConfig.NamespaceReplicationInducingAPIsRPS()) }
+
     return interceptor.NewRateLimitInterceptor(
         configs.NewRequestToRateLimiter(
             quotas.NewDefaultIncomingRateLimiter(rateFn),
             quotas.NewDefaultIncomingRateLimiter(rateFn),
+            quotas.NewDefaultIncomingRateLimiter(namespaceReplicationInducingRateFn),
             quotas.NewDefaultIncomingRateLimiter(rateFn),
         ),
         map[string]int{},
@@ -304,15 +307,18 @@ func NamespaceRateLimitInterceptorProvider(
     namespaceRegistry namespace.Registry,
     frontendServiceResolver membership.ServiceResolver,
 ) *interceptor.NamespaceRateLimitInterceptor {
-    var globalNamespaceRPS, globalNamespaceVisibilityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
+    var globalNamespaceRPS, globalNamespaceVisibilityRPS, globalNamespaceNamespaceReplicationInducingAPIsRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
 
     switch serviceName {
     case primitives.FrontendService:
         globalNamespaceRPS = serviceConfig.GlobalNamespaceRPS
         globalNamespaceVisibilityRPS = serviceConfig.GlobalNamespaceVisibilityRPS
+        globalNamespaceNamespaceReplicationInducingAPIsRPS = serviceConfig.GlobalNamespaceNamespaceReplicationInducingAPIsRPS
     case primitives.InternalFrontendService:
         globalNamespaceRPS = serviceConfig.InternalFEGlobalNamespaceRPS
         globalNamespaceVisibilityRPS = serviceConfig.InternalFEGlobalNamespaceVisibilityRPS
+        // Internal frontend has no special limit for this set of APIs
+        globalNamespaceNamespaceReplicationInducingAPIsRPS = serviceConfig.InternalFEGlobalNamespaceRPS
     default:
         panic("invalid service name")
     }
@@ -334,11 +340,20 @@ func NamespaceRateLimitInterceptorProvider(
             namespace,
         )
     }
+    namespaceReplicationInducingRateFn := func(ns string) float64 {
+        return namespaceRPS(
+            serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance,
+            globalNamespaceNamespaceReplicationInducingAPIsRPS,
+            frontendServiceResolver,
+            ns,
+        )
+    }
     namespaceRateLimiter := quotas.NewNamespaceRequestRateLimiter(
         func(req quotas.Request) quotas.RequestRateLimiter {
             return configs.NewRequestToRateLimiter(
                 configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance),
                 configs.NewNamespaceRateBurst(req.Caller, visibilityRateFn, serviceConfig.MaxNamespaceVisibilityBurstPerInstance),
+                configs.NewNamespaceRateBurst(req.Caller, namespaceReplicationInducingRateFn, serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance),
                 configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance),
             )
         },