Replicate workflow state: ask for a history resend when the run already exists

ApplyWorkflowState now loads the mutable state before applying the incoming
state. A serviceerror.NotFound keeps the previous code path. When the run is
found, the last items of the local and the incoming current version histories
are compared: equal items return nil, different items return a
serviceerrors.RetryReplication whose start event ID/version are taken from the
local last item and whose end event ID/version are the empty values.

ExecutableWorkflowStateTask.HandleErr handles that error by calling Resend and
then Execute again; if the namespace lookup or the resend fails, the original
error is returned. taskExecutorImpl.handleSyncWorkflowStateTask calls
SendSingleWorkflowHistory instead: NotFound from the resend triggers
cleanupWorkflowExecution, success replays ReplicateWorkflowState, and any other
resend error is logged and the original error is returned.

diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go
index 8fc3d58f6d92..d9f10abe93e0 100644
--- a/service/history/ndc/history_replicator.go
+++ b/service/history/ndc/history_replicator.go
@@ -303,6 +303,46 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState(
 		}
 	}()
 
+	// Handle existing workflows
+	ms, err := wfCtx.LoadMutableState(ctx)
+	switch err.(type) {
+	case *serviceerror.NotFound:
+		// no-op, continue to replicate workflow state
+	case nil:
+		// workflow exists, request a resend if the last version history items do not match.
+		localVersionHistory, err := versionhistory.GetCurrentVersionHistory(ms.GetExecutionInfo().GetVersionHistories())
+		if err != nil {
+			return err
+		}
+		localHistoryLastItem, err := versionhistory.GetLastVersionHistoryItem(localVersionHistory)
+		if err != nil {
+			return err
+		}
+		incomingVersionHistory, err := versionhistory.GetCurrentVersionHistory(request.GetWorkflowState().GetExecutionInfo().GetVersionHistories())
+		if err != nil {
+			return err
+		}
+		incomingHistoryLastItem, err := versionhistory.GetLastVersionHistoryItem(incomingVersionHistory)
+		if err != nil {
+			return err
+		}
+		if !versionhistory.IsEqualVersionHistoryItem(localHistoryLastItem, incomingHistoryLastItem) {
+			return serviceerrors.NewRetryReplication(
+				"Failed to sync workflow state due to version history mismatch",
+				namespaceID.String(),
+				wid,
+				rid,
+				localHistoryLastItem.GetEventId(),
+				localHistoryLastItem.GetVersion(),
+				common.EmptyEventID,
+				common.EmptyVersion,
+			)
+		}
+		return nil
+	default:
+		return err
+	}
+
 	currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.VersionHistories)
 	if err != nil {
 		return err
diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go
index eb2147aff9c6..6d0038dd9e65 100644
--- a/service/history/ndc/history_replicator_test.go
+++ b/service/history/ndc/history_replicator_test.go
@@ -32,6 +32,7 @@ import (
 	historypb "go.temporal.io/api/history/v1"
 
 	"go.temporal.io/server/common"
+	serviceerrors "go.temporal.io/server/common/serviceerror"
 	"go.temporal.io/server/service/history/events"
 
 	"github.com/golang/mock/gomock"
@@ -179,6 +180,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() {
 		we,
 		workflow.LockPriorityLow,
 	).Return(mockWeCtx, wcache.NoopReleaseFn, nil)
+	mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(nil, serviceerror.NewNotFound("ms not found"))
 	mockWeCtx.EXPECT().CreateWorkflowExecution(
 		gomock.Any(),
 		persistence.CreateWorkflowModeBrandNew,
@@ -281,6 +283,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() {
 		we,
 		workflow.LockPriorityLow,
 	).Return(mockWeCtx, wcache.NoopReleaseFn, nil)
+	mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(nil, serviceerror.NewNotFound("ms not found"))
 	mockWeCtx.EXPECT().CreateWorkflowExecution(
 		gomock.Any(),
 		persistence.CreateWorkflowModeBrandNew,
@@ -398,3 +401,82 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_NoClosedWorkflow_Error(
 	var internalErr *serviceerror.Internal
 	s.ErrorAs(err, &internalErr)
 }
+
+func (s *historyReplicatorSuite) Test_ApplyWorkflowState_ExistWorkflow_Resend() {
+	namespaceID := uuid.New()
+	branchInfo := &persistencespb.HistoryBranch{
+		TreeId:    uuid.New(),
+		BranchId:  uuid.New(),
+		Ancestors: nil,
+	}
+	historyBranch, err := serialization.HistoryBranchToBlob(branchInfo)
+	s.NoError(err)
+	completionEventBatchId := int64(5)
+	nextEventID := int64(7)
+	request := &historyservice.ReplicateWorkflowStateRequest{
+		WorkflowState: &persistencespb.WorkflowMutableState{
+			ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
+				WorkflowId:  s.workflowID,
+				NamespaceId: namespaceID,
+				VersionHistories: &historyspb.VersionHistories{
+					CurrentVersionHistoryIndex: 0,
+					Histories: []*historyspb.VersionHistory{
+						{
+							BranchToken: historyBranch.GetData(),
+							Items: []*historyspb.VersionHistoryItem{
+								{
+									EventId: int64(100),
+									Version: int64(100),
+								},
+							},
+						},
+					},
+				},
+				CompletionEventBatchId: completionEventBatchId,
+			},
+			ExecutionState: &persistencespb.WorkflowExecutionState{
+				RunId:  s.runID,
+				State:  enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
+				Status: enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
+			},
+			NextEventId: nextEventID,
+		},
+		RemoteCluster: "test",
+	}
+	we := commonpb.WorkflowExecution{
+		WorkflowId: s.workflowID,
+		RunId:      s.runID,
+	}
+	mockWeCtx := workflow.NewMockContext(s.controller)
+	mockMutableState := workflow.NewMockMutableState(s.controller)
+	s.mockWorkflowCache.EXPECT().GetOrCreateWorkflowExecution(
+		gomock.Any(),
+		namespace.ID(namespaceID),
+		we,
+		workflow.LockPriorityLow,
+	).Return(mockWeCtx, wcache.NoopReleaseFn, nil)
+	mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(mockMutableState, nil)
+	mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
+		VersionHistories: &historyspb.VersionHistories{
+			CurrentVersionHistoryIndex: 0,
+			Histories: []*historyspb.VersionHistory{
+				{
+					Items: []*historyspb.VersionHistoryItem{
+						{
+							EventId: int64(1),
+							Version: int64(1),
+						},
+					},
+				},
+			},
+		},
+	})
+	err = s.historyReplicator.ApplyWorkflowState(context.Background(), request)
+	var expectedErr *serviceerrors.RetryReplication
+	s.ErrorAs(err, &expectedErr)
+	s.Equal(namespaceID, expectedErr.NamespaceId)
+	s.Equal(s.workflowID, expectedErr.WorkflowId)
+	s.Equal(s.runID, expectedErr.RunId)
+	s.Equal(int64(1), expectedErr.StartEventId)
+	s.Equal(int64(1), expectedErr.StartEventVersion)
+}
diff --git a/service/history/replication/executable_workflow_state_task.go b/service/history/replication/executable_workflow_state_task.go
index e937be0d6bff..5e5357f3d1ef 100644
--- a/service/history/replication/executable_workflow_state_task.go
+++ b/service/history/replication/executable_workflow_state_task.go
@@ -38,6 +38,7 @@ import (
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/persistence"
+	serviceerrors "go.temporal.io/server/common/serviceerror"
 	ctasks "go.temporal.io/server/common/tasks"
 )
 
@@ -128,10 +129,25 @@ func (e *ExecutableWorkflowStateTask) Execute() error {
 }
 
 func (e *ExecutableWorkflowStateTask) HandleErr(err error) error {
-	// no resend is required
-	switch err.(type) {
+	switch retryErr := err.(type) {
 	case nil, *serviceerror.NotFound:
 		return nil
+	case *serviceerrors.RetryReplication:
+		namespaceName, _, nsError := e.GetNamespaceInfo(e.NamespaceID)
+		if nsError != nil {
+			return err
+		}
+		ctx, cancel := newTaskContext(namespaceName)
+		defer cancel()
+
+		if resendErr := e.Resend(
+			ctx,
+			e.sourceClusterName,
+			retryErr,
+		); resendErr != nil {
+			return err
+		}
+		return e.Execute()
 	default:
 		return err
 	}
diff --git a/service/history/replication/executable_workflow_state_task_test.go b/service/history/replication/executable_workflow_state_task_test.go
index 8d78ae5549df..b68f7674fb31 100644
--- a/service/history/replication/executable_workflow_state_task_test.go
+++ b/service/history/replication/executable_workflow_state_task_test.go
@@ -34,7 +34,6 @@ import (
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
-	"go.temporal.io/api/serviceerror"
 
 	enumsspb "go.temporal.io/server/api/enums/v1"
 	"go.temporal.io/server/api/historyservice/v1"
@@ -46,6 +45,7 @@ import (
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/persistence"
+	serviceerrors "go.temporal.io/server/common/serviceerror"
 	"go.temporal.io/server/common/xdc"
 	"go.temporal.io/server/service/history/shard"
 )
@@ -183,14 +183,49 @@ func (s *executableWorkflowStateTaskSuite) TestExecute_Err() {
 	s.Equal(err, s.task.Execute())
 }
 
-func (s *executableWorkflowStateTaskSuite) TestHandleErr() {
-	err := errors.New("OwO")
-	s.Equal(err, s.task.HandleErr(err))
+func (s *executableWorkflowStateTaskSuite) TestHandleErr_Resend_Success() {
+	s.executableTask.EXPECT().TerminalState().Return(false)
+	s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return(
+		uuid.NewString(), true, nil,
+	).AnyTimes()
+	shardContext := shard.NewMockContext(s.controller)
+	engine := shard.NewMockEngine(s.controller)
+	s.shardController.EXPECT().GetShardByNamespaceWorkflow(
+		namespace.ID(s.task.NamespaceID),
+		s.task.WorkflowID,
+	).Return(shardContext, nil).AnyTimes()
+	shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes()
+	err := serviceerrors.NewRetryReplication(
+		"",
+		s.task.NamespaceID,
+		s.task.WorkflowID,
+		s.task.RunID,
+		rand.Int63(),
+		rand.Int63(),
+		rand.Int63(),
+		rand.Int63(),
+	)
+	s.executableTask.EXPECT().Resend(gomock.Any(), s.sourceClusterName, err).Return(nil)
+	engine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil)
+	s.NoError(s.task.HandleErr(err))
+}
 
-	err = serviceerror.NewNotFound("")
-	s.Equal(nil, s.task.HandleErr(err))
+func (s *executableWorkflowStateTaskSuite) TestHandleErr_Resend_Error() {
+	s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return(
+		uuid.NewString(), true, nil,
+	).AnyTimes()
+	err := serviceerrors.NewRetryReplication(
+		"",
+		s.task.NamespaceID,
+		s.task.WorkflowID,
+		s.task.RunID,
+		rand.Int63(),
+		rand.Int63(),
+		rand.Int63(),
+		rand.Int63(),
+	)
+	s.executableTask.EXPECT().Resend(gomock.Any(), s.sourceClusterName, err).Return(errors.New("OwO"))
 
-	err = serviceerror.NewUnavailable("")
 	s.Equal(err, s.task.HandleErr(err))
 }
 
diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go
index 611b1beebcad..a4d1c023a9e1 100644
--- a/service/history/replication/task_executor.go
+++ b/service/history/replication/task_executor.go
@@ -320,12 +320,41 @@ func (e *taskExecutorImpl) handleSyncWorkflowStateTask(
 
 	// This might be extra cost if the workflow belongs to local shard.
 	// Add a wrapper of the history client to call history engine directly if it becomes an issue.
-	_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
+	request := &historyservice.ReplicateWorkflowStateRequest{
 		NamespaceId:   namespaceID.String(),
 		WorkflowState: attr.GetWorkflowState(),
 		RemoteCluster: e.remoteCluster,
-	})
-	return err
+	}
+	_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, request)
+	switch retryErr := err.(type) {
+	case nil:
+		return nil
+	case *serviceerrors.RetryReplication:
+		resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
+			ctx,
+			e.remoteCluster,
+			namespace.ID(retryErr.NamespaceId),
+			retryErr.WorkflowId,
+			retryErr.RunId,
+			retryErr.StartEventId,
+			retryErr.StartEventVersion,
+			retryErr.EndEventId,
+			retryErr.EndEventVersion,
+		)
+		switch resendErr.(type) {
+		case *serviceerror.NotFound:
+			// workflow is not found in source cluster, cleanup workflow in target cluster
+			return e.cleanupWorkflowExecution(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId)
+		case nil:
+			_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, request)
+			return err
+		default:
+			e.logger.Error("error resend history for replicate workflow state", tag.Error(resendErr))
+			return err
+		}
+	default:
+		return err
+	}
 }
 
 func (e *taskExecutorImpl) filterTask(
diff --git a/tests/ndc/ndc_integration_test.go b/tests/ndc/ndc_integration_test.go
index 37ae175ea8b8..f17718b0a080 100644
--- a/tests/ndc/ndc_integration_test.go
+++ b/tests/ndc/ndc_integration_test.go
@@ -32,6 +32,7 @@ import (
 	"testing"
 	"time"
 
+	"go.temporal.io/server/api/persistence/v1"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/persistence/serialization"
 
@@ -402,6 +403,80 @@ func (s *nDCIntegrationTestSuite) TestEmptyVersionAndNonEmptyVersion() {
 	)
 }
 
+func (s *nDCIntegrationTestSuite) TestReplicateWorkflowState_PartialReplicated() {
+
+	s.setupRemoteFrontendClients()
+	workflowID := "replicate-workflow-state-partially-replicated" + uuid.New()
+	runID := uuid.New()
+	workflowType := "event-generator-workflow-type"
+	taskqueue := "event-generator-taskQueue"
+
+	// active has initial version 1
+	historyClient := s.active.GetHistoryClient()
+	var historyBatch []*historypb.History
+	// standby initial failover version 2
+	s.generator = test.InitializeHistoryEventGenerator(s.namespace, s.namespaceID, 12)
+
+	for s.generator.HasNextVertex() {
+		events := s.generator.GetNextVertices()
+		historyEvents := &historypb.History{}
+		for _, event := range events {
+			historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent))
+		}
+		historyBatch = append(historyBatch, historyEvents)
+	}
+
+	partialHistoryBatch := historyBatch[:1]
+	partialVersionHistory := s.eventBatchesToVersionHistory(nil, partialHistoryBatch)
+	versionHistory := s.eventBatchesToVersionHistory(nil, historyBatch)
+	workflowState := &persistence.WorkflowMutableState{
+		ExecutionState: &persistence.WorkflowExecutionState{
+			State:  enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
+			Status: enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
+			RunId:  runID,
+		},
+		ExecutionInfo: &persistence.WorkflowExecutionInfo{
+			NamespaceId: s.namespaceID.String(),
+			WorkflowId:  workflowID,
+			VersionHistories: &historyspb.VersionHistories{
+				CurrentVersionHistoryIndex: 0,
+				Histories:                  []*historyspb.VersionHistory{versionHistory},
+			},
+		},
+	}
+	s.applyEvents(
+		workflowID,
+		runID,
+		workflowType,
+		taskqueue,
+		partialVersionHistory,
+		partialHistoryBatch,
+		historyClient,
+	)
+	_, err := historyClient.ReplicateWorkflowState(context.Background(), &historyservice.ReplicateWorkflowStateRequest{
+		WorkflowState: workflowState,
+		RemoteCluster: "standby",
+		NamespaceId:   s.namespaceID.String(),
+	})
+	s.Error(err)
+
+	s.applyEvents(
+		workflowID,
+		runID,
+		workflowType,
+		taskqueue,
+		versionHistory,
+		historyBatch,
+		historyClient,
+	)
+	_, err = historyClient.ReplicateWorkflowState(context.Background(), &historyservice.ReplicateWorkflowStateRequest{
+		WorkflowState: workflowState,
+		RemoteCluster: "standby",
+		NamespaceId:   s.namespaceID.String(),
+	})
+	s.NoError(err)
+}
+
 func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranches() {
 
 	s.setupRemoteFrontendClients()