From 49b99fe6a144f90f8427b7f7232e32a247694327 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Fri, 14 Jul 2023 09:45:37 -0700 Subject: [PATCH] Drop standby tasks in standby taks executor (#4626) **What changed?** 1. Revert "Drop task when namespace is not on the cluster (#4444)" 2. Add logic to drop standby task in transfer / timer standby task executor. **Why?** **How did you test it?** **Potential risks** **Is hotfix candidate?** --- common/namespace/registry.go | 1 - .../archival_queue_task_executor_test.go | 1 - service/history/queues/executable.go | 17 ++++------------- service/history/queues/executable_test.go | 14 -------------- .../history/timerQueueStandbyTaskExecutor.go | 9 +++++++++ .../history/transferQueueStandbyTaskExecutor.go | 9 +++++++++ 6 files changed, 22 insertions(+), 29 deletions(-) diff --git a/common/namespace/registry.go b/common/namespace/registry.go index 611ea6bac38..b61e32c579c 100644 --- a/common/namespace/registry.go +++ b/common/namespace/registry.go @@ -33,7 +33,6 @@ import ( "time" "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common" "go.temporal.io/server/common/cache" "go.temporal.io/server/common/clock" diff --git a/service/history/archival_queue_task_executor_test.go b/service/history/archival_queue_task_executor_test.go index 76c3dd71e4b..9d3ab1561c2 100644 --- a/service/history/archival_queue_task_executor_test.go +++ b/service/history/archival_queue_task_executor_test.go @@ -354,7 +354,6 @@ func TestArchivalQueueTaskExecutor(t *testing.T) { shardContext.EXPECT().GetConfig().Return(cfg).AnyTimes() mockMetadata := cluster.NewMockMetadata(p.Controller) mockMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() - mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() shardContext.EXPECT().GetClusterMetadata().Return(mockMetadata).AnyTimes() shardID := int32(1) diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index 84a287bee7e..2f9e146174d 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -177,18 +177,15 @@ func (e *executableImpl) Execute() (retErr error) { e.Unlock() return nil } - var namespaceName string - ns, err := e.namespaceRegistry.GetNamespaceByID(namespace.ID(e.GetNamespaceID())) - if err == nil { - namespaceName = ns.Name().String() - } + + ns, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID())) var callerInfo headers.CallerInfo switch e.priority { case ctasks.PriorityHigh: - callerInfo = headers.NewBackgroundCallerInfo(namespaceName) + callerInfo = headers.NewBackgroundCallerInfo(ns.String()) default: // priority low or unknown - callerInfo = headers.NewPreemptableCallerInfo(namespaceName) + callerInfo = headers.NewPreemptableCallerInfo(ns.String()) } ctx := headers.SetCallerInfo( metrics.AddMetricsContext(context.Background()), @@ -232,12 +229,6 @@ func (e *executableImpl) Execute() (retErr error) { // Not doing it here as for certain errors latency for the attempt should not be counted }() - if ns != nil && !ns.IsOnCluster(e.clusterMetadata.GetCurrentClusterName()) { - // Discard task if the namespace is not on the current cluster. - e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...) - return consts.ErrTaskDiscarded - } - metricsTags, isActive, err := e.executor.Execute(ctx, e) e.taggedMetricsHandler = e.metricsHandler.WithTags(metricsTags...) diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index 122a9f6ca17..7ea1dbd099c 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -37,7 +37,6 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - persistencepb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" @@ -271,19 +270,6 @@ func (s *executableSuite) TestExecute_CallerInfo() { s.NoError(executable.Execute()) } -func (s *executableSuite) TestExecute_DiscardTask() { - executable := s.newTestExecutable() - registry := namespace.NewMockRegistry(s.controller) - executable.(*executableImpl).namespaceRegistry = registry - ns := namespace.NewGlobalNamespaceForTest(nil, nil, &persistencepb.NamespaceReplicationConfig{ - ActiveClusterName: "nonCurrentCluster", - Clusters: []string{"nonCurrentCluster"}, - }, 1) - - registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil).Times(2) - s.ErrorIs(executable.Execute(), consts.ErrTaskDiscarded) -} - func (s *executableSuite) TestExecuteHandleErr_ResetAttempt() { executable := s.newTestExecutable() s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(nil, true, errors.New("some random error")) diff --git a/service/history/timerQueueStandbyTaskExecutor.go b/service/history/timerQueueStandbyTaskExecutor.go index 16b6888492c..647db8dbf4e 100644 --- a/service/history/timerQueueStandbyTaskExecutor.go +++ b/service/history/timerQueueStandbyTaskExecutor.go @@ -435,6 +435,15 @@ func (t *timerQueueStandbyTaskExecutor) processTimer( ctx, cancel := context.WithTimeout(ctx, taskTimeout) defer cancel() + nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(timerTask.GetNamespaceID())) + if err != nil { + return err + } + if !nsRecord.IsOnCluster(t.clusterName) { + // discard standby tasks + return consts.ErrTaskDiscarded + } + executionContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, timerTask) if err != nil { return err diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go index 1f689cf5ae2..94b8ac0896a 100644 --- a/service/history/transferQueueStandbyTaskExecutor.go +++ b/service/history/transferQueueStandbyTaskExecutor.go @@ -503,6 +503,15 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer( ctx, cancel := context.WithTimeout(ctx, taskTimeout) defer cancel() + nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID())) + if err != nil { + return err + } + if !nsRecord.IsOnCluster(t.clusterName) { + // discard standby tasks + return consts.ErrTaskDiscarded + } + weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, taskInfo) if err != nil { return err