Skip to content

Commit

Permalink
Fix child WF ID generation (#1803)
Browse files Browse the repository at this point in the history
  • Loading branch information
gow authored Feb 19, 2025
1 parent 8f38795 commit 7e3d821
Show file tree
Hide file tree
Showing 10 changed files with 839 additions and 6 deletions.
8 changes: 6 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error),
) {
if params.WorkflowID == "" {
params.WorkflowID = wc.workflowInfo.WorkflowExecution.RunID + "_" + wc.GenerateSequenceID()
params.WorkflowID = wc.workflowInfo.currentRunID + "_" + wc.GenerateSequenceID()
}
memo, err := getWorkflowMemo(params.Memo, wc.dataConverter)
if err != nil {
Expand Down Expand Up @@ -1220,7 +1220,11 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
case enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
// No Operation
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
// No Operation
// update the childWorkflowIDSeed if the workflow was reset at this point.
attr := event.GetWorkflowTaskFailedEventAttributes()
if attr.GetCause() == enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW {
weh.workflowInfo.currentRunID = attr.GetNewRunId()
}
case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
// No Operation
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
Expand Down
7 changes: 5 additions & 2 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,7 @@ OrderEvents:
break OrderEvents
}
case enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
// Skip
default:
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
Expand Down Expand Up @@ -744,6 +743,10 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
RetryPolicy: convertFromPBRetryPolicy(attributes.RetryPolicy),
// Use the original execution run ID from the start event as the initial seed.
// Original execution run ID stays the same for the entire chain of workflow resets.
// This helps us keep child workflow IDs consistent up until a reset-point is encountered.
currentRunID: attributes.GetOriginalExecutionRunId(),
}

return newWorkflowExecutionContext(workflowInfo, wth), nil
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_task_handlers_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommands() {
createTestEventWorkflowTaskStarted(3),
{
EventId: 4,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
},
{
EventId: 5,
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *PollLayerInterfacesTestSuite) TestMessageCommands() {
createTestEventWorkflowTaskStarted(3),
{
EventId: 4,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
},
createTestEventWorkflowTaskScheduled(5, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskStarted(6),
Expand Down
2 changes: 2 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,8 @@ type WorkflowInfo struct {
continueAsNewSuggested bool
currentHistorySize int
currentHistoryLength int
// currentRunID is the current run ID of the workflow task, deterministic over reset
currentRunID string
}

// UpdateInfo information about a currently running update
Expand Down
137 changes: 137 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,143 @@ func (ts *IntegrationTestSuite) TestResetWorkflowExecution() {
ts.Equal(originalResult, newResult)
}

// TestResetWorkflowExecutionWithChildren tests the behavior of child workflow ID generation when a workflow with children is reset.
// It repeatedly resets the workflow at different points in its execution and verifies that the child workflow IDs are generated correctly.
func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithChildren() {
wfID := "reset-workflow-with-children"
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

// Start a workflow with 3 children.
options := ts.startWorkflowOptions(wfID)
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowWithChildren)
ts.NoError(err)
var originalResult string
err = run.Get(ctx, &originalResult)
ts.NoError(err)

// save child init childIDs for later comparison.
childIDs := ts.getChildWFIDsFromHistory(ctx, wfID, run.GetRunID())
ts.Len(childIDs, 3)
child1IDBeforeReset := childIDs[0]
child2IDBeforeReset := childIDs[1]
child3IDBeforeReset := childIDs[2]

resetRequest := &workflowservice.ResetWorkflowExecutionRequest{
Namespace: ts.config.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: wfID,
RunId: run.GetRunID(),
},
Reason: "integration test",
}
// (reset #1) - resetting the workflow execution before both child workflows are started.
resetRequest.RequestId = "reset-request-1"
resetRequest.WorkflowTaskFinishEventId = 4
resp, err := ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
ts.NoError(err)
// Wait for the new run to complete.
var resultAfterReset1 string
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset1)
ts.NoError(err)
ts.Equal(originalResult, resultAfterReset1)

childIDsAfterReset1 := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
ts.Len(childIDsAfterReset1, 3)
// All 3 child workflow IDs should be different after reset.
ts.NotEqual(child1IDBeforeReset, childIDsAfterReset1[0])
ts.NotEqual(child2IDBeforeReset, childIDsAfterReset1[1])
ts.NotEqual(child3IDBeforeReset, childIDsAfterReset1[2])

// (reset #2) - resetting the new workflow execution after child-1 but before child-2
resetRequest.RequestId = "reset-request-2"
resetRequest.WorkflowExecution.RunId = resp.GetRunId()
resetRequest.WorkflowTaskFinishEventId = ts.getWorkflowTaskFinishEventIdAfterChild(ctx, wfID, resp.GetRunId(), childIDsAfterReset1[0])
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
ts.NoError(err)
// Wait for the new run to complete.
var resultAfterReset2 string
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset2)
ts.NoError(err)
ts.Equal(originalResult, resultAfterReset2)

