From dc692785b89a034f4310f8eb7bddbe7f308e8128 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Mon, 6 Mar 2023 16:37:28 -0800 Subject: [PATCH] Restructure replication task implementation & UT (#3992) * Restructure replication task implementation & UT for ease of management --- common/metrics/metric_defs.go | 16 +- service/history/historyEngine.go | 2 +- .../executable_activity_state_task.go | 143 +++++++ .../executable_activity_state_task_test.go | 262 +++++++++++++ .../replication/executable_history_task.go | 139 +++++++ .../executable_history_task_test.go | 240 ++++++++++++ .../replication/executable_noop_task.go | 65 ++++ .../replication/executable_noop_task_test.go | 118 ++++++ .../history/replication/executable_task.go | 349 +++++++++++++++++ .../executable_task_initializer.go | 126 ++++++ .../replication/executable_task_mock.go | 240 ++++++++++++ .../replication/executable_task_test.go | 363 ++++++++++++++++++ .../replication/executable_task_tracker.go | 141 +++++++ .../replication/executable_unknown_task.go | 77 ++++ .../executable_unknown_task_test.go | 119 ++++++ .../executable_workflow_state_task.go | 121 ++++++ .../executable_workflow_state_task_test.go | 180 +++++++++ service/history/replication/fx.go | 27 ++ service/history/replication/task_executor.go | 2 +- 19 files changed, 2723 insertions(+), 7 deletions(-) create mode 100644 service/history/replication/executable_activity_state_task.go create mode 100644 service/history/replication/executable_activity_state_task_test.go create mode 100644 service/history/replication/executable_history_task.go create mode 100644 service/history/replication/executable_history_task_test.go create mode 100644 service/history/replication/executable_noop_task.go create mode 100644 service/history/replication/executable_noop_task_test.go create mode 100644 service/history/replication/executable_task.go create mode 100644 service/history/replication/executable_task_initializer.go create mode 100644 
service/history/replication/executable_task_mock.go create mode 100644 service/history/replication/executable_task_test.go create mode 100644 service/history/replication/executable_task_tracker.go create mode 100644 service/history/replication/executable_unknown_task.go create mode 100644 service/history/replication/executable_unknown_task_test.go create mode 100644 service/history/replication/executable_workflow_state_task.go create mode 100644 service/history/replication/executable_workflow_state_task_test.go diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 78453e65a83..cbcefb708b3 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -197,6 +197,8 @@ const ( AdminRemoveRemoteClusterScope = "AdminRemoveRemoteCluster" // AdminDeleteWorkflowExecutionScope is the metric scope for admin.AdminDeleteWorkflowExecution AdminDeleteWorkflowExecutionScope = "AdminDeleteWorkflowExecution" + // AdminStreamReplicationMessagesScope is the metric scope for admin.AdminStreamReplicationMessages + AdminStreamReplicationMessagesScope = "AdminStreamReplicationMessages" // OperatorAddSearchAttributesScope is the metric scope for operator.AddSearchAttributes OperatorAddSearchAttributesScope @@ -433,6 +435,8 @@ const ( HistoryClientDescribeHistoryHostScope = "HistoryClientDescribeHistoryHost" // HistoryClientGetReplicationMessagesScope tracks RPC calls to history service HistoryClientGetReplicationMessagesScope = "HistoryClientGetReplicationMessages" + // HistoryClientStreamReplicationMessagesScope tracks RPC calls to history service + HistoryClientStreamReplicationMessagesScope = "HistoryClientStreamReplicationMessages" ) // Matching Client Operations @@ -1136,12 +1140,14 @@ const ( HistoryMetadataReplicationTaskScope = "HistoryMetadataReplicationTask" // SyncShardTaskScope is the scope used by sync shrad information processing SyncShardTaskScope = "SyncShardTask" - // SyncActivityTaskScope is the scope used by sync activity 
information processing + // SyncActivityTaskScope is the scope used by sync activity SyncActivityTaskScope = "SyncActivityTask" - // ESProcessorScope is scope used by all metric emitted by esProcessor - ESProcessorScope = "ESProcessor" - // IndexProcessorScope is scope used by all metric emitted by index processor - IndexProcessorScope = "IndexProcessor" + // SyncWorkflowTaskScope is the scope used by sync workflow + SyncWorkflowTaskScope = "SyncWorkflowTask" + // NoopTaskScope is the scope used by noop task + NoopTaskScope = "NoopTask" + // UnknownTaskScope is the scope used by unknown task + UnknownTaskScope = "UnknownTask" // ParentClosePolicyProcessorScope is scope used by all metrics emitted by worker.ParentClosePolicyProcessor ParentClosePolicyProcessorScope = "ParentClosePolicyProcessor" DeleteNamespaceWorkflowScope = "DeleteNamespaceWorkflow" diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 09fb74a3e4a..defea2b3441 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -79,7 +79,7 @@ import ( "go.temporal.io/server/service/history/api/verifychildworkflowcompletionrecorded" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/ndc" "go.temporal.io/server/service/history/queues" diff --git a/service/history/replication/executable_activity_state_task.go b/service/history/replication/executable_activity_state_task.go new file mode 100644 index 00000000000..b2b79499ece --- /dev/null +++ b/service/history/replication/executable_activity_state_task.go @@ -0,0 +1,143 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package replication + +import ( + "time" + + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/historyservice/v1" + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + serviceerrors "go.temporal.io/server/common/serviceerror" + ctasks "go.temporal.io/server/common/tasks" +) + +type ( + ExecutableActivityStateTask struct { + ProcessToolBox + + definition.WorkflowKey + ExecutableTask + req *historyservice.SyncActivityRequest + + // variables to be perhaps removed (not essential to logic) + sourceClusterName string + } +) + +var _ ctasks.Task = (*ExecutableActivityStateTask)(nil) +var _ TrackableExecutableTask = (*ExecutableActivityStateTask)(nil) + +func NewExecutableActivityStateTask( + processToolBox ProcessToolBox, + taskID int64, + taskCreationTime time.Time, + task *replicationspb.SyncActivityTaskAttributes, + sourceClusterName string, +) *ExecutableActivityStateTask { + return &ExecutableActivityStateTask{ + ProcessToolBox: processToolBox, + + WorkflowKey: definition.NewWorkflowKey(task.NamespaceId, task.WorkflowId, task.RunId), + ExecutableTask: NewExecutableTask( + processToolBox, + taskID, + metrics.SyncActivityTaskScope, + taskCreationTime, + time.Now().UTC(), + ), + req: &historyservice.SyncActivityRequest{ + NamespaceId: task.NamespaceId, + WorkflowId: task.WorkflowId, + RunId: task.RunId, + Version: task.Version, + ScheduledEventId: task.ScheduledEventId, + ScheduledTime: task.ScheduledTime, + StartedEventId: task.StartedEventId, + StartedTime: task.StartedTime, + LastHeartbeatTime: task.LastHeartbeatTime, + Details: task.Details, + Attempt: task.Attempt, + LastFailure: task.LastFailure, + LastWorkerIdentity: task.LastWorkerIdentity, + VersionHistory: task.GetVersionHistory(), + }, + + sourceClusterName: sourceClusterName, + } +} + +func (e *ExecutableActivityStateTask) Execute() error { + 
namespaceName, apply, nsError := e.GetNamespaceInfo(e.NamespaceID) + if nsError != nil { + return nsError + } else if !apply { + return nil + } + ctx, cancel := newTaskContext(namespaceName) + defer cancel() + + shardContext, err := e.ShardController.GetShardByNamespaceWorkflow( + namespace.ID(e.NamespaceID), + e.WorkflowID, + ) + if err != nil { + return err + } + engine, err := shardContext.GetEngine(ctx) + if err != nil { + return err + } + return engine.SyncActivity(ctx, e.req) +} + +func (e *ExecutableActivityStateTask) HandleErr(err error) error { + switch retryErr := err.(type) { + case nil, *serviceerror.NotFound: + return nil + case *serviceerrors.RetryReplication: + namespaceName, _, nsError := e.GetNamespaceInfo(e.NamespaceID) + if nsError != nil { + return err + } + ctx, cancel := newTaskContext(namespaceName) + defer cancel() + + if resendErr := e.Resend( + ctx, + e.sourceClusterName, + retryErr, + ); resendErr != nil { + return err + } + return e.Execute() + default: + return err + } +} diff --git a/service/history/replication/executable_activity_state_task_test.go b/service/history/replication/executable_activity_state_task_test.go new file mode 100644 index 00000000000..2f806631be1 --- /dev/null +++ b/service/history/replication/executable_activity_state_task_test.go @@ -0,0 +1,262 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package replication + +import ( + "errors" + "math/rand" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" + failurepb "go.temporal.io/api/failure/v1" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/history/v1" + "go.temporal.io/server/api/historyservice/v1" + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/client" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/primitives/timestamp" + serviceerrors "go.temporal.io/server/common/serviceerror" + "go.temporal.io/server/common/xdc" + "go.temporal.io/server/service/history/shard" +) + +type ( + executableActivityStateTaskSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + clusterMetadata *cluster.MockMetadata + clientBean *client.MockBean + shardController *shard.MockController + namespaceCache *namespace.MockRegistry + ndcHistoryResender *xdc.MockNDCHistoryResender + metricsHandler metrics.Handler + logger log.Logger + executableTask *MockExecutableTask + + replicationTask *replicationspb.SyncActivityTaskAttributes + sourceClusterName string + + task *ExecutableActivityStateTask + } +) + +func TestExecutableActivityStateTaskSuite(t *testing.T) { + s := new(executableActivityStateTaskSuite) + suite.Run(t, s) +} + +func (s *executableActivityStateTaskSuite) SetupSuite() { + s.Assertions = require.New(s.T()) +} + +func (s *executableActivityStateTaskSuite) TearDownSuite() { + +} + +func (s *executableActivityStateTaskSuite) SetupTest() { + s.controller = gomock.NewController(s.T()) + s.clusterMetadata = cluster.NewMockMetadata(s.controller) + s.clientBean = client.NewMockBean(s.controller) + s.shardController = 
shard.NewMockController(s.controller) + s.namespaceCache = namespace.NewMockRegistry(s.controller) + s.ndcHistoryResender = xdc.NewMockNDCHistoryResender(s.controller) + s.metricsHandler = metrics.NoopMetricsHandler + s.logger = log.NewNoopLogger() + s.executableTask = NewMockExecutableTask(s.controller) + s.replicationTask = &replicationspb.SyncActivityTaskAttributes{ + NamespaceId: uuid.NewString(), + WorkflowId: uuid.NewString(), + RunId: uuid.NewString(), + Version: rand.Int63(), + ScheduledEventId: rand.Int63(), + ScheduledTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), + StartedEventId: rand.Int63(), + StartedTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), + LastHeartbeatTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), + Details: &commonpb.Payloads{}, + Attempt: rand.Int31(), + LastFailure: &failurepb.Failure{}, + LastWorkerIdentity: uuid.NewString(), + VersionHistory: &history.VersionHistory{}, + } + s.sourceClusterName = cluster.TestCurrentClusterName + + s.task = NewExecutableActivityStateTask( + ProcessToolBox{ + ClusterMetadata: s.clusterMetadata, + ClientBean: s.clientBean, + ShardController: s.shardController, + NamespaceCache: s.namespaceCache, + NDCHistoryResender: s.ndcHistoryResender, + MetricsHandler: s.metricsHandler, + Logger: s.logger, + }, + rand.Int63(), + time.Unix(0, rand.Int63()), + s.replicationTask, + s.sourceClusterName, + ) + s.task.ExecutableTask = s.executableTask +} + +func (s *executableActivityStateTaskSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *executableActivityStateTaskSuite) TestExecute_Process() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), true, nil, + ).AnyTimes() + + shardContext := shard.NewMockContext(s.controller) + engine := shard.NewMockEngine(s.controller) + s.shardController.EXPECT().GetShardByNamespaceWorkflow( + namespace.ID(s.task.NamespaceID), + s.task.WorkflowID, + ).Return(shardContext, nil).AnyTimes() + 
shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes() + engine.EXPECT().SyncActivity(gomock.Any(), &historyservice.SyncActivityRequest{ + NamespaceId: s.replicationTask.NamespaceId, + WorkflowId: s.replicationTask.WorkflowId, + RunId: s.replicationTask.RunId, + Version: s.replicationTask.Version, + ScheduledEventId: s.replicationTask.ScheduledEventId, + ScheduledTime: s.replicationTask.ScheduledTime, + StartedEventId: s.replicationTask.StartedEventId, + StartedTime: s.replicationTask.StartedTime, + LastHeartbeatTime: s.replicationTask.LastHeartbeatTime, + Details: s.replicationTask.Details, + Attempt: s.replicationTask.Attempt, + LastFailure: s.replicationTask.LastFailure, + LastWorkerIdentity: s.replicationTask.LastWorkerIdentity, + VersionHistory: s.replicationTask.GetVersionHistory(), + }).Return(nil) + + err := s.task.Execute() + s.NoError(err) +} + +func (s *executableActivityStateTaskSuite) TestExecute_Skip() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), false, nil, + ).AnyTimes() + + err := s.task.Execute() + s.NoError(err) +} + +func (s *executableActivityStateTaskSuite) TestExecute_Err() { + err := errors.New("OwO") + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + "", false, err, + ).AnyTimes() + + s.Equal(err, s.task.Execute()) +} + +func (s *executableActivityStateTaskSuite) TestHandleErr_Resend_Success() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), true, nil, + ).AnyTimes() + shardContext := shard.NewMockContext(s.controller) + engine := shard.NewMockEngine(s.controller) + s.shardController.EXPECT().GetShardByNamespaceWorkflow( + namespace.ID(s.task.NamespaceID), + s.task.WorkflowID, + ).Return(shardContext, nil).AnyTimes() + shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes() + engine.EXPECT().SyncActivity(gomock.Any(), &historyservice.SyncActivityRequest{ + NamespaceId: 
s.replicationTask.NamespaceId, + WorkflowId: s.replicationTask.WorkflowId, + RunId: s.replicationTask.RunId, + Version: s.replicationTask.Version, + ScheduledEventId: s.replicationTask.ScheduledEventId, + ScheduledTime: s.replicationTask.ScheduledTime, + StartedEventId: s.replicationTask.StartedEventId, + StartedTime: s.replicationTask.StartedTime, + LastHeartbeatTime: s.replicationTask.LastHeartbeatTime, + Details: s.replicationTask.Details, + Attempt: s.replicationTask.Attempt, + LastFailure: s.replicationTask.LastFailure, + LastWorkerIdentity: s.replicationTask.LastWorkerIdentity, + VersionHistory: s.replicationTask.GetVersionHistory(), + }).Return(nil) + + err := serviceerrors.NewRetryReplication( + "", + s.task.NamespaceID, + s.task.WorkflowID, + s.task.RunID, + rand.Int63(), + rand.Int63(), + rand.Int63(), + rand.Int63(), + ) + s.executableTask.EXPECT().Resend(gomock.Any(), s.sourceClusterName, err).Return(nil) + + s.NoError(s.task.HandleErr(err)) +} + +func (s *executableActivityStateTaskSuite) TestHandleErr_Resend_Error() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), true, nil, + ).AnyTimes() + err := serviceerrors.NewRetryReplication( + "", + s.task.NamespaceID, + s.task.WorkflowID, + s.task.RunID, + rand.Int63(), + rand.Int63(), + rand.Int63(), + rand.Int63(), + ) + s.executableTask.EXPECT().Resend(gomock.Any(), s.sourceClusterName, err).Return(errors.New("OwO")) + + s.Equal(err, s.task.HandleErr(err)) +} + +func (s *executableActivityStateTaskSuite) TestHandleErr_Other() { + err := errors.New("OwO") + s.Equal(err, s.task.HandleErr(err)) + + err = serviceerror.NewNotFound("") + s.Equal(nil, s.task.HandleErr(err)) + + err = serviceerror.NewUnavailable("") + s.Equal(err, s.task.HandleErr(err)) +} diff --git a/service/history/replication/executable_history_task.go b/service/history/replication/executable_history_task.go new file mode 100644 index 00000000000..12688f83326 --- /dev/null +++ 
b/service/history/replication/executable_history_task.go @@ -0,0 +1,139 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package replication + +import ( + "time" + + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/historyservice/v1" + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + serviceerrors "go.temporal.io/server/common/serviceerror" + ctasks "go.temporal.io/server/common/tasks" +) + +type ( + ExecutableHistoryTask struct { + ProcessToolBox + + definition.WorkflowKey + ExecutableTask + req *historyservice.ReplicateEventsV2Request + + // variables to be perhaps removed (not essential to logic) + sourceClusterName string + } +) + +var _ ctasks.Task = (*ExecutableHistoryTask)(nil) +var _ TrackableExecutableTask = (*ExecutableHistoryTask)(nil) + +func NewExecutableHistoryTask( + processToolBox ProcessToolBox, + taskID int64, + taskCreationTime time.Time, + task *replicationspb.HistoryTaskAttributes, + sourceClusterName string, +) *ExecutableHistoryTask { + return &ExecutableHistoryTask{ + ProcessToolBox: processToolBox, + + WorkflowKey: definition.NewWorkflowKey(task.NamespaceId, task.WorkflowId, task.RunId), + ExecutableTask: NewExecutableTask( + processToolBox, + taskID, + metrics.HistoryReplicationTaskScope, + taskCreationTime, + time.Now().UTC(), + ), + req: &historyservice.ReplicateEventsV2Request{ + NamespaceId: task.NamespaceId, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: task.WorkflowId, + RunId: task.RunId, + }, + VersionHistoryItems: task.VersionHistoryItems, + Events: task.Events, + // new run events do not need version history since there are no prior events + NewRunEvents: task.NewRunEvents, + }, + + sourceClusterName: sourceClusterName, + } +} + +func (e *ExecutableHistoryTask) Execute() error { + namespaceName, apply, nsError := e.GetNamespaceInfo(e.NamespaceID) + if nsError != nil { + return nsError + } else if !apply { + return nil + } + ctx, 
cancel := newTaskContext(namespaceName) + defer cancel() + + shardContext, err := e.ShardController.GetShardByNamespaceWorkflow( + namespace.ID(e.NamespaceID), + e.WorkflowID, + ) + if err != nil { + return err + } + engine, err := shardContext.GetEngine(ctx) + if err != nil { + return err + } + return engine.ReplicateEventsV2(ctx, e.req) +} + +func (e *ExecutableHistoryTask) HandleErr(err error) error { + switch retryErr := err.(type) { + case nil, *serviceerror.NotFound: + return nil + case *serviceerrors.RetryReplication: + namespaceName, _, nsError := e.GetNamespaceInfo(e.NamespaceID) + if nsError != nil { + return err + } + ctx, cancel := newTaskContext(namespaceName) + defer cancel() + + if resendErr := e.Resend( + ctx, + e.sourceClusterName, + retryErr, + ); resendErr != nil { + return err + } + return e.Execute() + default: + return err + } +} diff --git a/service/history/replication/executable_history_task_test.go b/service/history/replication/executable_history_task_test.go new file mode 100644 index 00000000000..db1f09c2bcd --- /dev/null +++ b/service/history/replication/executable_history_task_test.go @@ -0,0 +1,240 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replication + +import ( + "errors" + "math/rand" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/history/v1" + "go.temporal.io/server/api/historyservice/v1" + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/client" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + serviceerrors "go.temporal.io/server/common/serviceerror" + "go.temporal.io/server/common/xdc" + "go.temporal.io/server/service/history/shard" +) + +type ( + executableHistoryTaskSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + clusterMetadata *cluster.MockMetadata + clientBean *client.MockBean + shardController *shard.MockController + namespaceCache *namespace.MockRegistry + ndcHistoryResender *xdc.MockNDCHistoryResender + metricsHandler metrics.Handler + logger log.Logger + executableTask *MockExecutableTask + + replicationTask *replicationspb.HistoryTaskAttributes + sourceClusterName string + + task *ExecutableHistoryTask + } +) + +func TestExecutableHistoryTaskSuite(t *testing.T) { + s := new(executableHistoryTaskSuite) + suite.Run(t, s) +} + +func (s *executableHistoryTaskSuite) SetupSuite() { + 
s.Assertions = require.New(s.T()) +} + +func (s *executableHistoryTaskSuite) TearDownSuite() { + +} + +func (s *executableHistoryTaskSuite) SetupTest() { + s.controller = gomock.NewController(s.T()) + s.clusterMetadata = cluster.NewMockMetadata(s.controller) + s.clientBean = client.NewMockBean(s.controller) + s.shardController = shard.NewMockController(s.controller) + s.namespaceCache = namespace.NewMockRegistry(s.controller) + s.ndcHistoryResender = xdc.NewMockNDCHistoryResender(s.controller) + s.metricsHandler = metrics.NoopMetricsHandler + s.logger = log.NewNoopLogger() + s.executableTask = NewMockExecutableTask(s.controller) + s.replicationTask = &replicationspb.HistoryTaskAttributes{ + NamespaceId: uuid.NewString(), + WorkflowId: uuid.NewString(), + RunId: uuid.NewString(), + VersionHistoryItems: []*history.VersionHistoryItem{}, + Events: &commonpb.DataBlob{}, + NewRunEvents: &commonpb.DataBlob{}, + } + s.sourceClusterName = cluster.TestCurrentClusterName + + s.task = NewExecutableHistoryTask( + ProcessToolBox{ + ClusterMetadata: s.clusterMetadata, + ClientBean: s.clientBean, + ShardController: s.shardController, + NamespaceCache: s.namespaceCache, + NDCHistoryResender: s.ndcHistoryResender, + MetricsHandler: s.metricsHandler, + Logger: s.logger, + }, + rand.Int63(), + time.Unix(0, rand.Int63()), + s.replicationTask, + s.sourceClusterName, + ) + s.task.ExecutableTask = s.executableTask +} + +func (s *executableHistoryTaskSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *executableHistoryTaskSuite) TestExecute_Process() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), true, nil, + ).AnyTimes() + + shardContext := shard.NewMockContext(s.controller) + engine := shard.NewMockEngine(s.controller) + s.shardController.EXPECT().GetShardByNamespaceWorkflow( + namespace.ID(s.task.NamespaceID), + s.task.WorkflowID, + ).Return(shardContext, nil).AnyTimes() + 
shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes() + engine.EXPECT().ReplicateEventsV2(gomock.Any(), &historyservice.ReplicateEventsV2Request{ + NamespaceId: s.task.NamespaceID, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: s.task.WorkflowID, + RunId: s.task.RunID, + }, + VersionHistoryItems: s.replicationTask.VersionHistoryItems, + Events: s.replicationTask.Events, + NewRunEvents: s.replicationTask.NewRunEvents, + }).Return(nil) + + err := s.task.Execute() + s.NoError(err) +} + +func (s *executableHistoryTaskSuite) TestExecute_Skip() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), false, nil, + ).AnyTimes() + + err := s.task.Execute() + s.NoError(err) +} + +func (s *executableHistoryTaskSuite) TestExecute_Err() { + err := errors.New("OwO") + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + "", false, err, + ).AnyTimes() + + s.Equal(err, s.task.Execute()) +} + +func (s *executableHistoryTaskSuite) TestHandleErr_Resend_Success() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), true, nil, + ).AnyTimes() + shardContext := shard.NewMockContext(s.controller) + engine := shard.NewMockEngine(s.controller) + s.shardController.EXPECT().GetShardByNamespaceWorkflow( + namespace.ID(s.task.NamespaceID), + s.task.WorkflowID, + ).Return(shardContext, nil).AnyTimes() + shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes() + engine.EXPECT().ReplicateEventsV2(gomock.Any(), &historyservice.ReplicateEventsV2Request{ + NamespaceId: s.task.NamespaceID, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: s.task.WorkflowID, + RunId: s.task.RunID, + }, + VersionHistoryItems: s.replicationTask.VersionHistoryItems, + Events: s.replicationTask.Events, + NewRunEvents: s.replicationTask.NewRunEvents, + }).Return(nil) + + err := serviceerrors.NewRetryReplication( + "", + s.task.NamespaceID, + 
s.task.WorkflowID, + s.task.RunID, + rand.Int63(), + rand.Int63(), + rand.Int63(), + rand.Int63(), + ) + s.executableTask.EXPECT().Resend(gomock.Any(), s.sourceClusterName, err).Return(nil) + + s.NoError(s.task.HandleErr(err)) +} + +func (s *executableHistoryTaskSuite) TestHandleErr_Resend_Error() { + s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return( + uuid.NewString(), true, nil, + ).AnyTimes() + err := serviceerrors.NewRetryReplication( + "", + s.task.NamespaceID, + s.task.WorkflowID, + s.task.RunID, + rand.Int63(), + rand.Int63(), + rand.Int63(), + rand.Int63(), + ) + s.executableTask.EXPECT().Resend(gomock.Any(), s.sourceClusterName, err).Return(errors.New("OwO")) + + s.Equal(err, s.task.HandleErr(err)) +} + +func (s *executableHistoryTaskSuite) TestHandleErr_Other() { + err := errors.New("OwO") + s.Equal(err, s.task.HandleErr(err)) + + err = serviceerror.NewNotFound("") + s.Equal(nil, s.task.HandleErr(err)) + + err = serviceerror.NewUnavailable("") + s.Equal(err, s.task.HandleErr(err)) +} diff --git a/service/history/replication/executable_noop_task.go b/service/history/replication/executable_noop_task.go new file mode 100644 index 00000000000..2f23ef68421 --- /dev/null +++ b/service/history/replication/executable_noop_task.go @@ -0,0 +1,65 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package replication + +import ( + "time" + + "go.temporal.io/server/common/metrics" + ctasks "go.temporal.io/server/common/tasks" +) + +type ( + ExecutableNoopTask struct { + ExecutableTask + } +) + +var _ ctasks.Task = (*ExecutableNoopTask)(nil) +var _ TrackableExecutableTask = (*ExecutableNoopTask)(nil) + +func NewExecutableNoopTask( + processToolBox ProcessToolBox, + taskID int64, + taskCreationTime time.Time, +) *ExecutableNoopTask { + return &ExecutableNoopTask{ + ExecutableTask: NewExecutableTask( + processToolBox, + taskID, + metrics.NoopTaskScope, + taskCreationTime, + time.Now().UTC(), + ), + } +} + +func (e *ExecutableNoopTask) Execute() error { + return nil +} + +func (e *ExecutableNoopTask) HandleErr(err error) error { + return err +} diff --git a/service/history/replication/executable_noop_task_test.go b/service/history/replication/executable_noop_task_test.go new file mode 100644 index 00000000000..abceffb248c --- /dev/null +++ b/service/history/replication/executable_noop_task_test.go @@ -0,0 +1,118 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replication + +import ( + "errors" + "math/rand" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/client" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/xdc" + "go.temporal.io/server/service/history/shard" +) + +type ( + executableNoopTaskSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + clusterMetadata *cluster.MockMetadata + clientBean *client.MockBean + shardController *shard.MockController + namespaceCache *namespace.MockRegistry + ndcHistoryResender *xdc.MockNDCHistoryResender + metricsHandler metrics.Handler + logger log.Logger + + task *ExecutableNoopTask + } +) + +func TestExecutableNoopTaskSuite(t *testing.T) { + s := new(executableNoopTaskSuite) + suite.Run(t, s) +} + +func (s *executableNoopTaskSuite) SetupSuite() { + s.Assertions = require.New(s.T()) +} + +func (s *executableNoopTaskSuite) TearDownSuite() { + +} + +func (s *executableNoopTaskSuite) SetupTest() { + s.controller = gomock.NewController(s.T()) + s.clusterMetadata = cluster.NewMockMetadata(s.controller) + s.clientBean = client.NewMockBean(s.controller) + s.shardController = shard.NewMockController(s.controller) + s.namespaceCache = namespace.NewMockRegistry(s.controller) + s.ndcHistoryResender = xdc.NewMockNDCHistoryResender(s.controller) + s.metricsHandler = metrics.NoopMetricsHandler + s.logger = log.NewNoopLogger() + + s.task = NewExecutableNoopTask( + ProcessToolBox{ + 
ClusterMetadata: s.clusterMetadata, + ClientBean: s.clientBean, + ShardController: s.shardController, + NamespaceCache: s.namespaceCache, + NDCHistoryResender: s.ndcHistoryResender, + MetricsHandler: s.metricsHandler, + Logger: s.logger, + }, + rand.Int63(), + time.Unix(0, rand.Int63()), + ) +} + +func (s *executableNoopTaskSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *executableNoopTaskSuite) TestExecute() { + err := s.task.Execute() + s.NoError(err) +} + +func (s *executableNoopTaskSuite) TestHandleErr() { + err := errors.New("OwO") + s.Equal(err, s.task.HandleErr(err)) + + err = serviceerror.NewUnavailable("") + s.Equal(err, s.task.HandleErr(err)) +} diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go new file mode 100644 index 00000000000..57c6c7433c8 --- /dev/null +++ b/service/history/replication/executable_task.go @@ -0,0 +1,349 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replication + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common" + "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + serviceerrors "go.temporal.io/server/common/serviceerror" + ctasks "go.temporal.io/server/common/tasks" +) + +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination executable_task_mock.go + +const ( + taskStatePending = int32(ctasks.TaskStatePending) + + taskStateCancelled = int32(ctasks.TaskStateCancelled) + taskStateAcked = int32(ctasks.TaskStateAcked) + taskStateNacked = int32(ctasks.TaskStateNacked) +) + +var ( + TaskRetryPolicy = backoff.NewExponentialRetryPolicy(1 * time.Second). + WithBackoffCoefficient(1.2). + WithMaximumInterval(5 * time.Second). + WithMaximumAttempts(80). 
+ WithExpirationInterval(5 * time.Minute) +) + +type ( + ExecutableTask interface { + TaskID() int64 + TaskCreationTime() time.Time + Ack() + Nack(err error) + Cancel() + Reschedule() + IsRetryableError(err error) bool + RetryPolicy() backoff.RetryPolicy + State() ctasks.State + Attempt() int + Resend( + ctx context.Context, + remoteCluster string, + retryErr *serviceerrors.RetryReplication, + ) error + DeleteWorkflow( + ctx context.Context, + workflowKey definition.WorkflowKey, + ) (retError error) + GetNamespaceInfo( + namespaceID string, + ) (string, bool, error) + } + ExecutableTaskImpl struct { + ProcessToolBox + + // immutable data + taskID int64 + metricsTag string + taskCreationTime time.Time + taskReceivedTime time.Time + + // mutable data + taskState int32 + attempt int32 + } +) + +func NewExecutableTask( + processToolBox ProcessToolBox, + taskID int64, + metricsTag string, + taskCreationTime time.Time, + taskReceivedTime time.Time, +) *ExecutableTaskImpl { + return &ExecutableTaskImpl{ + ProcessToolBox: processToolBox, + taskID: taskID, + metricsTag: metricsTag, + taskCreationTime: taskCreationTime, + taskReceivedTime: taskReceivedTime, + + taskState: taskStatePending, + attempt: 1, + } +} + +func (e *ExecutableTaskImpl) TaskID() int64 { + return e.taskID +} + +func (e *ExecutableTaskImpl) TaskCreationTime() time.Time { + return e.taskCreationTime +} + +func (e *ExecutableTaskImpl) Ack() { + if atomic.LoadInt32(&e.taskState) != taskStatePending { + e.Logger.Error(fmt.Sprintf( + "replication task: %v encountered concurrent completion event", + e.taskID, + )) + return + } + if !atomic.CompareAndSwapInt32(&e.taskState, taskStatePending, taskStateAcked) { + e.Ack() // retry ack + } + + now := time.Now().UTC() + e.emitFinishMetrics(now) +} + +func (e *ExecutableTaskImpl) Nack(err error) { + if atomic.LoadInt32(&e.taskState) != taskStatePending { + e.Logger.Error(fmt.Sprintf( + "replication task: %v encountered concurrent completion event", + e.taskID, + )) + 
return + } + if !atomic.CompareAndSwapInt32(&e.taskState, taskStatePending, taskStateNacked) { + e.Nack(err) // retry nack + } + + e.Logger.Error(fmt.Sprintf( + "replication task: %v encountered nack event", + e.taskID, + ), tag.Error(err)) + now := time.Now().UTC() + e.emitFinishMetrics(now) +} + +func (e *ExecutableTaskImpl) Cancel() { + if atomic.LoadInt32(&e.taskState) != taskStatePending { + e.Logger.Error(fmt.Sprintf( + "replication task: %v encountered concurrent completion event", + e.taskID, + )) + return + } + if !atomic.CompareAndSwapInt32(&e.taskState, taskStatePending, taskStateCancelled) { + e.Cancel() // retry cancel + } + + e.Logger.Info(fmt.Sprintf( + "replication task: %v encountered cancellation event", + e.taskID, + )) + now := time.Now().UTC() + e.emitFinishMetrics(now) +} + +func (e *ExecutableTaskImpl) Reschedule() { + taskState := atomic.LoadInt32(&e.taskState) + if taskState != taskStatePending { + e.Logger.Error(fmt.Sprintf( + "replication task: %v encountered concurrent completion event", + e.taskID, + )) + return + } + + atomic.AddInt32(&e.attempt, 1) +} + +func (e *ExecutableTaskImpl) IsRetryableError(err error) bool { + switch err.(type) { + case *serviceerror.InvalidArgument: + return false + default: + return true + } +} + +func (e *ExecutableTaskImpl) RetryPolicy() backoff.RetryPolicy { + return TaskRetryPolicy +} + +func (e *ExecutableTaskImpl) State() ctasks.State { + return ctasks.State(atomic.LoadInt32(&e.taskState)) +} + +func (e *ExecutableTaskImpl) Attempt() int { + return int(atomic.LoadInt32(&e.attempt)) +} + +func (e *ExecutableTaskImpl) emitFinishMetrics( + now time.Time, +) { + e.MetricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record( + now.Sub(e.taskReceivedTime), + metrics.OperationTag(e.metricsTag), + ) + e.MetricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record( + e.taskReceivedTime.Sub(e.taskCreationTime), + metrics.OperationTag(e.metricsTag), + ) + // TODO consider emit attempt metrics +} + 
+func (e *ExecutableTaskImpl) Resend( + ctx context.Context, + remoteCluster string, + retryErr *serviceerrors.RetryReplication, +) error { + e.MetricsHandler.Counter(metrics.ClientRequests.GetMetricName()).Record( + 1, + metrics.OperationTag(e.metricsTag+"Resend"), + ) + startTime := time.Now().UTC() + defer func() { + e.MetricsHandler.Timer(metrics.ClientLatency.GetMetricName()).Record( + time.Since(startTime), + metrics.OperationTag(e.metricsTag+"Resend"), + ) + }() + + resendErr := e.ProcessToolBox.NDCHistoryResender.SendSingleWorkflowHistory( + ctx, + remoteCluster, + namespace.ID(retryErr.NamespaceId), + retryErr.WorkflowId, + retryErr.RunId, + retryErr.StartEventId, + retryErr.StartEventVersion, + retryErr.EndEventId, + retryErr.EndEventVersion, + ) + switch resendErr.(type) { + case nil: + // no-op + return nil + case *serviceerror.NotFound: + e.Logger.Error( + "workflow not found in source cluster, proceed to cleanup", + tag.WorkflowNamespaceID(retryErr.NamespaceId), + tag.WorkflowID(retryErr.WorkflowId), + tag.WorkflowRunID(retryErr.RunId), + ) + // workflow is not found in source cluster, cleanup workflow in target cluster + return e.DeleteWorkflow( + ctx, + definition.NewWorkflowKey( + retryErr.NamespaceId, + retryErr.WorkflowId, + retryErr.RunId, + ), + ) + default: + e.Logger.Error("error resend history for history event", tag.Error(resendErr)) + return resendErr + } +} + +func (e *ExecutableTaskImpl) DeleteWorkflow( + ctx context.Context, + workflowKey definition.WorkflowKey, +) (retError error) { + shardContext, err := e.ShardController.GetShardByNamespaceWorkflow( + namespace.ID(workflowKey.NamespaceID), + workflowKey.WorkflowID, + ) + if err != nil { + return err + } + engine, err := shardContext.GetEngine(ctx) + if err != nil { + return err + } + _, err = engine.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{ + NamespaceId: workflowKey.NamespaceID, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: 
workflowKey.WorkflowID, + RunId: workflowKey.RunID, + }, + WorkflowVersion: common.EmptyVersion, + ClosedWorkflowOnly: false, + }) + return err +} + +func (e *ExecutableTaskImpl) GetNamespaceInfo( + namespaceID string, +) (string, bool, error) { + namespaceEntry, err := e.NamespaceCache.GetNamespaceByID(namespace.ID(namespaceID)) + switch err.(type) { + case nil: + shouldProcessTask := false + FilterLoop: + for _, targetCluster := range namespaceEntry.ClusterNames() { + if e.ClusterMetadata.GetCurrentClusterName() == targetCluster { + shouldProcessTask = true + break FilterLoop + } + } + return string(namespaceEntry.Name()), shouldProcessTask, nil + case *serviceerror.NamespaceNotFound: + return "", false, nil + default: + return "", false, err + } +} + +func newTaskContext( + namespaceName string, +) (context.Context, context.CancelFunc) { + ctx := headers.SetCallerInfo( + context.Background(), + headers.SystemPreemptableCallerInfo, + ) + ctx = headers.SetCallerName(ctx, namespaceName) + return context.WithTimeout(ctx, replicationTimeout) +} diff --git a/service/history/replication/executable_task_initializer.go b/service/history/replication/executable_task_initializer.go new file mode 100644 index 00000000000..ba88915edad --- /dev/null +++ b/service/history/replication/executable_task_initializer.go @@ -0,0 +1,126 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package replication + +import ( + "fmt" + "time" + + "go.uber.org/fx" + + enumsspb "go.temporal.io/server/api/enums/v1" + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/client" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/xdc" + "go.temporal.io/server/service/history/shard" +) + +type ( + ProcessToolBox struct { + fx.In + + ClusterMetadata cluster.Metadata + ClientBean client.Bean + ShardController shard.Controller + NamespaceCache namespace.Registry + NDCHistoryResender xdc.NDCHistoryResender + MetricsHandler metrics.Handler + Logger log.Logger + } +) + +func (i *ProcessToolBox) ConvertTasks( + sourceClusterName string, + replicationTasks ...*replicationspb.ReplicationTask, +) []TrackableExecutableTask { + tasks := make([]TrackableExecutableTask, len(replicationTasks)) + for _, replicationTask := range replicationTasks { + tasks = append(tasks, i.convertOne(sourceClusterName, replicationTask)) + } + return tasks +} + +func (i *ProcessToolBox) convertOne( + sourceClusterName string, + replicationTask *replicationspb.ReplicationTask, +) TrackableExecutableTask { + var taskCreationTime time.Time + if replicationTask.VisibilityTime != nil { + taskCreationTime = *replicationTask.VisibilityTime + } else { + taskCreationTime = time.Now().UTC() + } + + switch replicationTask.GetTaskType() { + case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK: // TODO to be deprecated + return NewExecutableNoopTask( + *i, + replicationTask.SourceTaskId, + taskCreationTime, + ) + case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK: // TODO to be deprecated + return NewExecutableNoopTask( + *i, + replicationTask.SourceTaskId, + taskCreationTime, + ) + case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK: + return NewExecutableActivityStateTask( + *i, + replicationTask.SourceTaskId, + 
taskCreationTime, + replicationTask.GetSyncActivityTaskAttributes(), + sourceClusterName, + ) + case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK: + return NewExecutableWorkflowStateTask( + *i, + replicationTask.SourceTaskId, + taskCreationTime, + replicationTask.GetSyncWorkflowStateTaskAttributes(), + sourceClusterName, + ) + case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK: + return NewExecutableHistoryTask( + *i, + replicationTask.SourceTaskId, + taskCreationTime, + replicationTask.GetHistoryTaskAttributes(), + sourceClusterName, + ) + default: + i.Logger.Error(fmt.Sprintf("unknown replication task: %v", replicationTask)) + return NewExecutableUnknownTask( + *i, + replicationTask.SourceTaskId, + taskCreationTime, + replicationTask, + ) + } +} diff --git a/service/history/replication/executable_task_mock.go b/service/history/replication/executable_task_mock.go new file mode 100644 index 00000000000..63a6628f305 --- /dev/null +++ b/service/history/replication/executable_task_mock.go @@ -0,0 +1,240 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: executable_task.go + +// Package replication is a generated GoMock package. +package replication + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + backoff "go.temporal.io/server/common/backoff" + definition "go.temporal.io/server/common/definition" + serviceerror "go.temporal.io/server/common/serviceerror" + tasks "go.temporal.io/server/common/tasks" +) + +// MockExecutableTask is a mock of ExecutableTask interface. +type MockExecutableTask struct { + ctrl *gomock.Controller + recorder *MockExecutableTaskMockRecorder +} + +// MockExecutableTaskMockRecorder is the mock recorder for MockExecutableTask. +type MockExecutableTaskMockRecorder struct { + mock *MockExecutableTask +} + +// NewMockExecutableTask creates a new mock instance. +func NewMockExecutableTask(ctrl *gomock.Controller) *MockExecutableTask { + mock := &MockExecutableTask{ctrl: ctrl} + mock.recorder = &MockExecutableTaskMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockExecutableTask) EXPECT() *MockExecutableTaskMockRecorder { + return m.recorder +} + +// Ack mocks base method. +func (m *MockExecutableTask) Ack() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Ack") +} + +// Ack indicates an expected call of Ack. +func (mr *MockExecutableTaskMockRecorder) Ack() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ack", reflect.TypeOf((*MockExecutableTask)(nil).Ack)) +} + +// Attempt mocks base method. 
+func (m *MockExecutableTask) Attempt() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Attempt") + ret0, _ := ret[0].(int) + return ret0 +} + +// Attempt indicates an expected call of Attempt. +func (mr *MockExecutableTaskMockRecorder) Attempt() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Attempt", reflect.TypeOf((*MockExecutableTask)(nil).Attempt)) +} + +// Cancel mocks base method. +func (m *MockExecutableTask) Cancel() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Cancel") +} + +// Cancel indicates an expected call of Cancel. +func (mr *MockExecutableTaskMockRecorder) Cancel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cancel", reflect.TypeOf((*MockExecutableTask)(nil).Cancel)) +} + +// DeleteWorkflow mocks base method. +func (m *MockExecutableTask) DeleteWorkflow(ctx context.Context, workflowKey definition.WorkflowKey) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteWorkflow", ctx, workflowKey) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteWorkflow indicates an expected call of DeleteWorkflow. +func (mr *MockExecutableTaskMockRecorder) DeleteWorkflow(ctx, workflowKey interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflow", reflect.TypeOf((*MockExecutableTask)(nil).DeleteWorkflow), ctx, workflowKey) +} + +// GetNamespaceInfo mocks base method. +func (m *MockExecutableTask) GetNamespaceInfo(namespaceID string) (string, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNamespaceInfo", namespaceID) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetNamespaceInfo indicates an expected call of GetNamespaceInfo. 
+func (mr *MockExecutableTaskMockRecorder) GetNamespaceInfo(namespaceID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNamespaceInfo", reflect.TypeOf((*MockExecutableTask)(nil).GetNamespaceInfo), namespaceID) +} + +// IsRetryableError mocks base method. +func (m *MockExecutableTask) IsRetryableError(err error) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsRetryableError", err) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsRetryableError indicates an expected call of IsRetryableError. +func (mr *MockExecutableTaskMockRecorder) IsRetryableError(err interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsRetryableError", reflect.TypeOf((*MockExecutableTask)(nil).IsRetryableError), err) +} + +// Nack mocks base method. +func (m *MockExecutableTask) Nack(err error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Nack", err) +} + +// Nack indicates an expected call of Nack. +func (mr *MockExecutableTaskMockRecorder) Nack(err interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Nack", reflect.TypeOf((*MockExecutableTask)(nil).Nack), err) +} + +// Reschedule mocks base method. +func (m *MockExecutableTask) Reschedule() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Reschedule") +} + +// Reschedule indicates an expected call of Reschedule. +func (mr *MockExecutableTaskMockRecorder) Reschedule() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reschedule", reflect.TypeOf((*MockExecutableTask)(nil).Reschedule)) +} + +// Resend mocks base method. +func (m *MockExecutableTask) Resend(ctx context.Context, remoteCluster string, retryErr *serviceerror.RetryReplication) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Resend", ctx, remoteCluster, retryErr) + ret0, _ := ret[0].(error) + return ret0 +} + +// Resend indicates an expected call of Resend. 
+func (mr *MockExecutableTaskMockRecorder) Resend(ctx, remoteCluster, retryErr interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resend", reflect.TypeOf((*MockExecutableTask)(nil).Resend), ctx, remoteCluster, retryErr) +} + +// RetryPolicy mocks base method. +func (m *MockExecutableTask) RetryPolicy() backoff.RetryPolicy { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RetryPolicy") + ret0, _ := ret[0].(backoff.RetryPolicy) + return ret0 +} + +// RetryPolicy indicates an expected call of RetryPolicy. +func (mr *MockExecutableTaskMockRecorder) RetryPolicy() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RetryPolicy", reflect.TypeOf((*MockExecutableTask)(nil).RetryPolicy)) +} + +// State mocks base method. +func (m *MockExecutableTask) State() tasks.State { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "State") + ret0, _ := ret[0].(tasks.State) + return ret0 +} + +// State indicates an expected call of State. +func (mr *MockExecutableTaskMockRecorder) State() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "State", reflect.TypeOf((*MockExecutableTask)(nil).State)) +} + +// TaskCreationTime mocks base method. +func (m *MockExecutableTask) TaskCreationTime() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TaskCreationTime") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// TaskCreationTime indicates an expected call of TaskCreationTime. +func (mr *MockExecutableTaskMockRecorder) TaskCreationTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskCreationTime", reflect.TypeOf((*MockExecutableTask)(nil).TaskCreationTime)) +} + +// TaskID mocks base method. +func (m *MockExecutableTask) TaskID() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TaskID") + ret0, _ := ret[0].(int64) + return ret0 +} + +// TaskID indicates an expected call of TaskID. 
+func (mr *MockExecutableTaskMockRecorder) TaskID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskID", reflect.TypeOf((*MockExecutableTask)(nil).TaskID)) +} diff --git a/service/history/replication/executable_task_test.go b/service/history/replication/executable_task_test.go new file mode 100644 index 00000000000..65c36d79a6a --- /dev/null +++ b/service/history/replication/executable_task_test.go @@ -0,0 +1,363 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+
+package replication
+
+import (
+	"context"
+	"errors"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	commonpb "go.temporal.io/api/common/v1"
+	"go.temporal.io/api/serviceerror"
+
+	"go.temporal.io/server/api/historyservice/v1"
+	persistencespb "go.temporal.io/server/api/persistence/v1"
+	"go.temporal.io/server/client"
+	"go.temporal.io/server/common"
+	"go.temporal.io/server/common/cluster"
+	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/metrics"
+	"go.temporal.io/server/common/namespace"
+	"go.temporal.io/server/common/persistence"
+	serviceerrors "go.temporal.io/server/common/serviceerror"
+	ctasks "go.temporal.io/server/common/tasks"
+	"go.temporal.io/server/common/xdc"
+	"go.temporal.io/server/service/history/shard"
+)
+
+// executableTaskSuite exercises ExecutableTaskImpl against mocked dependencies.
+type executableTaskSuite struct {
+	suite.Suite
+	*require.Assertions
+
+	controller         *gomock.Controller
+	clusterMetadata    *cluster.MockMetadata
+	clientBean         *client.MockBean
+	shardController    *shard.MockController
+	namespaceCache     *namespace.MockRegistry
+	ndcHistoryResender *xdc.MockNDCHistoryResender
+	metricsHandler     metrics.Handler
+	logger             log.Logger
+
+	task *ExecutableTaskImpl
+}
+
+// TestExecutableTaskSuite is the go test entry point for the suite.
+func TestExecutableTaskSuite(t *testing.T) {
+	suite.Run(t, new(executableTaskSuite))
+}
+
+func (s *executableTaskSuite) SetupSuite() {
+	s.Assertions = require.New(s.T())
+}
+
+func (s *executableTaskSuite) TearDownSuite() {
+
+}
+
+// SetupTest creates fresh mocks and a task with random ID and timestamps.
+func (s *executableTaskSuite) SetupTest() {
+	s.controller = gomock.NewController(s.T())
+	s.clusterMetadata = cluster.NewMockMetadata(s.controller)
+	s.clientBean = client.NewMockBean(s.controller)
+	s.shardController = shard.NewMockController(s.controller)
+	s.namespaceCache = namespace.NewMockRegistry(s.controller)
+	s.ndcHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)
+	s.metricsHandler = metrics.NoopMetricsHandler
+	s.logger = log.NewNoopLogger()
+
+	toolBox := ProcessToolBox{
+		ClusterMetadata:    s.clusterMetadata,
+		ClientBean:         s.clientBean,
+		ShardController:    s.shardController,
+		NamespaceCache:     s.namespaceCache,
+		NDCHistoryResender: s.ndcHistoryResender,
+		MetricsHandler:     s.metricsHandler,
+		Logger:             s.logger,
+	}
+	createdAt := time.Unix(0, rand.Int63())
+	receivedAt := createdAt.Add(time.Duration(rand.Int63()))
+	s.task = NewExecutableTask(toolBox, rand.Int63(), "metrics-tag", createdAt, receivedAt)
+}
+
+func (s *executableTaskSuite) TearDownTest() {
+	s.controller.Finish()
+}
+
+// newResendErr returns a RetryReplication error with random identifiers and
+// random event ID / version bounds.
+func (s *executableTaskSuite) newResendErr() *serviceerrors.RetryReplication {
+	return &serviceerrors.RetryReplication{
+		NamespaceId:       uuid.NewString(),
+		WorkflowId:        uuid.NewString(),
+		RunId:             uuid.NewString(),
+		StartEventId:      rand.Int63(),
+		StartEventVersion: rand.Int63(),
+		EndEventId:        rand.Int63(),
+		EndEventVersion:   rand.Int63(),
+	}
+}
+
+// expectResend registers one SendSingleWorkflowHistory expectation matching
+// resendErr and makes the mocked resender return result.
+func (s *executableTaskSuite) expectResend(
+	remoteCluster string,
+	resendErr *serviceerrors.RetryReplication,
+	result error,
+) {
+	s.ndcHistoryResender.EXPECT().SendSingleWorkflowHistory(
+		gomock.Any(),
+		remoteCluster,
+		namespace.ID(resendErr.NamespaceId),
+		resendErr.WorkflowId,
+		resendErr.RunId,
+		resendErr.StartEventId,
+		resendErr.StartEventVersion,
+		resendErr.EndEventId,
+		resendErr.EndEventVersion,
+	).Return(result)
+}
+
+// expectNamespace makes the namespace cache return an entry that is active in
+// the alternative cluster and replicated to clusters, and makes the cluster
+// metadata report the test current cluster.
+func (s *executableTaskSuite) expectNamespace(
+	namespaceID string,
+	namespaceName string,
+	clusters ...string,
+) {
+	detail := &persistencespb.NamespaceDetail{
+		Info: &persistencespb.NamespaceInfo{
+			Id:   namespaceID,
+			Name: namespaceName,
+		},
+		Config: &persistencespb.NamespaceConfig{},
+		ReplicationConfig: &persistencespb.NamespaceReplicationConfig{
+			ActiveClusterName: cluster.TestAlternativeClusterName,
+			Clusters:          clusters,
+		},
+	}
+	entry := namespace.FromPersistentState(&persistence.GetNamespaceResponse{
+		Namespace: detail,
+	})
+	s.namespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(entry, nil).AnyTimes()
+	s.clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
+}
+
+func (s *executableTaskSuite) TestTaskID() {
+	s.Equal(s.task.taskID, s.task.TaskID())
+}
+
+func (s *executableTaskSuite) TestCreationTime() {
+	s.Equal(s.task.taskCreationTime, s.task.TaskCreationTime())
+}
+
+func (s *executableTaskSuite) TestAckStateAttempt() {
+	s.Equal(ctasks.TaskStatePending, s.task.State())
+
+	s.task.Ack()
+	s.Equal(ctasks.TaskStateAcked, s.task.State())
+	s.Equal(1, s.task.Attempt())
+
+	// After the ack, each further transition is expected to leave both the
+	// state and the attempt counter untouched.
+	followUps := []func(){
+		func() { s.task.Nack(nil) },
+		func() { s.task.Cancel() },
+		func() { s.task.Reschedule() },
+	}
+	for _, transition := range followUps {
+		transition()
+		s.Equal(ctasks.TaskStateAcked, s.task.State())
+		s.Equal(1, s.task.Attempt())
+	}
+}
+
+func (s *executableTaskSuite) TestNackStateAttempt() {
+	s.Equal(ctasks.TaskStatePending, s.task.State())
+
+	s.task.Nack(nil)
+	s.Equal(ctasks.TaskStateNacked, s.task.State())
+	s.Equal(1, s.task.Attempt())
+
+	// After the nack, each further transition is expected to leave both the
+	// state and the attempt counter untouched.
+	followUps := []func(){
+		func() { s.task.Ack() },
+		func() { s.task.Cancel() },
+		func() { s.task.Reschedule() },
+	}
+	for _, transition := range followUps {
+		transition()
+		s.Equal(ctasks.TaskStateNacked, s.task.State())
+		s.Equal(1, s.task.Attempt())
+	}
+}
+
+func (s *executableTaskSuite) TestCancelStateAttempt() {
+	s.Equal(ctasks.TaskStatePending, s.task.State())
+
+	s.task.Cancel()
+	s.Equal(ctasks.TaskStateCancelled, s.task.State())
+	s.Equal(1, s.task.Attempt())
+
+	// After the cancel, ack and nack are expected to change nothing.
+	followUps := []func(){
+		func() { s.task.Ack() },
+		func() { s.task.Nack(nil) },
+	}
+	for _, transition := range followUps {
+		transition()
+		s.Equal(ctasks.TaskStateCancelled, s.task.State())
+		s.Equal(1, s.task.Attempt())
+	}
+}
+
+func (s *executableTaskSuite) TestRescheduleStateAttempt() {
+	s.Equal(ctasks.TaskStatePending, s.task.State())
+
+	s.task.Reschedule()
+	s.Equal(ctasks.TaskStatePending, s.task.State())
+	s.Equal(2, s.task.Attempt())
+}
+
+func (s *executableTaskSuite) TestIsRetryableError() {
+	testCases := []struct {
+		err       error
+		retryable bool
+	}{
+		{err: errors.New("OwO"), retryable: true},
+		{err: serviceerror.NewInternal("OwO"), retryable: true},
+		{err: serviceerror.NewUnavailable("OwO"), retryable: true},
+		{err: serviceerror.NewInvalidArgument("OwO"), retryable: false},
+	}
+	for _, tc := range testCases {
+		s.Equal(tc.retryable, s.task.IsRetryableError(tc.err))
+	}
+}
+
+func (s *executableTaskSuite) TestResend_Success() {
+	remoteCluster := cluster.TestAlternativeClusterName
+	resendErr := s.newResendErr()
+	s.expectResend(remoteCluster, resendErr, nil)
+
+	s.NoError(s.task.Resend(context.Background(), remoteCluster, resendErr))
+}
+
+func (s *executableTaskSuite) TestResend_NotFound() {
+	remoteCluster := cluster.TestAlternativeClusterName
+	resendErr := s.newResendErr()
+	s.expectResend(remoteCluster, resendErr, serviceerror.NewNotFound(""))
+
+	// The test expects a not-found resend to be followed by a delete of the
+	// workflow execution on the local engine.
+	deleteRequest := &historyservice.DeleteWorkflowExecutionRequest{
+		NamespaceId: resendErr.NamespaceId,
+		WorkflowExecution: &commonpb.WorkflowExecution{
+			WorkflowId: resendErr.WorkflowId,
+			RunId:      resendErr.RunId,
+		},
+		WorkflowVersion:    common.EmptyVersion,
+		ClosedWorkflowOnly: false,
+	}
+	shardContext := shard.NewMockContext(s.controller)
+	engine := shard.NewMockEngine(s.controller)
+	s.shardController.EXPECT().GetShardByNamespaceWorkflow(
+		namespace.ID(resendErr.NamespaceId),
+		resendErr.WorkflowId,
+	).Return(shardContext, nil).AnyTimes()
+	shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes()
+	engine.EXPECT().DeleteWorkflowExecution(
+		gomock.Any(),
+		deleteRequest,
+	).Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil)
+
+	s.NoError(s.task.Resend(context.Background(), remoteCluster, resendErr))
+}
+
+func (s *executableTaskSuite) TestResend_Error() {
+	remoteCluster := cluster.TestAlternativeClusterName
+	resendErr := s.newResendErr()
+	s.expectResend(remoteCluster, resendErr, serviceerror.NewUnavailable(""))
+
+	s.Error(s.task.Resend(context.Background(), remoteCluster, resendErr))
+}
+
+func (s *executableTaskSuite) TestGetNamespaceInfo_Process() {
+	namespaceID := uuid.NewString()
+	namespaceName := uuid.NewString()
+	s.expectNamespace(
+		namespaceID,
+		namespaceName,
+		cluster.TestCurrentClusterName,
+		cluster.TestAlternativeClusterName,
+	)
+
+	name, toProcess, err := s.task.GetNamespaceInfo(namespaceID)
+	s.NoError(err)
+	s.Equal(namespaceName, name)
+	s.True(toProcess)
+}
+
+func (s *executableTaskSuite) TestGetNamespaceInfo_Skip() {
+	namespaceID := uuid.NewString()
+	namespaceName := uuid.NewString()
+	// The current cluster is not in the namespace's cluster list here.
+	s.expectNamespace(
+		namespaceID,
+		namespaceName,
+		cluster.TestAlternativeClusterName,
+	)
+
+	name, toProcess, err := s.task.GetNamespaceInfo(namespaceID)
+	s.NoError(err)
+	s.Equal(namespaceName, name)
+	s.False(toProcess)
+}
+
+func (s *executableTaskSuite) TestGetNamespaceInfo_Error() {
+	namespaceID := uuid.NewString()
+	lookupErr := errors.New("OwO")
+	s.namespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(nil, lookupErr).AnyTimes()
+	s.clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
+
+	_, _, err := s.task.GetNamespaceInfo(namespaceID)
+	s.Error(err)
+}
diff --git a/service/history/replication/executable_task_tracker.go b/service/history/replication/executable_task_tracker.go
new file mode 100644
index 00000000000..51a8785a531
--- /dev/null
+++ b/service/history/replication/executable_task_tracker.go
@@ -0,0 +1,141 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package replication
+
+import (
+	"container/list"
+	"fmt"
+	"sync"
+	"time"
+
+	"go.temporal.io/server/common/log"
+	ctasks "go.temporal.io/server/common/tasks"
+)
+
+type (
+	// TrackableExecutableTask is a task that additionally exposes the task ID
+	// and creation time the tracker needs to compute watermarks.
+	TrackableExecutableTask interface {
+		ctasks.Task
+		TaskID() int64
+		TaskCreationTime() time.Time
+	}
+	// WatermarkInfo pairs a watermark task ID with its timestamp.
+	WatermarkInfo struct {
+		Watermark int64
+		Timestamp time.Time
+	}
+	// ExecutableTaskTracker tracks tasks in task ID order and reports the
+	// low watermark derived from the tasks still in the queue.
+	ExecutableTaskTracker interface {
+		TrackTasks(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask)
+		LowWatermark() *WatermarkInfo
+	}
+	// ExecutableTaskTrackerImpl is the list backed ExecutableTaskTracker.
+	// The embedded mutex guards highWatermarkInfo and taskQueue.
+	ExecutableTaskTrackerImpl struct {
+		logger log.Logger
+
+		sync.Mutex
+		highWatermarkInfo *WatermarkInfo
+		taskQueue         *list.List // sorted by task ID
+	}
+)
+
+var _ ExecutableTaskTracker = (*ExecutableTaskTrackerImpl)(nil)
+
+// NewExecutableTaskTracker returns a tracker with an empty task queue and no
+// high watermark recorded yet.
+func NewExecutableTaskTracker(
+	logger log.Logger,
+) *ExecutableTaskTrackerImpl {
+	return &ExecutableTaskTrackerImpl{
+		logger: logger,
+
+		highWatermarkInfo: nil,
+		taskQueue:         list.New(),
+	}
+}
+
+// TrackTasks appends tasks to the queue and records highWatermarkInfo.
+//
+// It panics when a task ID is not strictly greater than the ID of the task
+// tracked immediately before it (including earlier tasks of the same call),
+// or when highWatermarkInfo.Watermark is lower than the recorded watermark.
+func (t *ExecutableTaskTrackerImpl) TrackTasks(
+	highWatermarkInfo WatermarkInfo,
+	tasks ...TrackableExecutableTask,
+) {
+	t.Lock()
+	defer t.Unlock()
+
+	lastTaskID := int64(0)
+	if item := t.taskQueue.Back(); item != nil {
+		lastTaskID = item.Value.(TrackableExecutableTask).TaskID()
+	}
+	for _, task := range tasks {
+		taskID := task.TaskID()
+		if lastTaskID >= taskID {
+			panic(fmt.Sprintf(
+				"ExecutableTaskTracker encountered out of order task, ID: %v",
+				taskID,
+			))
+		}
+		t.taskQueue.PushBack(task)
+		// Advance the cursor so that ordering is also enforced between the
+		// tasks of this batch, not only against the previous queue tail.
+		lastTaskID = taskID
+	}
+
+	if t.highWatermarkInfo != nil && highWatermarkInfo.Watermark < t.highWatermarkInfo.Watermark {
+		panic(fmt.Sprintf(
+			"ExecutableTaskTracker encountered lower high watermark: %v < %v",
+			highWatermarkInfo.Watermark,
+			t.highWatermarkInfo.Watermark,
+		))
+	}
+	t.highWatermarkInfo = &highWatermarkInfo
+}
+
+// LowWatermark removes every acked task from the queue and returns the low
+// watermark: the ID and creation time of the first task left in the queue,
+// or a copy of the recorded high watermark when the queue is empty, or nil
+// when no high watermark has been recorded either.
+//
+// It panics on a nacked task (not implemented yet) and on an unknown state.
+func (t *ExecutableTaskTrackerImpl) LowWatermark() *WatermarkInfo {
+	t.Lock()
+	defer t.Unlock()
+
+	for element := t.taskQueue.Front(); element != nil; {
+		// Resolve the successor before a possible removal: list.Remove
+		// detaches the element, after which element.Next() returns nil and
+		// the scan would stop at the first acked task.
+		next := element.Next()
+
+		task := element.Value.(TrackableExecutableTask)
+		taskState := task.State()
+		switch taskState {
+		case ctasks.TaskStateAcked:
+			t.taskQueue.Remove(element)
+		case ctasks.TaskStateNacked:
+			// TODO put to DLQ, only after <- is successful, then remove from tracker
+			panic("implement me")
+		case ctasks.TaskStateCancelled:
+			// noop, do not remove from queue, let it block low watermark
+		case ctasks.TaskStatePending:
+			// noop, do not remove from queue, let it block low watermark
+		default:
+			panic(fmt.Sprintf(
+				"ExecutableTaskTracker encountered unknown task state: %v",
+				taskState,
+			))
+		}
+
+		element = next
+	}
+
+	if element := t.taskQueue.Front(); element != nil {
+		task := element.Value.(TrackableExecutableTask)
+		return &WatermarkInfo{
+			Watermark: task.TaskID(),
+			Timestamp: task.TaskCreationTime(),
+		}
+	}
+	if t.highWatermarkInfo != nil {
+		// Return a copy so callers cannot mutate the tracker's state.
+		lowWatermarkInfo := *t.highWatermarkInfo
+		return &lowWatermarkInfo
+	}
+	return nil
+}
diff --git a/service/history/replication/executable_unknown_task.go b/service/history/replication/executable_unknown_task.go
new file mode 100644
index 00000000000..0bef8992682
--- /dev/null
+++ b/service/history/replication/executable_unknown_task.go
@@ -0,0 +1,77 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package replication
+
+import (
+	"fmt"
+	"time"
+
+	"go.temporal.io/api/serviceerror"
+
+	"go.temporal.io/server/common/metrics"
+	ctasks "go.temporal.io/server/common/tasks"
+)
+
+// ExecutableUnknownTask wraps a replication task payload of a type this
+// package does not handle; executing it always fails.
+type ExecutableUnknownTask struct {
+	ExecutableTask
+	task any
+}
+
+var (
+	_ ctasks.Task             = (*ExecutableUnknownTask)(nil)
+	_ TrackableExecutableTask = (*ExecutableUnknownTask)(nil)
+)
+
+// NewExecutableUnknownTask wraps task under the unknown-task metrics scope,
+// using the current UTC time as the moment the task was received.
+func NewExecutableUnknownTask(
+	processToolBox ProcessToolBox,
+	taskID int64,
+	taskCreationTime time.Time,
+	task any,
+) *ExecutableUnknownTask {
+	receivedTime := time.Now().UTC()
+	base := NewExecutableTask(
+		processToolBox,
+		taskID,
+		metrics.UnknownTaskScope,
+		taskCreationTime,
+		receivedTime,
+	)
+	return &ExecutableUnknownTask{
+		ExecutableTask: base,
+		task:           task,
+	}
+}
+
+// Execute always returns an InvalidArgument error naming the task ID and the
+// wrapped payload.
+func (e *ExecutableUnknownTask) Execute() error {
+	message := fmt.Sprintf("unknown task, ID: %v, task: %v", e.TaskID(), e.task)
+	return serviceerror.NewInvalidArgument(message)
+}
+
+// HandleErr returns err unchanged.
+func (e *ExecutableUnknownTask) HandleErr(err error) error {
+	return err
+}
+
+// IsRetryableError reports false for every error.
+func (e *ExecutableUnknownTask) IsRetryableError(err error) bool {
+	return false
+}
diff --git a/service/history/replication/executable_unknown_task_test.go b/service/history/replication/executable_unknown_task_test.go
new file mode 100644
index 00000000000..901bbd90212
--- /dev/null
+++ b/service/history/replication/executable_unknown_task_test.go
@@ -0,0 +1,119 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package replication
+
+import (
+	"errors"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"go.temporal.io/api/serviceerror"
+
+	"go.temporal.io/server/client"
+	"go.temporal.io/server/common/cluster"
+	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/metrics"
+	"go.temporal.io/server/common/namespace"
+	"go.temporal.io/server/common/xdc"
+	"go.temporal.io/server/service/history/shard"
+)
+
+// executableUnknownTaskSuite exercises ExecutableUnknownTask against mocked
+// dependencies.
+type executableUnknownTaskSuite struct {
+	suite.Suite
+	*require.Assertions
+
+	controller         *gomock.Controller
+	clusterMetadata    *cluster.MockMetadata
+	clientBean         *client.MockBean
+	shardController    *shard.MockController
+	namespaceCache     *namespace.MockRegistry
+	ndcHistoryResender *xdc.MockNDCHistoryResender
+	metricsHandler     metrics.Handler
+	logger             log.Logger
+
+	task *ExecutableUnknownTask
+}
+
+// TestExecutableUnknownTaskSuite is the go test entry point for the suite.
+func TestExecutableUnknownTaskSuite(t *testing.T) {
+	suite.Run(t, new(executableUnknownTaskSuite))
+}
+
+func (s *executableUnknownTaskSuite) SetupSuite() {
+	s.Assertions = require.New(s.T())
+}
+
+func (s *executableUnknownTaskSuite) TearDownSuite() {
+
+}
+
+// SetupTest creates fresh mocks and an unknown task with a random ID, a
+// random creation time and a nil payload.
+func (s *executableUnknownTaskSuite) SetupTest() {
+	s.controller = gomock.NewController(s.T())
+	s.clusterMetadata = cluster.NewMockMetadata(s.controller)
+	s.clientBean = client.NewMockBean(s.controller)
+	s.shardController = shard.NewMockController(s.controller)
+	s.namespaceCache = namespace.NewMockRegistry(s.controller)
+	s.ndcHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)
+	s.metricsHandler = metrics.NoopMetricsHandler
+	s.logger = log.NewNoopLogger()
+
+	toolBox := ProcessToolBox{
+		ClusterMetadata:    s.clusterMetadata,
+		ClientBean:         s.clientBean,
+		ShardController:    s.shardController,
+		NamespaceCache:     s.namespaceCache,
+		NDCHistoryResender: s.ndcHistoryResender,
+		MetricsHandler:     s.metricsHandler,
+		Logger:             s.logger,
+	}
+	s.task = NewExecutableUnknownTask(toolBox, rand.Int63(), time.Unix(0, rand.Int63()), nil)
+}
+
+func (s *executableUnknownTaskSuite) TearDownTest() {
+	s.controller.Finish()
+}
+
+func (s *executableUnknownTaskSuite) TestExecute() {
+	s.IsType(serviceerror.NewInvalidArgument(""), s.task.Execute())
+}
+
+func (s *executableUnknownTaskSuite) TestHandleErr() {
+	// Every error is expected to come back unchanged.
+	inputs := []error{
+		errors.New("OwO"),
+		serviceerror.NewUnavailable(""),
+	}
+	for _, input := range inputs {
+		s.Equal(input, s.task.HandleErr(input))
+	}
+}
diff --git a/service/history/replication/executable_workflow_state_task.go b/service/history/replication/executable_workflow_state_task.go
new file mode 100644
index 00000000000..824c7de0384
--- /dev/null
+++ b/service/history/replication/executable_workflow_state_task.go
@@ -0,0 +1,121 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package replication
+
+import (
+	"time"
+
+	"go.temporal.io/api/serviceerror"
+
+	"go.temporal.io/server/api/historyservice/v1"
+	replicationspb "go.temporal.io/server/api/replication/v1"
+	"go.temporal.io/server/common/definition"
+	"go.temporal.io/server/common/metrics"
+	"go.temporal.io/server/common/namespace"
+	ctasks "go.temporal.io/server/common/tasks"
+)
+
+type (
+	// ExecutableWorkflowStateTask applies a replicated workflow state to the
+	// local history engine that owns the workflow.
+	ExecutableWorkflowStateTask struct {
+		ProcessToolBox
+
+		definition.WorkflowKey
+		ExecutableTask
+		req *historyservice.ReplicateWorkflowStateRequest
+
+		// variables to be perhaps removed (not essential to logic)
+		sourceClusterName string
+	}
+)
+
+var _ ctasks.Task = (*ExecutableWorkflowStateTask)(nil)
+var _ TrackableExecutableTask = (*ExecutableWorkflowStateTask)(nil)
+
+// TODO should workflow task be batched?
+
+// NewExecutableWorkflowStateTask builds the task and its
+// ReplicateWorkflowStateRequest from the sync workflow state attributes,
+// using the current UTC time as the moment the task was received.
+//
+// The workflow key is read through the generated getters, so attributes with
+// a missing workflow state, execution info or execution state produce a task
+// with empty identifiers instead of a nil pointer panic.
+// NOTE(review): assumes the persistence messages expose the generated
+// GetExecutionInfo / GetExecutionState getters - confirm against the protos.
+func NewExecutableWorkflowStateTask(
+	processToolBox ProcessToolBox,
+	taskID int64,
+	taskCreationTime time.Time,
+	task *replicationspb.SyncWorkflowStateTaskAttributes,
+	sourceClusterName string,
+) *ExecutableWorkflowStateTask {
+	workflowState := task.GetWorkflowState()
+	namespaceID := workflowState.GetExecutionInfo().GetNamespaceId()
+	workflowID := workflowState.GetExecutionInfo().GetWorkflowId()
+	runID := workflowState.GetExecutionState().GetRunId()
+	return &ExecutableWorkflowStateTask{
+		ProcessToolBox: processToolBox,
+
+		WorkflowKey: definition.NewWorkflowKey(namespaceID, workflowID, runID),
+		ExecutableTask: NewExecutableTask(
+			processToolBox,
+			taskID,
+			metrics.SyncWorkflowStateTaskScope,
+			taskCreationTime,
+			time.Now().UTC(),
+		),
+		req: &historyservice.ReplicateWorkflowStateRequest{
+			NamespaceId:   namespaceID,
+			WorkflowState: workflowState,
+			RemoteCluster: sourceClusterName,
+		},
+
+		sourceClusterName: sourceClusterName,
+	}
+}
+
+// Execute replicates the workflow state through the engine of the shard that
+// owns the workflow. It returns nil without doing anything when
+// GetNamespaceInfo reports that the task should not be applied, and returns
+// the first error hit while resolving the namespace, the shard or the engine.
+func (e *ExecutableWorkflowStateTask) Execute() error {
+	namespaceName, apply, err := e.GetNamespaceInfo(e.NamespaceID)
+	if err != nil {
+		return err
+	}
+	if !apply {
+		return nil
+	}
+	ctx, cancel := newTaskContext(namespaceName)
+	defer cancel()
+
+	shardContext, err := e.ShardController.GetShardByNamespaceWorkflow(
+		namespace.ID(e.NamespaceID),
+		e.WorkflowID,
+	)
+	if err != nil {
+		return err
+	}
+	engine, err := shardContext.GetEngine(ctx)
+	if err != nil {
+		return err
+	}
+	return engine.ReplicateWorkflowState(ctx, e.req)
+}
+
+// HandleErr swallows nil and NotFound errors and returns every other error
+// unchanged.
+func (e *ExecutableWorkflowStateTask) HandleErr(err error) error {
+	// no resend is required
+	switch err.(type) {
+	case nil, *serviceerror.NotFound:
+		return nil
+	default:
+		return err
+	}
+}
diff --git a/service/history/replication/executable_workflow_state_task_test.go b/service/history/replication/executable_workflow_state_task_test.go
new file mode 100644
index 00000000000..85a2af411c8
--- /dev/null
+++ b/service/history/replication/executable_workflow_state_task_test.go
@@ -0,0 +1,180 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package replication
+
+import (
+	"errors"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"go.temporal.io/api/serviceerror"
+
+	"go.temporal.io/server/api/historyservice/v1"
+	persistencepb "go.temporal.io/server/api/persistence/v1"
+	replicationspb "go.temporal.io/server/api/replication/v1"
+	"go.temporal.io/server/client"
+	"go.temporal.io/server/common/cluster"
+	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/metrics"
+	"go.temporal.io/server/common/namespace"
+	"go.temporal.io/server/common/xdc"
+	"go.temporal.io/server/service/history/shard"
+)
+
+// executableWorkflowStateTaskSuite exercises ExecutableWorkflowStateTask with
+// its embedded ExecutableTask replaced by a mock.
+type executableWorkflowStateTaskSuite struct {
+	suite.Suite
+	*require.Assertions
+
+	controller         *gomock.Controller
+	clusterMetadata    *cluster.MockMetadata
+	clientBean         *client.MockBean
+	shardController    *shard.MockController
+	namespaceCache     *namespace.MockRegistry
+	ndcHistoryResender *xdc.MockNDCHistoryResender
+	metricsHandler     metrics.Handler
+	logger             log.Logger
+	executableTask     *MockExecutableTask
+
+	replicationTask   *replicationspb.SyncWorkflowStateTaskAttributes
+	sourceClusterName string
+
+	task *ExecutableWorkflowStateTask
+}
+
+// TestExecutableWorkflowStateTaskSuite is the go test entry point for the suite.
+func TestExecutableWorkflowStateTaskSuite(t *testing.T) {
+	suite.Run(t, new(executableWorkflowStateTaskSuite))
+}
+
+func (s *executableWorkflowStateTaskSuite) SetupSuite() {
+	s.Assertions = require.New(s.T())
+}
+
+func (s *executableWorkflowStateTaskSuite) TearDownSuite() {
+
+}
+
+// SetupTest creates fresh mocks, a replication task with random identifiers,
+// and the task under test with its ExecutableTask swapped for the mock.
+func (s *executableWorkflowStateTaskSuite) SetupTest() {
+	s.controller = gomock.NewController(s.T())
+	s.clusterMetadata = cluster.NewMockMetadata(s.controller)
+	s.clientBean = client.NewMockBean(s.controller)
+	s.shardController = shard.NewMockController(s.controller)
+	s.namespaceCache = namespace.NewMockRegistry(s.controller)
+	s.ndcHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)
+	s.metricsHandler = metrics.NoopMetricsHandler
+	s.logger = log.NewNoopLogger()
+	s.executableTask = NewMockExecutableTask(s.controller)
+
+	executionInfo := &persistencepb.WorkflowExecutionInfo{
+		NamespaceId: uuid.NewString(),
+		WorkflowId:  uuid.NewString(),
+	}
+	executionState := &persistencepb.WorkflowExecutionState{
+		RunId: uuid.NewString(),
+	}
+	s.replicationTask = &replicationspb.SyncWorkflowStateTaskAttributes{
+		WorkflowState: &persistencepb.WorkflowMutableState{
+			ExecutionInfo:  executionInfo,
+			ExecutionState: executionState,
+		},
+	}
+	s.sourceClusterName = cluster.TestCurrentClusterName
+
+	toolBox := ProcessToolBox{
+		ClusterMetadata:    s.clusterMetadata,
+		ClientBean:         s.clientBean,
+		ShardController:    s.shardController,
+		NamespaceCache:     s.namespaceCache,
+		NDCHistoryResender: s.ndcHistoryResender,
+		MetricsHandler:     s.metricsHandler,
+		Logger:             s.logger,
+	}
+	s.task = NewExecutableWorkflowStateTask(
+		toolBox,
+		rand.Int63(),
+		time.Unix(0, rand.Int63()),
+		s.replicationTask,
+		s.sourceClusterName,
+	)
+	s.task.ExecutableTask = s.executableTask
+}
+
+func (s *executableWorkflowStateTaskSuite) TearDownTest() {
+	s.controller.Finish()
+}
+
+// expectNamespaceInfo makes the mocked ExecutableTask answer GetNamespaceInfo
+// for the task's namespace with the given values.
+func (s *executableWorkflowStateTaskSuite) expectNamespaceInfo(
+	namespaceName string,
+	toProcess bool,
+	err error,
+) {
+	s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return(
+		namespaceName, toProcess, err,
+	).AnyTimes()
+}
+
+func (s *executableWorkflowStateTaskSuite) TestExecute_Process() {
+	s.expectNamespaceInfo(uuid.NewString(), true, nil)
+
+	expectedRequest := &historyservice.ReplicateWorkflowStateRequest{
+		NamespaceId:   s.task.NamespaceID,
+		WorkflowState: s.replicationTask.GetWorkflowState(),
+		RemoteCluster: s.sourceClusterName,
+	}
+	shardContext := shard.NewMockContext(s.controller)
+	engine := shard.NewMockEngine(s.controller)
+	s.shardController.EXPECT().GetShardByNamespaceWorkflow(
+		namespace.ID(s.task.NamespaceID),
+		s.task.WorkflowID,
+	).Return(shardContext, nil).AnyTimes()
+	shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes()
+	engine.EXPECT().ReplicateWorkflowState(gomock.Any(), expectedRequest).Return(nil)
+
+	s.NoError(s.task.Execute())
+}
+
+func (s *executableWorkflowStateTaskSuite) TestExecute_Skip() {
+	s.expectNamespaceInfo(uuid.NewString(), false, nil)
+
+	s.NoError(s.task.Execute())
+}
+
+func (s *executableWorkflowStateTaskSuite) TestExecute_Err() {
+	lookupErr := errors.New("OwO")
+	s.expectNamespaceInfo("", false, lookupErr)
+
+	s.Equal(lookupErr, s.task.Execute())
+}
+
+func (s *executableWorkflowStateTaskSuite) TestHandleErr() {
+	genericErr := errors.New("OwO")
+	s.Equal(genericErr, s.task.HandleErr(genericErr))
+
+	// NotFound is the one error the task is expected to swallow.
+	s.NoError(s.task.HandleErr(serviceerror.NewNotFound("")))
+
+	unavailableErr := serviceerror.NewUnavailable("")
+	s.Equal(unavailableErr, s.task.HandleErr(unavailableErr))
+}
diff --git a/service/history/replication/fx.go b/service/history/replication/fx.go
index bddb32fedd2..a73911c00b8 100644
--- a/service/history/replication/fx.go
+++ b/service/history/replication/fx.go
@@ -25,17 +25,24 @@
 package replication
 
 import (
+	"context"
+
 	"go.uber.org/fx"
 
+	"go.temporal.io/server/api/historyservice/v1"
 	"go.temporal.io/server/client"
 	"go.temporal.io/server/common/cluster"
 	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/namespace"
+	"go.temporal.io/server/common/persistence/serialization"
+	"go.temporal.io/server/common/xdc"
 	"go.temporal.io/server/service/history/configs"
 )
 
 var Module = fx.Options(
 	fx.Provide(ReplicationTaskFetcherFactoryProvider),
 	fx.Provide(ReplicationTaskExecutorProvider),
+	fx.Provide(NDCHistoryResenderProvider),
 )
 
 func ReplicationTaskFetcherFactoryProvider(
@@ -63,3 +70,23 @@ func ReplicationTaskExecutorProvider() TaskExecutorProvider {
 		)
 	}
 }
