Skip to content

Commit

Permalink
Add functional test to cover conversion of scheduled speculative work…
Browse files Browse the repository at this point in the history
…flow task to normal
  • Loading branch information
alexshtin committed May 18, 2023
1 parent 9a6edae commit 194a79d
Showing 1 changed file with 186 additions and 0 deletions.
186 changes: 186 additions & 0 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2297,6 +2297,192 @@ func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNo
13 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_ConvertScheduledSpeculativeWorkflowTaskToNormal_BecauseOfSignal() {
id := "integration-update-workflow-test-csswtn"
wt := "integration-update-workflow-test-csswtn-type"
tq := "integration-update-workflow-test-csswtn-task-queue"

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

we := &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: startResp.GetRunId(),
}

wtHandlerCalls := 0
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
// Completes first WT with update unrelated command.
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: strconv.Itoa(1),
ActivityType: &commonpb.ActivityType{Name: "activity_type_1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour),
}},
}}, nil
case 2:
// Speculative WT was already converted to normal because of the signal.
s.EqualHistory(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowExecutionSignaled
8 WorkflowTaskStarted`, history)
return nil, nil
case 3:
s.EqualHistory(`
9 WorkflowTaskCompleted
10 WorkflowTaskScheduled
11 WorkflowTaskStarted`, history)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("done"),
}},
}}, nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
}
}

msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
updRequestMsg := task.Messages[0]
updRequest := unmarshalAny[*updatepb.Request](s, updRequestMsg.GetBody())

s.Equal(payloads.EncodeString("update args"), updRequest.GetInput().GetArgs())
s.Equal("update_handler", updRequest.GetInput().GetName())
s.EqualValues(7, updRequestMsg.GetEventId())

// Update is rejected but corresponding speculative WT was already converted to normal,
// and will be in the history anyway.
return []*protocolpb.Message{
{
Id: uuid.New(),
ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(),
SequencingId: nil,
Body: marshalAny(s, &updatepb.Rejection{
RejectedRequestMessageId: updRequestMsg.GetId(),
RejectedRequestSequencingEventId: updRequestMsg.GetEventId(),
RejectedRequest: updRequest,
Failure: &failurepb.Failure{
Message: "update rejected",
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{}},
},
}),
},
}, nil
case 3:
return nil, nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
}
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}

// Start activity using existing workflow task.
_, err = poller.PollAndProcessWorkflowTask(true, false)
s.NoError(err)

type UpdateResult struct {
Response *workflowservice.UpdateWorkflowExecutionResponse
Err error
}
updateResultCh := make(chan UpdateResult)
updateWorkflowFn := func() {
updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: we,
Request: &updatepb.Request{
Meta: &updatepb.Meta{UpdateId: uuid.New()},
Input: &updatepb.Input{
Name: "update_handler",
Args: payloads.EncodeString("update args"),
},
},
})
assert.NoError(s.T(), err1)
updateResultCh <- UpdateResult{Response: updateResponse, Err: err1}
}
go updateWorkflowFn()
time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server and speculative WT was created.

// Send signal which will NOT be buffered because speculative WT is not started yet (only scheduled).
// This will persist MS and speculative WT must be converted to normal.
err = s.sendSignal(s.namespace, we, "SignalName", payloads.EncodeString("signal_data"), "worker_identity")
s.NoError(err)

// Process update in workflow.
_, updateResp, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(false, false, false, false, 1, 5, true, nil)
s.NoError(err)
updateResult := <-updateResultCh
s.NoError(updateResult.Err)
s.Equal("update rejected", updateResult.Response.GetOutcome().GetFailure().GetMessage())
s.EqualValues(0, updateResp.ResetHistoryEventId)

// Complete workflow.
completeWorkflowResp, err := poller.HandlePartialWorkflowTask(updateResp.GetWorkflowTask(), true)
s.NoError(err)
s.NotNil(completeWorkflowResp)
s.Nil(completeWorkflowResp.GetWorkflowTask())
s.EqualValues(0, completeWorkflowResp.ResetHistoryEventId)

s.Equal(3, wtHandlerCalls)
s.Equal(3, msgHandlerCalls)

events := s.getHistory(s.namespace, we)

s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowExecutionSignaled
8 WorkflowTaskStarted
9 WorkflowTaskCompleted
10 WorkflowTaskScheduled
11 WorkflowTaskStarted
12 WorkflowTaskCompleted
13 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_TimeoutSpeculativeWorkflowTask() {
id := "integration-update-workflow-test-9"
wt := "integration-update-workflow-test-9-type"
Expand Down

0 comments on commit 194a79d

Please sign in to comment.