childIDsAfterReset2 := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
ts.Len(childIDsAfterReset2, 3)
ts.Equal(childIDsAfterReset1[0], childIDsAfterReset2[0]) // child-1 should be the same as before reset.
ts.NotEqual(childIDsAfterReset1[1], childIDsAfterReset2[1]) // child-2 should be different after reset.
ts.NotEqual(childIDsAfterReset1[2], childIDsAfterReset2[2]) // Child-3 should be different after reset.

// (reset #3) - resetting the new workflow execution after child-2 but before child-3
resetRequest.RequestId = "reset-request-3"
resetRequest.WorkflowExecution.RunId = resp.GetRunId()
resetRequest.WorkflowTaskFinishEventId = ts.getWorkflowTaskFinishEventIdAfterChild(ctx, wfID, resp.GetRunId(), childIDsAfterReset2[1])
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
ts.NoError(err)
// Wait for the new run to complete.
var resultAfterReset3 string
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset3)
ts.NoError(err)
ts.Equal(originalResult, resultAfterReset3)

childIDsAfterReset3 := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
ts.Len(childIDsAfterReset3, 3)
// child-1 & child-2 workflow IDs should be the same as before reset. Child-3 should be different.
ts.Equal(childIDsAfterReset2[0], childIDsAfterReset3[0])
ts.Equal(childIDsAfterReset2[1], childIDsAfterReset3[1])
ts.NotEqual(childIDsAfterReset2[2], childIDsAfterReset3[2])

// (reset #3) - resetting the new workflow execution one last time after child-3
// This should successfully replay all child events and not change the child workflow IDs from previous run.
resetRequest.RequestId = "reset-request-4"
resetRequest.WorkflowExecution.RunId = resp.GetRunId()
resetRequest.WorkflowTaskFinishEventId = ts.getWorkflowTaskFinishEventIdAfterChild(ctx, wfID, resp.GetRunId(), childIDsAfterReset3[2])
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
ts.NoError(err)
childIDsFinal := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
ts.Len(childIDsFinal, 3)
ts.Equal(childIDsAfterReset3[0], childIDsFinal[0])
ts.Equal(childIDsAfterReset3[1], childIDsFinal[1])
ts.Equal(childIDsAfterReset3[2], childIDsFinal[2])
}

func (ts *IntegrationTestSuite) getChildWFIDsFromHistory(ctx context.Context, wfID string, runID string) []string {
iter := ts.client.GetWorkflowHistory(ctx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
var childIDs []string
for iter.HasNext() {
event, err1 := iter.Next()
if err1 != nil {
break
}
if event.GetEventType() == enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED {
childIDs = append(childIDs, event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetWorkflowId())
}
}
return childIDs
}

func (ts *IntegrationTestSuite) getWorkflowTaskFinishEventIdAfterChild(ctx context.Context, wfID string, runID string, childID string) int64 {
iter := ts.client.GetWorkflowHistory(ctx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
childFound := false
for iter.HasNext() {
event, err := iter.Next()
if err != nil {
break
}
if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED {
if event.GetChildWorkflowExecutionCompletedEventAttributes().GetWorkflowExecution().GetWorkflowId() == childID {
childFound = true
}
}
if !childFound {
continue
}
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
return event.GetEventId()
}
}
return 0
}

func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithUpdate() {
ctx := context.Background()
wfId := "reset-workflow-execution-with-update"
Expand Down
18 changes: 18 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,24 @@ func (s *replayTestSuite) TestPartialReplayNonCommandEvent() {
require.NoError(s.T(), err)
}

func (s *replayTestSuite) TestResetWorkflowBeforeChildInit() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(ResetWorkflowWithChild)
// Verify we can replay workflow history containing a reset before StartChildWorkflowExecutionInitiated & ChildWorkflowExecutionCompleted events.
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "reset-workflow-before-child-init.json")
s.NoError(err)
require.NoError(s.T(), err)
}

func (s *replayTestSuite) TestResetWorkflowAfterChildComplete() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(ResetWorkflowWithChild)
// Verify we can replay workflow history containing a reset event after StartChildWorkflowExecutionInitiated & ChildWorkflowExecutionCompleted events.
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "reset-workflow-after-child-complete.json")
s.NoError(err)
require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
Expand Down
Loading

0 comments on commit 7e3d821

Please sign in to comment.