Make sure task processing rate limiter is only done in the active side (#5654)

* Make sure task processing rate limiter is only done in the active side

* Update transfer_active_task_executor.go
sankari165 authored Feb 13, 2024
1 parent a614c4c commit 701c628
Showing 5 changed files with 6 additions and 17 deletions.
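
The change moves the per-workflow counting call out of the shared transfer task executor base and into the active-side executor, so only the active cluster feeds the workflow-ID rate-limiter metrics. Below is a minimal, self-contained Go sketch of that shape, not the actual Cadence code: the WFCache interface is reduced to the one method used here, AllowInternal(domainID, workflowID string) bool, as implied by the call sites and mock expectations in the diffs that follow, and countingCache, activeTaskExecutor, and standbyTaskExecutor are illustrative names.

package main

import "fmt"

// WFCache is assumed here to expose AllowInternal(domainID, workflowID string) bool,
// matching the call t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID) and the
// mock expectation AllowInternal(...).Return(true) visible in the diffs below.
type WFCache interface {
    AllowInternal(domainID, workflowID string) bool
}

// countingCache is a stand-in implementation: it only counts, it never limits.
type countingCache struct{ seen int }

func (c *countingCache) AllowInternal(domainID, workflowID string) bool {
    c.seen++ // stand-in for emitting a per-workflow request metric
    return true
}

// Only the active-side executor holds the cache after this change.
type activeTaskExecutor struct{ wfIDCache WFCache }

func (e *activeTaskExecutor) processActivityTask(domainID, workflowID string) {
    // Ratelimiting is not done; the call only counts the request via metrics.
    e.wfIDCache.AllowInternal(domainID, workflowID)
    // ... push the activity task to the matching service here ...
}

// The standby-side executor no longer knows about the cache at all.
type standbyTaskExecutor struct{}

func main() {
    cache := &countingCache{}
    active := &activeTaskExecutor{wfIDCache: cache}
    active.processActivityTask("domain-id", "workflow-id")
    fmt.Println("counted requests:", cache.seen) // prints 1
}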
1 change: 0 additions & 1 deletion service/history/queue/transfer_queue_processor.go
@@ -134,7 +134,6 @@ func NewTransferQueueProcessor(
         logger,
         clusterName,
         config,
-        wfIDCache,
     )
     standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
         clusterName,
7 changes: 6 additions & 1 deletion service/history/task/transfer_active_task_executor.go
@@ -72,6 +72,7 @@ type (
         historyClient           history.Client
         parentClosePolicyClient parentclosepolicy.Client
         workflowResetter        reset.WorkflowResetter
+        wfIDCache               workflowcache.WFCache
     }

     generatorF = func(taskGenerator execution.MutableStateTaskGenerator) error

@@ -95,7 +96,6 @@ func NewTransferActiveTaskExecutor(
             executionCache,
             logger,
             config,
-            wfIDCache,
         ),
         historyClient: shard.GetService().GetHistoryClient(),
         parentClosePolicyClient: parentclosepolicy.NewClient(

@@ -105,6 +105,7 @@
             config.NumParentClosePolicySystemWorkflows(),
         ),
         workflowResetter: workflowResetter,
+        wfIDCache:        wfIDCache,
     }
 }

@@ -195,6 +196,10 @@ func (t *transferActiveTaskExecutor) processActivityTask(
     // release the context lock since we no longer need mutable state builder and
     // the rest of logic is making RPC call, which takes time.
     release(nil)
+
+    // Ratelimiting is not done. This is only to count the number of requests via metrics
+    t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID)
+
     return t.pushActivity(ctx, task, timeout, mutableState.GetExecutionInfo().PartitionConfig)
 }

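One detail worth noting in the last hunk above: the counting call sits after release(nil), because it does not need the mutable state builder and the remainder of the function is an RPC to matching. A minimal sketch of that ordering, with the real dependencies replaced by function-valued stand-ins:

package main

import "fmt"

// processActivityTaskSketch mirrors only the ordering shown in the diff above;
// release, count and pushActivity are stand-ins for the lock release, the
// wfIDCache.AllowInternal call and the matchingClient.AddActivityTask RPC.
func processActivityTaskSketch(release func(error), count func(), pushActivity func() error) error {
    release(nil) // mutable state is no longer needed; the rest is a slow RPC
    count()      // counts the request via metrics, no rate limiting happens here
    return pushActivity()
}

func main() {
    err := processActivityTaskSketch(
        func(error) {},
        func() { fmt.Println("counted one activity push") },
        func() error { return nil },
    )
    fmt.Println("push error:", err)
}
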
3 changes: 0 additions & 3 deletions service/history/task/transfer_standby_task_executor.go
@@ -35,7 +35,6 @@ import (
     "github.com/uber/cadence/service/history/config"
     "github.com/uber/cadence/service/history/execution"
     "github.com/uber/cadence/service/history/shard"
-    "github.com/uber/cadence/service/history/workflowcache"
     "github.com/uber/cadence/service/worker/archiver"
 )

@@ -57,7 +56,6 @@ func NewTransferStandbyTaskExecutor(
     logger log.Logger,
     clusterName string,
     config *config.Config,
-    wfIDCache workflowcache.WFCache,
 ) Executor {
     return &transferStandbyTaskExecutor{
         transferTaskExecutorBase: newTransferTaskExecutorBase(

@@ -66,7 +64,6 @@
             executionCache,
             logger,
             config,
-            wfIDCache,
         ),
         clusterName:     clusterName,
         historyResender: historyResender,
5 changes: 0 additions & 5 deletions service/history/task/transfer_standby_task_executor_test.go
@@ -49,7 +49,6 @@ import (
     "github.com/uber/cadence/service/history/execution"
     "github.com/uber/cadence/service/history/shard"
     test "github.com/uber/cadence/service/history/testing"
-    "github.com/uber/cadence/service/history/workflowcache"
     warchiver "github.com/uber/cadence/service/worker/archiver"
 )

@@ -61,7 +60,6 @@ type (
         controller             *gomock.Controller
         mockShard              *shard.TestContext
         mockDomainCache        *cache.MockDomainCache
-        mockWFCache            *workflowcache.MockWFCache
         mockNDCHistoryResender *ndc.MockHistoryResender
         mockMatchingClient     *matching.MockClient

@@ -138,7 +136,6 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() {
     s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata
     s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider
     s.mockDomainCache = s.mockShard.Resource.DomainCache
-    s.mockWFCache = workflowcache.NewMockWFCache(s.controller)
     s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
     s.mockDomainCache.EXPECT().GetDomainName(constants.TestDomainID).Return(constants.TestDomainName, nil).AnyTimes()
     s.mockDomainCache.EXPECT().GetDomain(constants.TestDomainName).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()

@@ -162,7 +159,6 @@
         s.logger,
         s.clusterName,
         config,
-        s.mockWFCache,
     ).(*transferStandbyTaskExecutor)
 }

@@ -240,7 +236,6 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending_PushT
     s.NoError(err)
     s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
     s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1)
-    s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1)
     s.mockShard.SetCurrentTime(s.clusterName, now)
     err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
     s.Nil(err)
7 changes: 0 additions & 7 deletions service/history/task/transfer_task_executor_base.go
@@ -36,7 +36,6 @@ import (
     "github.com/uber/cadence/service/history/config"
     "github.com/uber/cadence/service/history/execution"
     "github.com/uber/cadence/service/history/shard"
-    "github.com/uber/cadence/service/history/workflowcache"
     "github.com/uber/cadence/service/worker/archiver"
 )

@@ -60,7 +59,6 @@ type (
         visibilityMgr persistence.VisibilityManager
         config        *config.Config
         throttleRetry *backoff.ThrottleRetry
-        wfIDCache     workflowcache.WFCache
     }
 )

@@ -70,7 +68,6 @@ func newTransferTaskExecutorBase(
     executionCache *execution.Cache,
     logger log.Logger,
     config *config.Config,
-    wfIDCache workflowcache.WFCache,
 ) *transferTaskExecutorBase {
     return &transferTaskExecutorBase{
         shard: shard,

@@ -85,7 +82,6 @@
             backoff.WithRetryPolicy(taskRetryPolicy),
             backoff.WithRetryableError(common.IsServiceTransientError),
         ),
-        wfIDCache: wfIDCache,
     }
 }

@@ -103,9 +99,6 @@ func (t *transferTaskExecutorBase) pushActivity(
         t.logger.Fatal("Cannot process non activity task", tag.TaskType(task.GetTaskType()))
     }

-    // Ratelimiting is not done. This is only to count the number of requests via metrics
-    t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID)
-
     return t.matchingClient.AddActivityTask(ctx, &types.AddActivityTaskRequest{
         DomainUUID:       task.TargetDomainID,
         SourceDomainUUID: task.DomainID,
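
Net effect of the last three files: the shared transferTaskExecutorBase and the standby executor drop the workflowcache dependency entirely, pushActivity keeps only its task-type check and the AddActivityTask call to matching, and the per-workflow counting now happens exactly once, in the active executor's processActivityTask, just before that push. Presumably this is because standby task processing should not feed the per-workflow rate-limiter metrics that the active side uses.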
