Skip to content

Commit

Permalink
Handle replicate workflow state with exist workflow data (#4617)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Handle replicating workflow state when workflow data already exists.

<!-- Tell your future self why have you made these changes -->
**Why?**
If a migration is interrupted, some workflow data might be only partially
migrated. This case is not handled when replicating workflow state.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Updated the existing unit tests.

A manual test will be performed:
1. Connect two clusters.
2. Start a long-running workflow.
3. Disconnect the two clusters.
4. Terminate the old workflow and start a new long-running workflow.
5. Force replication of the open workflow.
6. Force replication of the closed workflow.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
Yes.
  • Loading branch information
yux0 authored Jul 13, 2023
1 parent 1b82336 commit 6e811dd
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 12 deletions.
40 changes: 40 additions & 0 deletions service/history/ndc/history_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,46 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState(
}
}()

// Handle existing workflows
ms, err := wfCtx.LoadMutableState(ctx)
switch err.(type) {
case *serviceerror.NotFound:
// no-op, continue to replicate workflow state
case nil:
// workflow exists, do resend if version histories are not match.
localVersionHistory, err := versionhistory.GetCurrentVersionHistory(ms.GetExecutionInfo().GetVersionHistories())
if err != nil {
return err
}
localHistoryLastItem, err := versionhistory.GetLastVersionHistoryItem(localVersionHistory)
if err != nil {
return err
}
incomingVersionHistory, err := versionhistory.GetCurrentVersionHistory(request.GetWorkflowState().GetExecutionInfo().GetVersionHistories())
if err != nil {
return err
}
incomingHistoryLastItem, err := versionhistory.GetLastVersionHistoryItem(incomingVersionHistory)
if err != nil {
return err
}
if !versionhistory.IsEqualVersionHistoryItem(localHistoryLastItem, incomingHistoryLastItem) {
return serviceerrors.NewRetryReplication(
"Failed to sync workflow state due to version history mismatch",
namespaceID.String(),
wid,
rid,
localHistoryLastItem.GetEventId(),
localHistoryLastItem.GetVersion(),
common.EmptyEventID,
common.EmptyVersion,
)
}
return nil
default:
return err
}

currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.VersionHistories)
if err != nil {
return err
Expand Down
82 changes: 82 additions & 0 deletions service/history/ndc/history_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
historypb "go.temporal.io/api/history/v1"

"go.temporal.io/server/common"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/service/history/events"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -179,6 +180,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() {
we,
workflow.LockPriorityLow,
).Return(mockWeCtx, wcache.NoopReleaseFn, nil)
mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(nil, serviceerror.NewNotFound("ms not found"))
mockWeCtx.EXPECT().CreateWorkflowExecution(
gomock.Any(),
persistence.CreateWorkflowModeBrandNew,
Expand Down Expand Up @@ -281,6 +283,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() {
we,
workflow.LockPriorityLow,
).Return(mockWeCtx, wcache.NoopReleaseFn, nil)
mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(nil, serviceerror.NewNotFound("ms not found"))
mockWeCtx.EXPECT().CreateWorkflowExecution(
gomock.Any(),
persistence.CreateWorkflowModeBrandNew,
Expand Down Expand Up @@ -398,3 +401,82 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_NoClosedWorkflow_Error(
var internalErr *serviceerror.Internal
s.ErrorAs(err, &internalErr)
}

// Test_ApplyWorkflowState_ExistWorkflow_Resend checks that ApplyWorkflowState
// reports a RetryReplication error when mutable state for the workflow already
// exists locally and the last item of its current version history differs from
// the one carried by the incoming request.
func (s *historyReplicatorSuite) Test_ApplyWorkflowState_ExistWorkflow_Resend() {
	nsID := uuid.New()

	// Incoming state: a single branch whose version history ends at (100, 100).
	branchBlob, err := serialization.HistoryBranchToBlob(&persistencespb.HistoryBranch{
		TreeId:    uuid.New(),
		BranchId:  uuid.New(),
		Ancestors: nil,
	})
	s.NoError(err)
	incomingHistories := &historyspb.VersionHistories{
		CurrentVersionHistoryIndex: 0,
		Histories: []*historyspb.VersionHistory{{
			BranchToken: branchBlob.GetData(),
			Items: []*historyspb.VersionHistoryItem{
				{EventId: int64(100), Version: int64(100)},
			},
		}},
	}
	incomingState := &persistencespb.WorkflowMutableState{
		ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
			WorkflowId:             s.workflowID,
			NamespaceId:            nsID,
			VersionHistories:       incomingHistories,
			CompletionEventBatchId: int64(5),
		},
		ExecutionState: &persistencespb.WorkflowExecutionState{
			RunId:  s.runID,
			State:  enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
			Status: enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
		},
		NextEventId: int64(7),
	}
	req := &historyservice.ReplicateWorkflowStateRequest{
		WorkflowState: incomingState,
		RemoteCluster: "test",
	}

	// Local state: the current version history ends at (1, 1), which does not
	// match the incoming last item.
	localInfo := &persistencespb.WorkflowExecutionInfo{
		VersionHistories: &historyspb.VersionHistories{
			CurrentVersionHistoryIndex: 0,
			Histories: []*historyspb.VersionHistory{{
				Items: []*historyspb.VersionHistoryItem{
					{EventId: int64(1), Version: int64(1)},
				},
			}},
		},
	}

	wfCtx := workflow.NewMockContext(s.controller)
	localMS := workflow.NewMockMutableState(s.controller)
	s.mockWorkflowCache.EXPECT().GetOrCreateWorkflowExecution(
		gomock.Any(),
		namespace.ID(nsID),
		commonpb.WorkflowExecution{
			WorkflowId: s.workflowID,
			RunId:      s.runID,
		},
		workflow.LockPriorityLow,
	).Return(wfCtx, wcache.NoopReleaseFn, nil)
	wfCtx.EXPECT().LoadMutableState(gomock.Any()).Return(localMS, nil)
	localMS.EXPECT().GetExecutionInfo().Return(localInfo)

	err = s.historyReplicator.ApplyWorkflowState(context.Background(), req)

	// The retry error must identify the workflow and start from the local last item.
	var retryErr *serviceerrors.RetryReplication
	s.ErrorAs(err, &retryErr)
	s.Equal(nsID, retryErr.NamespaceId)
	s.Equal(s.workflowID, retryErr.WorkflowId)
	s.Equal(s.runID, retryErr.RunId)
	s.Equal(int64(1), retryErr.StartEventId)
	s.Equal(int64(1), retryErr.StartEventVersion)
}
20 changes: 18 additions & 2 deletions service/history/replication/executable_workflow_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
)

