Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The ratelimiter needs to be created with the domain name not the ID #5644

Merged
merged 4 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type wfCache struct {
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainID string, workflowID string) (*cacheValue, error)
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
}

type cacheKey struct {
domainID string
domainName string
workflowID string
}

Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi
UpdateGauge(metrics.WorkflowIDCacheSizeGauge, float64(c.lru.Size()))

// Locking is not needed because both getCacheItem and the rate limiter are thread safe
value, err := c.getCacheItemFn(domainID, workflowID)
value, err := c.getCacheItemFn(domainName, workflowID)
if err != nil {
c.logError(domainID, workflowID, err)
// If we can't get the cache item, we should allow the request through
Expand Down Expand Up @@ -174,10 +174,10 @@ func (c *wfCache) AllowInternal(domainID string, workflowID string) bool {
return c.allow(domainID, workflowID, internal)
}

func (c *wfCache) getCacheItem(domainID string, workflowID string) (*cacheValue, error) {
func (c *wfCache) getCacheItem(domainName string, workflowID string) (*cacheValue, error) {
// The underlying lru cache is thread safe, so there is no need to lock
key := cacheKey{
domainID: domainID,
domainName: domainName,
workflowID: workflowID,
}

Expand All @@ -188,8 +188,8 @@ func (c *wfCache) getCacheItem(domainID string, workflowID string) (*cacheValue,
}

value = &cacheValue{
externalRateLimiter: c.externalLimiterFactory.GetLimiter(domainID),
internalRateLimiter: c.internalLimiterFactory.GetLimiter(domainID),
externalRateLimiter: c.externalLimiterFactory.GetLimiter(domainName),
internalRateLimiter: c.internalLimiterFactory.GetLimiter(domainName),
}
// PutIfNotExist is thread safe, and will either return the value that was already in the cache or the value we just created
// another thread might have inserted a value between the Get and PutIfNotExist, but that is ok
Expand Down
18 changes: 9 additions & 9 deletions service/history/workflowcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func TestWfCache_AllowSingleWorkflow(t *testing.T) {
externalLimiter.EXPECT().Allow().Return(false).Times(1)

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiter).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1)

// The internal rate limiter will allow the second request, but not the first.
internalLimiter := quotas.NewMockLimiter(ctrl)
internalLimiter.EXPECT().Allow().Return(false).Times(1)
internalLimiter.EXPECT().Allow().Return(true).Times(1)

internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiter).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1)

wfCache := New(Params{
// The cache TTL is set to 1 minute, so all requests will hit the cache
Expand Down Expand Up @@ -103,16 +103,16 @@ func TestWfCache_AllowMultipleWorkflow(t *testing.T) {
externalLimiterWf2.EXPECT().Allow().Return(true).Times(1)

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiterWf1).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiterWf2).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiterWf1).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiterWf2).Times(1)

// We do not expect calls to the internal rate limiters, but they will still be created.
internalLimiterWf1 := quotas.NewMockLimiter(ctrl)
internalLimiterWf2 := quotas.NewMockLimiter(ctrl)

internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiterWf1).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiterWf2).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf1).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf2).Times(1)

wfCache := New(Params{
TTL: time.Minute,
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestWfCache_AllowError(t *testing.T) {
}).(*wfCache)

// We set getCacheItemFn to a function that will return an error so that we can test the error logic
wfCache.getCacheItemFn = func(domainID string, workflowID string) (*cacheValue, error) {
wfCache.getCacheItemFn = func(domainName string, workflowID string) (*cacheValue, error) {
return nil, assert.AnError
}

Expand Down Expand Up @@ -260,14 +260,14 @@ func TestWfCache_RejectLog(t *testing.T) {
externalLimiter.EXPECT().Allow().Return(false).Times(1)

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiter).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1)

// The internal rate limiter will reject
internalLimiter := quotas.NewMockLimiter(ctrl)
internalLimiter.EXPECT().Allow().Return(false).Times(1)

internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiter).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1)

// Setup the mock logger
logger := new(log.MockLogger)
Expand Down