Skip to content

Commit

Permalink
Drop standby tasks in standby taks executor (#4626)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
1. Revert "Drop task when namespace is not on the cluster (#4444)"
2. Add logic to drop standby task in transfer / timer standby task
executor.

<!-- Tell your future self why have you made these changes -->
**Why?**


<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**


<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
yux0 authored and dnr committed Jul 21, 2023
1 parent a466c4d commit 49b99fe
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 29 deletions.
1 change: 0 additions & 1 deletion common/namespace/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"time"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/clock"
Expand Down
1 change: 0 additions & 1 deletion service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
shardContext.EXPECT().GetConfig().Return(cfg).AnyTimes()
mockMetadata := cluster.NewMockMetadata(p.Controller)
mockMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
shardContext.EXPECT().GetClusterMetadata().Return(mockMetadata).AnyTimes()

shardID := int32(1)
Expand Down
17 changes: 4 additions & 13 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,15 @@ func (e *executableImpl) Execute() (retErr error) {
e.Unlock()
return nil
}
var namespaceName string
ns, err := e.namespaceRegistry.GetNamespaceByID(namespace.ID(e.GetNamespaceID()))
if err == nil {
namespaceName = ns.Name().String()
}

ns, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID()))
var callerInfo headers.CallerInfo
switch e.priority {
case ctasks.PriorityHigh:
callerInfo = headers.NewBackgroundCallerInfo(namespaceName)
callerInfo = headers.NewBackgroundCallerInfo(ns.String())
default:
// priority low or unknown
callerInfo = headers.NewPreemptableCallerInfo(namespaceName)
callerInfo = headers.NewPreemptableCallerInfo(ns.String())
}
ctx := headers.SetCallerInfo(
metrics.AddMetricsContext(context.Background()),
Expand Down Expand Up @@ -232,12 +229,6 @@ func (e *executableImpl) Execute() (retErr error) {
// Not doing it here as for certain errors latency for the attempt should not be counted
}()

if ns != nil && !ns.IsOnCluster(e.clusterMetadata.GetCurrentClusterName()) {
// Discard task if the namespace is not on the current cluster.
e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...)
return consts.ErrTaskDiscarded
}

metricsTags, isActive, err := e.executor.Execute(ctx, e)
e.taggedMetricsHandler = e.metricsHandler.WithTags(metricsTags...)

Expand Down
14 changes: 0 additions & 14 deletions service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

persistencepb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
Expand Down Expand Up @@ -271,19 +270,6 @@ func (s *executableSuite) TestExecute_CallerInfo() {
s.NoError(executable.Execute())
}

func (s *executableSuite) TestExecute_DiscardTask() {
executable := s.newTestExecutable()
registry := namespace.NewMockRegistry(s.controller)
executable.(*executableImpl).namespaceRegistry = registry
ns := namespace.NewGlobalNamespaceForTest(nil, nil, &persistencepb.NamespaceReplicationConfig{
ActiveClusterName: "nonCurrentCluster",
Clusters: []string{"nonCurrentCluster"},
}, 1)

registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil).Times(2)
s.ErrorIs(executable.Execute(), consts.ErrTaskDiscarded)
}

func (s *executableSuite) TestExecuteHandleErr_ResetAttempt() {
executable := s.newTestExecutable()
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(nil, true, errors.New("some random error"))
Expand Down
9 changes: 9 additions & 0 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,15 @@ func (t *timerQueueStandbyTaskExecutor) processTimer(
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(timerTask.GetNamespaceID()))
if err != nil {
return err
}
if !nsRecord.IsOnCluster(t.clusterName) {
// discard standby tasks
return consts.ErrTaskDiscarded
}

executionContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, timerTask)
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,15 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer(
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

nsRecord, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
return err
}
if !nsRecord.IsOnCluster(t.clusterName) {
// discard standby tasks
return consts.ErrTaskDiscarded
}

weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, taskInfo)
if err != nil {
return err
Expand Down

0 comments on commit 49b99fe

Please sign in to comment.