Skip to content

Commit

Permalink
Use dynamic rate limiter for workflow handler (#2213)
Browse files Browse the repository at this point in the history
Update the rate limiter to take in an RPSFunc to handle changing RPS information
  • Loading branch information
shreyassrivatsan authored Jul 16, 2019
1 parent 4f19f6b commit 8982453
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 16 deletions.
3 changes: 3 additions & 0 deletions common/quotas/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package quotas

// RPSFunc returns a float64 as the RPS
type RPSFunc func() float64

// Policy corresponds to a quota policy. A policy allows implementing layered
// and more complex rate limiting functionality.
type Policy interface {
Expand Down
20 changes: 20 additions & 0 deletions common/quotas/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"golang.org/x/time/rate"
)

const _defaultRPSTTL = 60 * time.Second

// RateLimiter is a wrapper around the golang rate limiter handling dynamic
// configuration updates of the max dispatch per second. This has comparable
// performance to the token bucket rate limiter.
Expand Down Expand Up @@ -123,3 +125,21 @@ func (rl *RateLimiter) shouldUpdate(maxDispatchPerSecond *float64) bool {
return *maxDispatchPerSecond < *rl.maxDispatchPerSecond
}
}

type dynamicRateLimiter struct {
rps RPSFunc
rl *RateLimiter
}

// NewDynamicRateLimiter returns a rate limiter which handles dynamic config
func NewDynamicRateLimiter(rps RPSFunc) Policy {
initialRps := rps()
rl := NewRateLimiter(&initialRps, _defaultRPSTTL, int(rps()))
return &dynamicRateLimiter{rps, rl}
}

func (d *dynamicRateLimiter) Allow() bool {
rps := float64(d.rps())
d.rl.UpdateMaxDispatch(&rps)
return d.rl.Allow()
}
13 changes: 0 additions & 13 deletions common/tokenbucket/tb.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,6 @@ func newTokenBucket(rps int, timeSource clock.TimeSource) *tokenBucketImpl {
return tb
}

// NewFactory creates an instance of factory used for creating TokenBucket instances
func NewFactory() Factory {
return &tokenBucketFactoryImpl{}
}

// CreateTokenBucket creates and returns a
// new token bucket rate limiter that
// repelenishes the bucket every 100
// milliseconds. Thread safe.
func (f *tokenBucketFactoryImpl) CreateTokenBucket(rps int, timeSource clock.TimeSource) TokenBucket {
return New(rps, timeSource)
}

func (tb *tokenBucketImpl) TryConsume(count int) (bool, time.Duration) {
now := tb.timeSource.Now().UnixNano()
tb.Lock()
Expand Down
6 changes: 3 additions & 3 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/elasticsearch/validator"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -53,7 +52,6 @@ import (
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/tokenbucket"
warchiver "github.com/uber/cadence/service/worker/archiver"
"go.uber.org/yarpc/yarpcerrors"
)
Expand Down Expand Up @@ -165,7 +163,9 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
tokenSerializer: common.NewJSONTaskTokenSerializer(),
metricsClient: sVice.GetMetricsClient(),
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetClusterMetadata(), sVice.GetMetricsClient(), sVice.GetLogger()),
rateLimiter: quotas.NewSimpleRateLimiter(tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource())),
rateLimiter: quotas.NewDynamicRateLimiter(func() float64 {
return float64(config.RPS())
}),
blobstoreClient: blobstoreClient,
versionChecker: &versionChecker{checkVersion: config.EnableClientVersionCheck()},
domainHandler: newDomainHandler(
Expand Down

0 comments on commit 8982453

Please sign in to comment.