+
+func NDCHistoryResenderProvider(
+	config *configs.Config,
+	namespaceRegistry namespace.Registry,
+	clientBean client.Bean,
+	serializer serialization.Serializer,
+	logger log.Logger,
+) xdc.NDCHistoryResender {
+	return xdc.NewNDCHistoryResender(
+		namespaceRegistry,
+		clientBean,
+		func(ctx context.Context, request *historyservice.ReplicateEventsV2Request) error {
+			_, err := clientBean.GetHistoryClient().ReplicateEventsV2(ctx, request)
+			return err
+		},
+		serializer,
+		config.StandbyTaskReReplicationContextTimeout,
+		logger,
+	)
+}
diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go
index 46d870b7aaf..bb851479c7b 100644
--- a/service/history/replication/task_executor.go
+++ b/service/history/replication/task_executor.go
@@ -43,7 +43,7 @@ import (
 	"go.temporal.io/server/common/namespace"
 	serviceerrors "go.temporal.io/server/common/serviceerror"
 	"go.temporal.io/server/common/xdc"
-	deletemanager "go.temporal.io/server/service/history/deletemanager"
+	"go.temporal.io/server/service/history/deletemanager"
 	"go.temporal.io/server/service/history/shard"
 	"go.temporal.io/server/service/history/workflow"
 	wcache "go.temporal.io/server/service/history/workflow/cache"