Skip to content

Commit

Permalink
Drop task when namespace is not on the cluster (temporalio#4444)
Browse files Browse the repository at this point in the history
* Drop task when namespace is not on the cluster
  • Loading branch information
yux0 authored Jun 7, 2023
1 parent 79e212d commit b61c3f3
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 3 deletions.
1 change: 1 addition & 0 deletions common/namespace/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/clock"
Expand Down
1 change: 1 addition & 0 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
shardContext.EXPECT().GetConfig().Return(cfg).AnyTimes()
mockMetadata := cluster.NewMockMetadata(p.Controller)
mockMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
shardContext.EXPECT().GetClusterMetadata().Return(mockMetadata).AnyTimes()

shardID := int32(1)
Expand Down
11 changes: 8 additions & 3 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,26 @@ func (e *executableImpl) Execute() (retErr error) {
return nil
}

ns, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID()))
ns, _ := e.namespaceRegistry.GetNamespaceByID(namespace.ID(e.GetNamespaceID()))
var callerInfo headers.CallerInfo
switch e.priority {
case ctasks.PriorityHigh:
callerInfo = headers.NewBackgroundCallerInfo(ns.String())
callerInfo = headers.NewBackgroundCallerInfo(ns.Name().String())
default:
// priority low or unknown
callerInfo = headers.NewPreemptableCallerInfo(ns.String())
callerInfo = headers.NewPreemptableCallerInfo(ns.Name().String())
}
ctx := headers.SetCallerInfo(
metrics.AddMetricsContext(context.Background()),
callerInfo,
)
e.Unlock()

if !ns.IsOnCluster(e.clusterMetadata.GetCurrentClusterName()) {
// Discard task if the namespace is not on the current cluster.
return consts.ErrTaskDiscarded
}

defer func() {
if panicObj := recover(); panicObj != nil {
err, ok := panicObj.(error)
Expand Down
14 changes: 14 additions & 0 deletions service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

persistencepb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
Expand Down Expand Up @@ -270,6 +271,19 @@ func (s *executableSuite) TestExecute_CallerInfo() {
s.NoError(executable.Execute())
}

func (s *executableSuite) TestExecute_DiscardTask() {
executable := s.newTestExecutable()
registry := namespace.NewMockRegistry(s.controller)
executable.(*executableImpl).namespaceRegistry = registry
ns := namespace.NewGlobalNamespaceForTest(nil, nil, &persistencepb.NamespaceReplicationConfig{
ActiveClusterName: "nonCurrentCluster",
Clusters: []string{"nonCurrentCluster"},
}, 1)

registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
s.ErrorIs(executable.Execute(), consts.ErrTaskDiscarded)
}

func (s *executableSuite) TestExecuteHandleErr_ResetAttempt() {
executable := s.newTestExecutable()
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(nil, true, errors.New("some random error"))
Expand Down
107 changes: 107 additions & 0 deletions service/history/queues/executor_wrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
"context"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

persistencepb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
)

const (
currentCluster = "current"
nonCurrentCluster = "nonCurrent"
)

type (
executorSuite struct {
suite.Suite
*require.Assertions
ctrl *gomock.Controller

registry *namespace.MockRegistry
activeExecutor *MockExecutor
standbyExecutor *MockExecutor
executor Executor
}
)

func TestExecutorSuite(t *testing.T) {
t.Parallel()
s := new(executorSuite)
suite.Run(t, s)
}

func (s *executorSuite) SetupTest() {
s.Assertions = require.New(s.T())
s.ctrl = gomock.NewController(s.T())
s.registry = namespace.NewMockRegistry(s.ctrl)
s.activeExecutor = NewMockExecutor(s.ctrl)
s.standbyExecutor = NewMockExecutor(s.ctrl)
s.executor = NewExecutorWrapper(
currentCluster,
s.registry,
s.activeExecutor,
s.standbyExecutor,
log.NewNoopLogger(),
)
}

func (s *executorSuite) TestExecute_Active() {
executable := NewMockExecutable(s.ctrl)
executable.EXPECT().GetNamespaceID().Return("namespace_id")
executable.EXPECT().GetTask().Return(nil)
ns := namespace.NewGlobalNamespaceForTest(nil, nil, &persistencepb.NamespaceReplicationConfig{
ActiveClusterName: currentCluster,
Clusters: []string{currentCluster},
}, 1)
s.registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
s.activeExecutor.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(nil, true, nil).Times(1)
_, isActive, err := s.executor.Execute(context.Background(), executable)
s.NoError(err)
s.True(isActive)
}

func (s *executorSuite) TestExecute_Standby() {
executable := NewMockExecutable(s.ctrl)
executable.EXPECT().GetNamespaceID().Return("namespace_id")
executable.EXPECT().GetTask().Return(nil)
ns := namespace.NewGlobalNamespaceForTest(nil, nil, &persistencepb.NamespaceReplicationConfig{
ActiveClusterName: nonCurrentCluster,
Clusters: []string{currentCluster, nonCurrentCluster},
}, 1)
s.registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil)
s.standbyExecutor.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(nil, false, nil).Times(1)
_, isActive, err := s.executor.Execute(context.Background(), executable)
s.NoError(err)
s.False(isActive)
}

0 comments on commit b61c3f3

Please sign in to comment.