From 8d41945f1a6fea5d4d1320d94971b2a795947d1b Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Thu, 15 Feb 2024 14:24:34 +0100 Subject: [PATCH] Emit metrics when transfer tasks for decisions could be ratelimited (#5665) * Emit metrics when transfer tasks for decisions could be ratelimited * update unit tests --- service/history/task/transfer_active_task_executor.go | 4 ++++ service/history/task/transfer_active_task_executor_test.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 292ed1da78f..f490a40e311 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -269,6 +269,10 @@ func (t *transferActiveTaskExecutor) processDecisionTask( // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) + + // Ratelimiting is not done. This is only to count the number of requests via metrics + t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID) + err = t.pushDecision(ctx, task, taskList, decisionTimeout, mutableState.GetExecutionInfo().PartitionConfig) if _, ok := err.(*types.StickyWorkerUnavailableError); ok { // sticky worker is unavailable, switch to non-sticky task list diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index cf6a92d695f..4e0abd6d7d1 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -308,6 +308,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessDecisionTask_FirstDecision( persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, di.ScheduleID, di.Version) s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) s.mockMatchingClient.EXPECT().AddDecisionTask(gomock.Any(), createAddDecisionTaskRequest(transferTask, mutableState)).Return(nil).Times(1) err = s.transferActiveTaskExecutor.Execute(transferTask, true) @@ -336,6 +337,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessDecisionTask_NonFirstDecisi persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, di.ScheduleID, di.Version) s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) s.mockMatchingClient.EXPECT().AddDecisionTask(gomock.Any(), createAddDecisionTaskRequest(transferTask, mutableState)).Return(nil).Times(1) err = s.transferActiveTaskExecutor.Execute(transferTask, true) @@ -369,6 +371,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessDecisionTask_Sticky_NonFirs persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, di.ScheduleID, di.Version) s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) s.mockMatchingClient.EXPECT().AddDecisionTask(gomock.Any(), createAddDecisionTaskRequest(transferTask, mutableState)).Return(nil).Times(1) err = s.transferActiveTaskExecutor.Execute(transferTask, true) @@ -402,6 +405,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessDecisionTask_DecisionNotSti persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, di.ScheduleID, di.Version) s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) s.mockMatchingClient.EXPECT().AddDecisionTask(gomock.Any(), createAddDecisionTaskRequest(transferTask, mutableState)).Return(nil).Times(1) err = s.transferActiveTaskExecutor.Execute(transferTask, true)