Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functional test to cover conversion of scheduled speculative workflow task to normal #4364

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4121,6 +4121,9 @@ func (ms *MutableStateImpl) CloseTransactionAsMutation(
return nil, nil, err
}

// It is important to convert speculative WT to normal before prepareEventsAndReplicationTasks,
// because prepareEventsAndReplicationTasks will move internal buffered events to the history,
// and WT related events (WTScheduled, in particular) need to go first.
if err := ms.workflowTaskManager.convertSpeculativeWorkflowTaskToNormal(); err != nil {
return nil, nil, err
}
Expand Down
188 changes: 187 additions & 1 deletion tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2114,7 +2114,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() {
8 WorkflowTaskFailed`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNormal_BecauseOfBufferedSignal() {
func (s *integrationSuite) TestUpdateWorkflow_ConvertStartedSpeculativeWorkflowTaskToNormal_BecauseOfBufferedSignal() {
id := "integration-update-workflow-test-8"
wt := "integration-update-workflow-test-8-type"
tq := "integration-update-workflow-test-8-task-queue"
Expand Down Expand Up @@ -2297,6 +2297,192 @@ func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNo
13 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_ConvertScheduledSpeculativeWorkflowTaskToNormal_BecauseOfSignal() {
id := "integration-update-workflow-test-csswtn"
wt := "integration-update-workflow-test-csswtn-type"
tq := "integration-update-workflow-test-csswtn-task-queue"

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

we := &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: startResp.GetRunId(),
}

wtHandlerCalls := 0
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
// Completes first WT with update unrelated command.
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: strconv.Itoa(1),
ActivityType: &commonpb.ActivityType{Name: "activity_type_1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour),
}},
}}, nil
case 2:
// Speculative WT was already converted to normal because of the signal.
s.EqualHistory(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowExecutionSignaled
8 WorkflowTaskStarted`, history)
return nil, nil
case 3:
s.EqualHistory(`
9 WorkflowTaskCompleted
10 WorkflowTaskScheduled
11 WorkflowTaskStarted`, history)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("done"),
}},
}}, nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
}
}

msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
updRequestMsg := task.Messages[0]
updRequest := unmarshalAny[*updatepb.Request](s, updRequestMsg.GetBody())

s.Equal(payloads.EncodeString("update args"), updRequest.GetInput().GetArgs())
s.Equal("update_handler", updRequest.GetInput().GetName())
s.EqualValues(7, updRequestMsg.GetEventId())

// Update is rejected but corresponding speculative WT was already converted to normal,
// and will be in the history anyway.
return []*protocolpb.Message{
{
Id: uuid.New(),
ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(),
SequencingId: nil,
Body: marshalAny(s, &updatepb.Rejection{
RejectedRequestMessageId: updRequestMsg.GetId(),
RejectedRequestSequencingEventId: updRequestMsg.GetEventId(),
RejectedRequest: updRequest,
Failure: &failurepb.Failure{
Message: "update rejected",
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{}},
},
}),
},
}, nil
case 3:
return nil, nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
}
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}

// Start activity using existing workflow task.
_, err = poller.PollAndProcessWorkflowTask(true, false)
s.NoError(err)

type UpdateResult struct {
Response *workflowservice.UpdateWorkflowExecutionResponse
Err error
}
updateResultCh := make(chan UpdateResult)
updateWorkflowFn := func() {
updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: we,
Request: &updatepb.Request{
Meta: &updatepb.Meta{UpdateId: uuid.New()},
Input: &updatepb.Input{
Name: "update_handler",
Args: payloads.EncodeString("update args"),
},
},
})
assert.NoError(s.T(), err1)
updateResultCh <- UpdateResult{Response: updateResponse, Err: err1}
}
go updateWorkflowFn()
time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server and speculative WT was created.

// Send signal which will NOT be buffered because speculative WT is not started yet (only scheduled).
// This will persist MS and speculative WT must be converted to normal.
err = s.sendSignal(s.namespace, we, "SignalName", payloads.EncodeString("signal_data"), "worker_identity")
s.NoError(err)

// Process update in workflow.
_, updateResp, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(false, false, false, false, 1, 5, true, nil)
s.NoError(err)
updateResult := <-updateResultCh
s.NoError(updateResult.Err)
s.Equal("update rejected", updateResult.Response.GetOutcome().GetFailure().GetMessage())
s.EqualValues(0, updateResp.ResetHistoryEventId)

// Complete workflow.
completeWorkflowResp, err := poller.HandlePartialWorkflowTask(updateResp.GetWorkflowTask(), true)
s.NoError(err)
s.NotNil(completeWorkflowResp)
s.Nil(completeWorkflowResp.GetWorkflowTask())
s.EqualValues(0, completeWorkflowResp.ResetHistoryEventId)

s.Equal(3, wtHandlerCalls)
s.Equal(3, msgHandlerCalls)

events := s.getHistory(s.namespace, we)

s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowExecutionSignaled
8 WorkflowTaskStarted
9 WorkflowTaskCompleted
10 WorkflowTaskScheduled
11 WorkflowTaskStarted
12 WorkflowTaskCompleted
13 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_TimeoutSpeculativeWorkflowTask() {
id := "integration-update-workflow-test-9"
wt := "integration-update-workflow-test-9-type"
Expand Down