Skip to content

Commit

Permalink
The ratelimiter needs to be created with the domain name not the ID (#…
Browse files Browse the repository at this point in the history
…5644)

What changed?
The ratelimiter is per domain name not id, fixed this, so we can do the filtering correctly

Why?
Because we need to filter on the correct value

How did you test it?
Tested locally, and updated the unit tests

Potential risks

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Feb 7, 2024
1 parent a7d504a commit be20d12
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
14 changes: 7 additions & 7 deletions service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type wfCache struct {
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainID string, workflowID string) (*cacheValue, error)
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
}

type cacheKey struct {
domainID string
domainName string
workflowID string
}

Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi
UpdateGauge(metrics.WorkflowIDCacheSizeGauge, float64(c.lru.Size()))

// Locking is not needed because both getCacheItem and the rate limiter are thread safe
value, err := c.getCacheItemFn(domainID, workflowID)
value, err := c.getCacheItemFn(domainName, workflowID)
if err != nil {
c.logError(domainID, workflowID, err)
// If we can't get the cache item, we should allow the request through
Expand Down Expand Up @@ -174,10 +174,10 @@ func (c *wfCache) AllowInternal(domainID string, workflowID string) bool {
return c.allow(domainID, workflowID, internal)
}

func (c *wfCache) getCacheItem(domainID string, workflowID string) (*cacheValue, error) {
func (c *wfCache) getCacheItem(domainName string, workflowID string) (*cacheValue, error) {
// The underlying lru cache is thread safe, so there is no need to lock
key := cacheKey{
domainID: domainID,
domainName: domainName,
workflowID: workflowID,
}

Expand All @@ -188,8 +188,8 @@ func (c *wfCache) getCacheItem(domainID string, workflowID string) (*cacheValue,
}

value = &cacheValue{
externalRateLimiter: c.externalLimiterFactory.GetLimiter(domainID),
internalRateLimiter: c.internalLimiterFactory.GetLimiter(domainID),
externalRateLimiter: c.externalLimiterFactory.GetLimiter(domainName),
internalRateLimiter: c.internalLimiterFactory.GetLimiter(domainName),
}
// PutIfNotExist is thread safe, and will either return the value that was already in the cache or the value we just created
// another thread might have inserted a value between the Get and PutIfNotExist, but that is ok
Expand Down
18 changes: 9 additions & 9 deletions service/history/workflowcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func TestWfCache_AllowSingleWorkflow(t *testing.T) {
externalLimiter.EXPECT().Allow().Return(false).Times(1)

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiter).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1)

// The internal rate limiter will allow the second request, but not the first.
internalLimiter := quotas.NewMockLimiter(ctrl)
internalLimiter.EXPECT().Allow().Return(false).Times(1)
internalLimiter.EXPECT().Allow().Return(true).Times(1)

internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiter).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1)

wfCache := New(Params{
// The cache TTL is set to 1 minute, so all requests will hit the cache
Expand Down Expand Up @@ -103,16 +103,16 @@ func TestWfCache_AllowMultipleWorkflow(t *testing.T) {
externalLimiterWf2.EXPECT().Allow().Return(true).Times(1)

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiterWf1).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiterWf2).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiterWf1).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiterWf2).Times(1)

// We do not expect calls to the internal rate limiters, but they will still be created.
internalLimiterWf1 := quotas.NewMockLimiter(ctrl)
internalLimiterWf2 := quotas.NewMockLimiter(ctrl)

internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiterWf1).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiterWf2).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf1).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf2).Times(1)

wfCache := New(Params{
TTL: time.Minute,
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestWfCache_AllowError(t *testing.T) {
}).(*wfCache)

// We set getCacheItemFn to a function that will return an error so that we can test the error logic
wfCache.getCacheItemFn = func(domainID string, workflowID string) (*cacheValue, error) {
wfCache.getCacheItemFn = func(domainName string, workflowID string) (*cacheValue, error) {
return nil, assert.AnError
}

Expand Down Expand Up @@ -260,14 +260,14 @@ func TestWfCache_RejectLog(t *testing.T) {
externalLimiter.EXPECT().Allow().Return(false).Times(1)

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiter).Times(1)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1)

// The internal rate limiter will reject
internalLimiter := quotas.NewMockLimiter(ctrl)
internalLimiter.EXPECT().Allow().Return(false).Times(1)

internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiter).Times(1)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1)

// Setup the mock logger
logger := new(log.MockLogger)
Expand Down

0 comments on commit be20d12

Please sign in to comment.