From 685b55a26ce19da2e5d81c008b33f5b42974e722 Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 4 Nov 2022 08:56:57 -0700 Subject: [PATCH 1/2] Handle ns not found in replication --- client/clientBean.go | 4 +-- service/history/replication/task_executor.go | 26 +++++++++++--------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/client/clientBean.go b/client/clientBean.go index 014d2a509fd..d2f919a8639 100644 --- a/client/clientBean.go +++ b/client/clientBean.go @@ -191,7 +191,7 @@ func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) (adminservice.Admi if !ok { clusterInfo, clusterFound := h.clusterMetadata.GetAllClusterInfo()[cluster] if !clusterFound { - return nil, &serviceerror.Unavailable{ + return nil, &serviceerror.NotFound{ Message: fmt.Sprintf( "Unknown cluster name: %v with given cluster information map: %v.", cluster, @@ -233,7 +233,7 @@ func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) (workflowservic if !ok { clusterInfo, clusterFound := h.clusterMetadata.GetAllClusterInfo()[cluster] if !clusterFound { - return nil, &serviceerror.Unavailable{ + return nil, &serviceerror.NotFound{ Message: fmt.Sprintf( "Unknown cluster name: %v with given cluster information map: %v.", cluster, diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index 076da352e34..74cd1c624ab 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -306,19 +306,23 @@ func (e *taskExecutorImpl) filterTask( } namespaceEntry, err := e.namespaceRegistry.GetNamespaceByID(namespaceID) - if err != nil { - return false, err - } - - shouldProcessTask := false -FilterLoop: - for _, targetCluster := range namespaceEntry.ClusterNames() { - if e.currentCluster == targetCluster { - shouldProcessTask = true - break FilterLoop + switch err.(type) { + case nil: + shouldProcessTask := false + FilterLoop: + for _, targetCluster := range namespaceEntry.ClusterNames() { + if e.currentCluster == targetCluster { + shouldProcessTask = true + break FilterLoop + } } + return shouldProcessTask, nil + case *serviceerror.NamespaceNotFound: + // Drop the task + return false, nil + default: + return false, err } - return shouldProcessTask, nil } func (e *taskExecutorImpl) cleanupWorkflowExecution(ctx context.Context, namespaceID string, workflowID string, runID string) (retErr error) { From b0d920db415f7aa54a9c59975b1ee9fe27b21ca0 Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 4 Nov 2022 22:25:30 -0700 Subject: [PATCH 2/2] Add unit test --- service/history/replication/task_executor.go | 28 +++++++++---------- .../history/replication/task_executor_test.go | 11 ++++++++ 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index 74cd1c624ab..7a4980c9d9c 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -306,23 +306,23 @@ func (e *taskExecutorImpl) filterTask( } namespaceEntry, err := e.namespaceRegistry.GetNamespaceByID(namespaceID) - switch err.(type) { - case nil: - shouldProcessTask := false - FilterLoop: - for _, targetCluster := range namespaceEntry.ClusterNames() { - if e.currentCluster == targetCluster { - shouldProcessTask = true - break FilterLoop - } + if err != nil { + if _, ok := err.(*serviceerror.NamespaceNotFound); ok { + // Drop the task + return false, nil } - return shouldProcessTask, nil - case *serviceerror.NamespaceNotFound: - // Drop the task - return false, nil - default: return false, err } + + shouldProcessTask := false +FilterLoop: + for _, targetCluster := range namespaceEntry.ClusterNames() { + if e.currentCluster == targetCluster { + shouldProcessTask = true + break FilterLoop + } + } + return shouldProcessTask, nil } func (e *taskExecutorImpl) cleanupWorkflowExecution(ctx context.Context, namespaceID string, workflowID string, runID string) (retErr error) { diff --git a/service/history/replication/task_executor_test.go b/service/history/replication/task_executor_test.go index 76adbc4930d..cbffe6b98dd 100644 --- a/service/history/replication/task_executor_test.go +++ b/service/history/replication/task_executor_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/serviceerror" enumsspb "go.temporal.io/server/api/enums/v1" historyspb "go.temporal.io/server/api/history/v1" @@ -181,6 +182,16 @@ func (s *taskExecutorSuite) TestFilterTask_EnforceApply() { s.True(ok) } +func (s *taskExecutorSuite) TestFilterTask_NamespaceNotFound() { + namespaceID := namespace.ID(uuid.New()) + s.mockNamespaceCache.EXPECT(). + GetNamespaceByID(namespaceID). + Return(nil, &serviceerror.NamespaceNotFound{}) + ok, err := s.replicationTaskExecutor.filterTask(namespaceID, false) + s.NoError(err) + s.False(ok) +} + func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() { namespaceID := namespace.ID(uuid.New()) workflowID := uuid.New()