From be20d12da1da4674949c0414b015f4b8c4ecbe98 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Wed, 7 Feb 2024 11:50:55 +0100 Subject: [PATCH] The ratelimiter needs to be created with the domain name not the ID (#5644) What changed? The rate limiter is keyed by domain name, not domain ID. This change creates the limiter with the domain name so that filtering is done correctly. Why? Because the filtering must be done on the correct value (the domain name). How did you test it? Tested locally and updated the unit tests. Potential risks Release notes Documentation Changes --- service/history/workflowcache/cache.go | 14 +++++++------- service/history/workflowcache/cache_test.go | 18 +++++++++--------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/service/history/workflowcache/cache.go b/service/history/workflowcache/cache.go index 5ab60d70e15..97739277ed1 100644 --- a/service/history/workflowcache/cache.go +++ b/service/history/workflowcache/cache.go @@ -54,11 +54,11 @@ type wfCache struct { domainCache cache.DomainCache metricsClient metrics.Client logger log.Logger - getCacheItemFn func(domainID string, workflowID string) (*cacheValue, error) + getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error) } type cacheKey struct { - domainID string + domainName string workflowID string } @@ -126,7 +126,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi UpdateGauge(metrics.WorkflowIDCacheSizeGauge, float64(c.lru.Size())) // Locking is not needed because both getCacheItem and the rate limiter are thread safe - value, err := c.getCacheItemFn(domainID, workflowID) + value, err := c.getCacheItemFn(domainName, workflowID) if err != nil { c.logError(domainID, workflowID, err) // If we can't get the cache item, we should allow the request through @@ -174,10 +174,10 @@ func (c *wfCache) AllowInternal(domainID string, workflowID string) bool { return c.allow(domainID, workflowID, internal) } -func (c *wfCache) getCacheItem(domainID string, workflowID string) (*cacheValue, error) { 
+func (c *wfCache) getCacheItem(domainName string, workflowID string) (*cacheValue, error) { // The underlying lru cache is thread safe, so there is no need to lock key := cacheKey{ - domainID: domainID, + domainName: domainName, workflowID: workflowID, } @@ -188,8 +188,8 @@ func (c *wfCache) getCacheItem(domainID string, workflowID string) (*cacheValue, } value = &cacheValue{ - externalRateLimiter: c.externalLimiterFactory.GetLimiter(domainID), - internalRateLimiter: c.internalLimiterFactory.GetLimiter(domainID), + externalRateLimiter: c.externalLimiterFactory.GetLimiter(domainName), + internalRateLimiter: c.internalLimiterFactory.GetLimiter(domainName), } // PutIfNotExist is thread safe, and will either return the value that was already in the cache or the value we just created // another thread might have inserted a value between the Get and PutIfNotExist, but that is ok diff --git a/service/history/workflowcache/cache_test.go b/service/history/workflowcache/cache_test.go index 1d12e05899f..3ea7e0cedd7 100644 --- a/service/history/workflowcache/cache_test.go +++ b/service/history/workflowcache/cache_test.go @@ -56,7 +56,7 @@ func TestWfCache_AllowSingleWorkflow(t *testing.T) { externalLimiter.EXPECT().Allow().Return(false).Times(1) externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl) - externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiter).Times(1) + externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1) // The internal rate limiter will allow the second request, but not the first. 
internalLimiter := quotas.NewMockLimiter(ctrl) @@ -64,7 +64,7 @@ func TestWfCache_AllowSingleWorkflow(t *testing.T) { internalLimiter.EXPECT().Allow().Return(true).Times(1) internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl) - internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiter).Times(1) + internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1) wfCache := New(Params{ // The cache TTL is set to 1 minute, so all requests will hit the cache @@ -103,16 +103,16 @@ func TestWfCache_AllowMultipleWorkflow(t *testing.T) { externalLimiterWf2.EXPECT().Allow().Return(true).Times(1) externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl) - externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiterWf1).Times(1) - externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiterWf2).Times(1) + externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiterWf1).Times(1) + externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiterWf2).Times(1) // We do not expect calls to the internal rate limiters, but they will still be created. 
internalLimiterWf1 := quotas.NewMockLimiter(ctrl) internalLimiterWf2 := quotas.NewMockLimiter(ctrl) internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl) - internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiterWf1).Times(1) - internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiterWf2).Times(1) + internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf1).Times(1) + internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf2).Times(1) wfCache := New(Params{ TTL: time.Minute, @@ -166,7 +166,7 @@ func TestWfCache_AllowError(t *testing.T) { }).(*wfCache) // We set getCacheItemFn to a function that will return an error so that we can test the error logic - wfCache.getCacheItemFn = func(domainID string, workflowID string) (*cacheValue, error) { + wfCache.getCacheItemFn = func(domainName string, workflowID string) (*cacheValue, error) { return nil, assert.AnError } @@ -260,14 +260,14 @@ func TestWfCache_RejectLog(t *testing.T) { externalLimiter.EXPECT().Allow().Return(false).Times(1) externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl) - externalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(externalLimiter).Times(1) + externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1) // The internal rate limiter will reject internalLimiter := quotas.NewMockLimiter(ctrl) internalLimiter.EXPECT().Allow().Return(false).Times(1) internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl) - internalLimiterFactory.EXPECT().GetLimiter(testDomainID).Return(internalLimiter).Times(1) + internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1) // Setup the mock logger logger := new(log.MockLogger)