Expand Down Expand Up @@ -128,10 +129,25 @@ func (e *ExecutableWorkflowStateTask) Execute() error {
}

func (e *ExecutableWorkflowStateTask) HandleErr(err error) error {
// no resend is required
switch err.(type) {
switch retryErr := err.(type) {
case nil, *serviceerror.NotFound:
return nil
case *serviceerrors.RetryReplication:
namespaceName, _, nsError := e.GetNamespaceInfo(e.NamespaceID)
if nsError != nil {
return err
}
ctx, cancel := newTaskContext(namespaceName)
defer cancel()

if resendErr := e.Resend(
ctx,
e.sourceClusterName,
retryErr,
); resendErr != nil {
return err
}
return e.Execute()
default:
return err
}
Expand Down
49 changes: 42 additions & 7 deletions service/history/replication/executable_workflow_state_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
Expand All @@ -46,6 +45,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/shard"
)
Expand Down Expand Up @@ -183,14 +183,49 @@ func (s *executableWorkflowStateTaskSuite) TestExecute_Err() {
s.Equal(err, s.task.Execute())
}

func (s *executableWorkflowStateTaskSuite) TestHandleErr() {
err := errors.New("OwO")
s.Equal(err, s.task.HandleErr(err))
// TestHandleErr_Resend_Success checks that HandleErr, given a RetryReplication
// error, returns nil when the mocked Resend succeeds and the mocked engine's
// ReplicateWorkflowState call also succeeds.
func (s *executableWorkflowStateTaskSuite) TestHandleErr_Resend_Success() {
	s.executableTask.EXPECT().TerminalState().Return(false)
	s.executableTask.EXPECT().
		GetNamespaceInfo(s.task.NamespaceID).
		Return(uuid.NewString(), true, nil).
		AnyTimes()

	// Route the task to a mocked shard context and engine.
	mockShard := shard.NewMockContext(s.controller)
	mockEngine := shard.NewMockEngine(s.controller)
	s.shardController.EXPECT().
		GetShardByNamespaceWorkflow(namespace.ID(s.task.NamespaceID), s.task.WorkflowID).
		Return(mockShard, nil).
		AnyTimes()
	mockShard.EXPECT().GetEngine(gomock.Any()).Return(mockEngine, nil).AnyTimes()

	// Event IDs and versions are arbitrary; only the error type matters here.
	retryErr := serviceerrors.NewRetryReplication(
		"",
		s.task.NamespaceID,
		s.task.WorkflowID,
		s.task.RunID,
		rand.Int63(),
		rand.Int63(),
		rand.Int63(),
		rand.Int63(),
	)
	s.executableTask.EXPECT().
		Resend(gomock.Any(), s.sourceClusterName, retryErr).
		Return(nil)
	mockEngine.EXPECT().
		ReplicateWorkflowState(gomock.Any(), gomock.Any()).
		Return(nil)

	s.NoError(s.task.HandleErr(retryErr))
}

err = serviceerror.NewNotFound("")
s.Equal(nil, s.task.HandleErr(err))
func (s *executableWorkflowStateTaskSuite) TestHandleErr_Resend_Error() {
s.executableTask.EXPECT().GetNamespaceInfo(s.task.NamespaceID).Return(
uuid.NewString(), true, nil,
).AnyTimes()
err := serviceerrors.NewRetryReplication(
"",
s.task.NamespaceID,
s.task.WorkflowID,
s.task.RunID,
rand.Int63(),
rand.Int63(),
rand.Int63(),
rand.Int63(),
)
s.executableTask.EXPECT().Resend(gomock.Any(), s.sourceClusterName, err).Return(errors.New("OwO"))

err = serviceerror.NewUnavailable("")
s.Equal(err, s.task.HandleErr(err))
}

Expand Down
35 changes: 32 additions & 3 deletions service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,41 @@ func (e *taskExecutorImpl) handleSyncWorkflowStateTask(

// This might be extra cost if the workflow belongs to local shard.
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
request := &historyservice.ReplicateWorkflowStateRequest{
NamespaceId: namespaceID.String(),
WorkflowState: attr.GetWorkflowState(),
RemoteCluster: e.remoteCluster,
})
return err
}
_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, request)
switch retryErr := err.(type) {
case nil:
return nil
case *serviceerrors.RetryReplication:
resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
ctx,
e.remoteCluster,
namespace.ID(retryErr.NamespaceId),
retryErr.WorkflowId,
retryErr.RunId,
retryErr.StartEventId,
retryErr.StartEventVersion,
retryErr.EndEventId,
retryErr.EndEventVersion,
)
switch resendErr.(type) {
case *serviceerror.NotFound:
// workflow is not found in source cluster, cleanup workflow in target cluster
return e.cleanupWorkflowExecution(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId)
case nil:
_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, request)
return err
default:
e.logger.Error("error resend history for replicate workflow state", tag.Error(resendErr))
return err
}
default:
return err
}
}

