From 701c628e05f001ea44654b1e55926d19560a5e79 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Tue, 13 Feb 2024 10:38:53 +0100 Subject: [PATCH] Make sure task processing rate limiter is only done in the active side (#5654) * Make sure task processing rate limiter is only done in the active side * Update transfer_active_task_executor.go --- service/history/queue/transfer_queue_processor.go | 1 - service/history/task/transfer_active_task_executor.go | 7 ++++++- service/history/task/transfer_standby_task_executor.go | 3 --- .../history/task/transfer_standby_task_executor_test.go | 5 ----- service/history/task/transfer_task_executor_base.go | 7 ------- 5 files changed, 6 insertions(+), 17 deletions(-) diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index 21830bd9af8..7fd04d3649b 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -134,7 +134,6 @@ func NewTransferQueueProcessor( logger, clusterName, config, - wfIDCache, ) standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor( clusterName, diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 714bca3b0ff..292ed1da78f 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -72,6 +72,7 @@ type ( historyClient history.Client parentClosePolicyClient parentclosepolicy.Client workflowResetter reset.WorkflowResetter + wfIDCache workflowcache.WFCache } generatorF = func(taskGenerator execution.MutableStateTaskGenerator) error @@ -95,7 +96,6 @@ func NewTransferActiveTaskExecutor( executionCache, logger, config, - wfIDCache, ), historyClient: shard.GetService().GetHistoryClient(), parentClosePolicyClient: parentclosepolicy.NewClient( @@ -105,6 +105,7 @@ func NewTransferActiveTaskExecutor( config.NumParentClosePolicySystemWorkflows(), ), workflowResetter: workflowResetter, + wfIDCache: wfIDCache, } } @@ -195,6 +196,10 @@ func (t *transferActiveTaskExecutor) processActivityTask( // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) + + // Ratelimiting is not done. This is only to count the number of requests via metrics + t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID) + return t.pushActivity(ctx, task, timeout, mutableState.GetExecutionInfo().PartitionConfig) } diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index 56f432b318d..c13e6594768 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -35,7 +35,6 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" - "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) @@ -57,7 +56,6 @@ func NewTransferStandbyTaskExecutor( logger log.Logger, clusterName string, config *config.Config, - wfIDCache workflowcache.WFCache, ) Executor { return &transferStandbyTaskExecutor{ transferTaskExecutorBase: newTransferTaskExecutorBase( @@ -66,7 +64,6 @@ func NewTransferStandbyTaskExecutor( executionCache, logger, config, - wfIDCache, ), clusterName: clusterName, historyResender: historyResender, diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index 6149f6fa1aa..da1e28647e8 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -49,7 +49,6 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" test "github.com/uber/cadence/service/history/testing" - "github.com/uber/cadence/service/history/workflowcache" warchiver "github.com/uber/cadence/service/worker/archiver" ) @@ -61,7 +60,6 @@ type ( controller *gomock.Controller mockShard *shard.TestContext mockDomainCache *cache.MockDomainCache - mockWFCache *workflowcache.MockWFCache mockNDCHistoryResender *ndc.MockHistoryResender mockMatchingClient *matching.MockClient @@ -138,7 +136,6 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() { s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider s.mockDomainCache = s.mockShard.Resource.DomainCache - s.mockWFCache = workflowcache.NewMockWFCache(s.controller) s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomainName(constants.TestDomainID).Return(constants.TestDomainName, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomain(constants.TestDomainName).Return(constants.TestGlobalDomainEntry, nil).AnyTimes() @@ -162,7 +159,6 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() { s.logger, s.clusterName, config, - s.mockWFCache, ).(*transferStandbyTaskExecutor) } @@ -240,7 +236,6 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending_PushT s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1) - s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) s.mockShard.SetCurrentTime(s.clusterName, now) err = s.transferStandbyTaskExecutor.Execute(transferTask, true) s.Nil(err) diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index d2c2782ca05..da9470c5ac0 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -36,7 +36,6 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" - "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) @@ -60,7 +59,6 @@ type ( visibilityMgr persistence.VisibilityManager config *config.Config throttleRetry *backoff.ThrottleRetry - wfIDCache workflowcache.WFCache } ) @@ -70,7 +68,6 @@ func newTransferTaskExecutorBase( executionCache *execution.Cache, logger log.Logger, config *config.Config, - wfIDCache workflowcache.WFCache, ) *transferTaskExecutorBase { return &transferTaskExecutorBase{ shard: shard, @@ -85,7 +82,6 @@ func newTransferTaskExecutorBase( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(common.IsServiceTransientError), ), - wfIDCache: wfIDCache, } } @@ -103,9 +99,6 @@ func (t *transferTaskExecutorBase) pushActivity( t.logger.Fatal("Cannot process non activity task", tag.TaskType(task.GetTaskType())) } - // Ratelimiting is not done. This is only to count the number of requests via metrics - t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID) - return t.matchingClient.AddActivityTask(ctx, &types.AddActivityTaskRequest{ DomainUUID: task.TargetDomainID, SourceDomainUUID: task.DomainID,