diff --git a/common/namespace/registry.go b/common/namespace/registry.go index bc2499b64c9..213a5fbd855 100644 --- a/common/namespace/registry.go +++ b/common/namespace/registry.go @@ -33,7 +33,6 @@ import ( "time" "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common" "go.temporal.io/server/common/cache" "go.temporal.io/server/common/clock" diff --git a/service/history/archival_queue_task_executor_test.go b/service/history/archival_queue_task_executor_test.go index 76c3dd71e4b..9d3ab1561c2 100644 --- a/service/history/archival_queue_task_executor_test.go +++ b/service/history/archival_queue_task_executor_test.go @@ -354,7 +354,6 @@ func TestArchivalQueueTaskExecutor(t *testing.T) { shardContext.EXPECT().GetConfig().Return(cfg).AnyTimes() mockMetadata := cluster.NewMockMetadata(p.Controller) mockMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() - mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() shardContext.EXPECT().GetClusterMetadata().Return(mockMetadata).AnyTimes() shardID := int32(1) diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index 84a287bee7e..2f9e146174d 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -177,18 +177,15 @@ func (e *executableImpl) Execute() (retErr error) { e.Unlock() return nil } - var namespaceName string - ns, err := e.namespaceRegistry.GetNamespaceByID(namespace.ID(e.GetNamespaceID())) - if err == nil { - namespaceName = ns.Name().String() - } + + ns, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID())) var callerInfo headers.CallerInfo switch e.priority { case ctasks.PriorityHigh: - callerInfo = headers.NewBackgroundCallerInfo(namespaceName) + callerInfo = headers.NewBackgroundCallerInfo(ns.String()) default: // priority low or unknown - callerInfo = headers.NewPreemptableCallerInfo(namespaceName) + callerInfo = headers.NewPreemptableCallerInfo(ns.String()) } ctx := headers.SetCallerInfo( metrics.AddMetricsContext(context.Background()), @@ -232,12 +229,6 @@ func (e *executableImpl) Execute() (retErr error) { // Not doing it here as for certain errors latency for the attempt should not be counted }() - if ns != nil && !ns.IsOnCluster(e.clusterMetadata.GetCurrentClusterName()) { - // Discard task if the namespace is not on the current cluster. - e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...) - return consts.ErrTaskDiscarded - } - metricsTags, isActive, err := e.executor.Execute(ctx, e) e.taggedMetricsHandler = e.metricsHandler.WithTags(metricsTags...) diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index 122a9f6ca17..7ea1dbd099c 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -37,7 +37,6 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - persistencepb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" @@ -271,19 +270,6 @@ func (s *executableSuite) TestExecute_CallerInfo() { s.NoError(executable.Execute()) } -func (s *executableSuite) TestExecute_DiscardTask() { - executable := s.newTestExecutable() - registry := namespace.NewMockRegistry(s.controller) - executable.(*executableImpl).namespaceRegistry = registry - ns := namespace.NewGlobalNamespaceForTest(nil, nil, &persistencepb.NamespaceReplicationConfig{ - ActiveClusterName: "nonCurrentCluster", - Clusters: []string{"nonCurrentCluster"}, - }, 1) - - registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil).Times(2) - s.ErrorIs(executable.Execute(), consts.ErrTaskDiscarded) -} - func (s *executableSuite) TestExecuteHandleErr_ResetAttempt() { executable := s.newTestExecutable() s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(nil, true, errors.New("some random error")) diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index 16b6888492c..647db8dbf4e 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -435,6 +435,15 @@ func (t *timerQueueStandbyTaskExecutor) processTimer( ctx, cancel := context.WithTimeout(ctx, taskTimeout) defer cancel() + nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(timerTask.GetNamespaceID())) + if err != nil { + return err + } + if !nsRecord.IsOnCluster(t.clusterName) { + // discard standby tasks + return consts.ErrTaskDiscarded + } + executionContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, timerTask) if err != nil { return err diff --git a/service/history/transfer_queue_standby_task_executor.go b/service/history/transfer_queue_standby_task_executor.go index 1f689cf5ae2..94b8ac0896a 100644 --- a/service/history/transfer_queue_standby_task_executor.go +++ b/service/history/transfer_queue_standby_task_executor.go @@ -503,6 +503,15 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer( ctx, cancel := context.WithTimeout(ctx, taskTimeout) defer cancel() + nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID())) + if err != nil { + return err + } + if !nsRecord.IsOnCluster(t.clusterName) { + // discard standby tasks + return consts.ErrTaskDiscarded + } + weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, taskInfo) if err != nil { return err