func (e *taskExecutorImpl) filterTask(
Expand Down
75 changes: 75 additions & 0 deletions tests/ndc/ndc_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"testing"
"time"

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/serialization"

Expand Down Expand Up @@ -402,6 +403,80 @@ func (s *nDCIntegrationTestSuite) TestEmptyVersionAndNonEmptyVersion() {
)
}

// TestReplicateWorkflowState_PartialReplicated exercises ReplicateWorkflowState
// against a workflow whose history has only been partially applied: the call is
// expected to fail after only the first generated event batch has been applied,
// and to succeed once every generated batch has been applied.
func (s *nDCIntegrationTestSuite) TestReplicateWorkflowState_PartialReplicated() {

	s.setupRemoteFrontendClients()
	wfID := "replicate-workflow-state-partially-replicated" + uuid.New()
	rID := uuid.New()
	wfType := "event-generator-workflow-type"
	tq := "event-generator-taskQueue"

	// active has initial version 1
	client := s.active.GetHistoryClient()
	// standby initial failover version 2
	s.generator = test.InitializeHistoryEventGenerator(s.namespace, s.namespaceID, 12)

	// Drain the generator, collecting one History per generated vertex batch.
	var allBatches []*historypb.History
	for s.generator.HasNextVertex() {
		batch := &historypb.History{}
		for _, vertex := range s.generator.GetNextVertices() {
			batch.Events = append(batch.Events, vertex.GetData().(*historypb.HistoryEvent))
		}
		allBatches = append(allBatches, batch)
	}

	firstBatchOnly := allBatches[:1]
	partialVH := s.eventBatchesToVersionHistory(nil, firstBatchOnly)
	fullVH := s.eventBatchesToVersionHistory(nil, allBatches)

	// The replicated state always carries the full version history.
	state := &persistence.WorkflowMutableState{
		ExecutionState: &persistence.WorkflowExecutionState{
			State:  enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
			Status: enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
			RunId:  rID,
		},
		ExecutionInfo: &persistence.WorkflowExecutionInfo{
			NamespaceId: s.namespaceID.String(),
			WorkflowId:  wfID,
			VersionHistories: &historyspb.VersionHistories{
				CurrentVersionHistoryIndex: 0,
				Histories:                  []*historyspb.VersionHistory{fullVH},
			},
		},
	}
	// Build a fresh request for every call.
	newRequest := func() *historyservice.ReplicateWorkflowStateRequest {
		return &historyservice.ReplicateWorkflowStateRequest{
			WorkflowState: state,
			RemoteCluster: "standby",
			NamespaceId:   s.namespaceID.String(),
		}
	}

	// Only the first batch is applied: replicating the state must fail.
	s.applyEvents(wfID, rID, wfType, tq, partialVH, firstBatchOnly, client)
	_, err := client.ReplicateWorkflowState(context.Background(), newRequest())
	s.Error(err)

	// All batches are applied: replicating the state must now succeed.
	s.applyEvents(wfID, rID, wfType, tq, fullVH, allBatches, client)
	_, err = client.ReplicateWorkflowState(context.Background(), newRequest())
	s.NoError(err)
}

func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranches() {

s.setupRemoteFrontendClients()
Expand Down

0 comments on commit 6e811dd

Please sign in to comment.