Skip to content

Commit

Permalink
Add special rate limiter for namespace replication inducing APIs (#4455)
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored and deepakkarki committed Jun 14, 2023
1 parent 65300d5 commit eebed31
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 45 deletions.
19 changes: 19 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ const (
FrontendHistoryMaxPageSize = "frontend.historyMaxPageSize"
// FrontendRPS is workflow rate limit per second
FrontendRPS = "frontend.rps"
// FrontendNamespaceReplicationInducingAPIsRPS limits the per second request rate for namespace replication inducing
// APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendNamespaceReplicationInducingAPIsRPS = "frontend.rps.namespaceReplicationInducingAPIs"
// FrontendMaxNamespaceRPSPerInstance is workflow namespace rate limit per second
FrontendMaxNamespaceRPSPerInstance = "frontend.namespaceRPS"
// FrontendMaxNamespaceBurstPerInstance is workflow namespace burst limit
Expand All @@ -226,9 +230,17 @@ const (
// FrontendMaxNamespaceVisibilityRPSPerInstance is namespace rate limit per second for visibility APIs.
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceVisibilityRPSPerInstance = "frontend.namespaceRPS.visibility"
// FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance is a per host/per namespace RPS limit for
// namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance = "frontend.namespaceRPS.namespaceReplicationInducingAPIs"
// FrontendMaxNamespaceVisibilityBurstPerInstance is namespace burst limit for visibility APIs.
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceVisibilityBurstPerInstance = "frontend.namespaceBurst.visibility"
// FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance is a per host/per namespace burst limit for
// namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance = "frontend.namespaceBurst.namespaceReplicationInducingAPIs"
// FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster.
// The limit is evenly distributed among available frontend service instances.
// If this is set, it overwrites per instance limit "frontend.namespaceRPS".
Expand All @@ -241,6 +253,13 @@ const (
// If this is set, it overwrites per instance limit "frontend.namespaceRPS.visibility".
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendGlobalNamespaceVisibilityRPS = "frontend.globalNamespaceRPS.visibility"
// FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS is a cluster global, per namespace RPS limit for
// namespace replication inducing APIs (e.g. UpdateNamespace, UpdateWorkerBuildIdCompatibility).
// The limit is evenly distributed among available frontend service instances.
// If this is set, it overwrites the per instance limit configured with
// "frontend.namespaceRPS.namespaceReplicationInducingAPIs".
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS = "frontend.globalNamespaceRPS.namespaceReplicationInducingAPIs"
// InternalFrontendGlobalNamespaceVisibilityRPS is workflow namespace rate limit per second
// across all internal-frontends.
// This config is EXPERIMENTAL and may be changed or removed in a later release.
Expand Down
32 changes: 30 additions & 2 deletions service/frontend/configs/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ var (
"PollActivityTaskQueue": 2,
"GetWorkflowExecutionHistoryReverse": 2,
"GetWorkerBuildIdCompatibility": 2,
"UpdateWorkerBuildIdCompatibility": 2,
"GetWorkerTaskReachability": 2,
"DeleteWorkflowExecution": 2,

Expand All @@ -95,13 +94,22 @@ var (

VisibilityAPIPrioritiesOrdered = []int{0}

// Special rate limiting for APIs that may insert replication tasks into a namespace replication queue.
// The replication queue is used to propagate critical failover messages and this mapping prevents flooding the
// queue and delaying failover.
NamespaceReplicationInducingAPIToPriority = map[string]int{
"UpdateNamespace": 0,
"UpdateWorkerBuildIdCompatibility": 1,
}

NamespaceReplicationInducingAPIPrioritiesOrdered = []int{0, 1}

OtherAPIToPriority = map[string]int{
"GetClusterInfo": 0,
"GetSystemInfo": 0,
"GetSearchAttributes": 0,

"RegisterNamespace": 0,
"UpdateNamespace": 0,
"DescribeNamespace": 0,
"ListNamespaces": 0,
"DeprecateNamespace": 0,
Expand Down Expand Up @@ -157,12 +165,14 @@ func (c *NamespaceRateBurstImpl) Burst() int {
func NewRequestToRateLimiter(
executionRateBurstFn quotas.RateBurst,
visibilityRateBurstFn quotas.RateBurst,
namespaceReplicationInducingRateBurstFn quotas.RateBurst,
otherRateBurstFn quotas.RateBurst,
) quotas.RequestRateLimiter {
mapping := make(map[string]quotas.RequestRateLimiter)

executionRateLimiter := NewExecutionPriorityRateLimiter(executionRateBurstFn)
visibilityRateLimiter := NewVisibilityPriorityRateLimiter(visibilityRateBurstFn)
namespaceReplicationInducingRateLimiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(namespaceReplicationInducingRateBurstFn)
otherRateLimiter := NewOtherAPIPriorityRateLimiter(otherRateBurstFn)

for api := range ExecutionAPIToPriority {
Expand All @@ -171,6 +181,9 @@ func NewRequestToRateLimiter(
for api := range VisibilityAPIToPriority {
mapping[api] = visibilityRateLimiter
}
for api := range NamespaceReplicationInducingAPIToPriority {
mapping[api] = namespaceReplicationInducingRateLimiter
}
for api := range OtherAPIToPriority {
mapping[api] = otherRateLimiter
}
Expand Down Expand Up @@ -208,6 +221,21 @@ func NewVisibilityPriorityRateLimiter(
}, rateLimiters)
}

func NewNamespaceReplicationInducingAPIPriorityRateLimiter(
rateBurstFn quotas.RateBurst,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range NamespaceReplicationInducingAPIPrioritiesOrdered {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute))
}
return quotas.NewPriorityRateLimiter(func(req quotas.Request) int {
if priority, ok := NamespaceReplicationInducingAPIToPriority[req.API]; ok {
return priority
}
return NamespaceReplicationInducingAPIPrioritiesOrdered[len(NamespaceReplicationInducingAPIPrioritiesOrdered)-1]
}, rateLimiters)
}

func NewOtherAPIPriorityRateLimiter(
rateBurstFn quotas.RateBurst,
) quotas.RequestRateLimiter {
Expand Down
56 changes: 44 additions & 12 deletions service/frontend/configs/quotas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func (s *quotasSuite) TestVisibilityAPIToPriorityMapping() {
}
}

func (s *quotasSuite) TestNamespaceReplicationInducingAPIToPriorityMapping() {
for _, priority := range NamespaceReplicationInducingAPIToPriority {
index := slices.Index(NamespaceReplicationInducingAPIPrioritiesOrdered, priority)
s.NotEqual(-1, index)
}
}

func (s *quotasSuite) TestOtherAPIToPriorityMapping() {
for _, priority := range OtherAPIToPriority {
index := slices.Index(OtherAPIPrioritiesOrdered, priority)
Expand All @@ -92,6 +99,12 @@ func (s *quotasSuite) TestVisibilityAPIPrioritiesOrdered() {
}
}

func (s *quotasSuite) TestNamespaceReplicationInducingAPIPrioritiesOrdered() {
for idx := range NamespaceReplicationInducingAPIPrioritiesOrdered[1:] {
s.True(NamespaceReplicationInducingAPIPrioritiesOrdered[idx] < NamespaceReplicationInducingAPIPrioritiesOrdered[idx+1])
}
}

func (s *quotasSuite) TestOtherAPIPrioritiesOrdered() {
for idx := range OtherAPIPrioritiesOrdered[1:] {
s.True(OtherAPIPrioritiesOrdered[idx] < OtherAPIPrioritiesOrdered[idx+1])
Expand Down Expand Up @@ -120,17 +133,16 @@ func (s *quotasSuite) TestExecutionAPIs() {
"RespondActivityTaskCompletedById": {},
"RespondWorkflowTaskCompleted": {},

"ResetWorkflowExecution": {},
"DescribeWorkflowExecution": {},
"RespondWorkflowTaskFailed": {},
"QueryWorkflow": {},
"RespondQueryTaskCompleted": {},
"PollWorkflowTaskQueue": {},
"PollActivityTaskQueue": {},
"GetWorkerBuildIdCompatibility": {},
"UpdateWorkerBuildIdCompatibility": {},
"GetWorkerTaskReachability": {},
"DeleteWorkflowExecution": {},
"ResetWorkflowExecution": {},
"DescribeWorkflowExecution": {},
"RespondWorkflowTaskFailed": {},
"QueryWorkflow": {},
"RespondQueryTaskCompleted": {},
"PollWorkflowTaskQueue": {},
"PollActivityTaskQueue": {},
"GetWorkerBuildIdCompatibility": {},
"GetWorkerTaskReachability": {},
"DeleteWorkflowExecution": {},

"ResetStickyTaskQueue": {},
"DescribeTaskQueue": {},
Expand Down Expand Up @@ -172,14 +184,31 @@ func (s *quotasSuite) TestVisibilityAPIs() {
s.Equal(apiToPriority, VisibilityAPIToPriority)
}

func (s *quotasSuite) TestNamespaceReplicationInducingAPIs() {
apis := map[string]struct{}{
"UpdateWorkerBuildIdCompatibility": {},
"UpdateNamespace": {},
}

var service workflowservice.WorkflowServiceServer
t := reflect.TypeOf(&service).Elem()
apiToPriority := make(map[string]int, t.NumMethod())
for i := 0; i < t.NumMethod(); i++ {
apiName := t.Method(i).Name
if _, ok := apis[apiName]; ok {
apiToPriority[apiName] = NamespaceReplicationInducingAPIToPriority[apiName]
}
}
s.Equal(apiToPriority, NamespaceReplicationInducingAPIToPriority)
}

func (s *quotasSuite) TestOtherAPIs() {
apis := map[string]struct{}{
"GetClusterInfo": {},
"GetSystemInfo": {},
"GetSearchAttributes": {},

"RegisterNamespace": {},
"UpdateNamespace": {},
"DescribeNamespace": {},
"ListNamespaces": {},
"DeprecateNamespace": {},
Expand Down Expand Up @@ -225,6 +254,9 @@ func (s *quotasSuite) TestAllAPIs() {
for api := range VisibilityAPIToPriority {
actualAPIs[api] = struct{}{}
}
for api := range NamespaceReplicationInducingAPIToPriority {
actualAPIs[api] = struct{}{}
}
for api := range OtherAPIToPriority {
actualAPIs[api] = struct{}{}
}
Expand Down
17 changes: 16 additions & 1 deletion service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,13 @@ func RateLimitInterceptorProvider(
serviceConfig *Config,
) *interceptor.RateLimitInterceptor {
rateFn := func() float64 { return float64(serviceConfig.RPS()) }
namespaceReplicationInducingRateFn := func() float64 { return float64(serviceConfig.NamespaceReplicationInducingAPIsRPS()) }

return interceptor.NewRateLimitInterceptor(
configs.NewRequestToRateLimiter(
quotas.NewDefaultIncomingRateLimiter(rateFn),
quotas.NewDefaultIncomingRateLimiter(rateFn),
quotas.NewDefaultIncomingRateLimiter(namespaceReplicationInducingRateFn),
quotas.NewDefaultIncomingRateLimiter(rateFn),
),
map[string]int{},
Expand All @@ -304,15 +307,18 @@ func NamespaceRateLimitInterceptorProvider(
namespaceRegistry namespace.Registry,
frontendServiceResolver membership.ServiceResolver,
) *interceptor.NamespaceRateLimitInterceptor {
var globalNamespaceRPS, globalNamespaceVisibilityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
var globalNamespaceRPS, globalNamespaceVisibilityRPS, globalNamespaceNamespaceReplicationInducingAPIsRPS dynamicconfig.IntPropertyFnWithNamespaceFilter

switch serviceName {
case primitives.FrontendService:
globalNamespaceRPS = serviceConfig.GlobalNamespaceRPS
globalNamespaceVisibilityRPS = serviceConfig.GlobalNamespaceVisibilityRPS
globalNamespaceNamespaceReplicationInducingAPIsRPS = serviceConfig.GlobalNamespaceNamespaceReplicationInducingAPIsRPS
case primitives.InternalFrontendService:
globalNamespaceRPS = serviceConfig.InternalFEGlobalNamespaceRPS
globalNamespaceVisibilityRPS = serviceConfig.InternalFEGlobalNamespaceVisibilityRPS
// Internal frontend has no special limit for this set of APIs
globalNamespaceNamespaceReplicationInducingAPIsRPS = serviceConfig.InternalFEGlobalNamespaceRPS
default:
panic("invalid service name")
}
Expand All @@ -334,11 +340,20 @@ func NamespaceRateLimitInterceptorProvider(
namespace,
)
}
namespaceReplicationInducingRateFn := func(ns string) float64 {
return namespaceRPS(
serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance,
globalNamespaceNamespaceReplicationInducingAPIsRPS,
frontendServiceResolver,
ns,
)
}
namespaceRateLimiter := quotas.NewNamespaceRequestRateLimiter(
func(req quotas.Request) quotas.RequestRateLimiter {
return configs.NewRequestToRateLimiter(
configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance),
configs.NewNamespaceRateBurst(req.Caller, visibilityRateFn, serviceConfig.MaxNamespaceVisibilityBurstPerInstance),
configs.NewNamespaceRateBurst(req.Caller, namespaceReplicationInducingRateFn, serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance),
configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance),
)
},
Expand Down
Loading

0 comments on commit eebed31

Please sign in to comment.