Skip to content

Commit

Permalink
Test sync update on sticky task queue (temporalio#4200)
Browse files Browse the repository at this point in the history
Test speculative workflow task with sticky enabled
  • Loading branch information
yiminc authored and samanbarghi committed May 2, 2023
1 parent dcb5936 commit f3c04ba
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 1 deletion.
2 changes: 1 addition & 1 deletion service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Invoke(
if createNewWorkflowTask {
// This will try not to add an event but will create speculative WT in mutable state.
// Task generation will be skipped if WT is created as speculative.
wt, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
wt, err := ms.AddWorkflowTaskScheduledEvent(true, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
if err != nil {
return nil, err
}
Expand Down
193 changes: 193 additions & 0 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_AcceptComplete()

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
stickyQueue := &taskqueuepb.TaskQueue{Name: tq + "-sticky"}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Expand Down Expand Up @@ -165,6 +166,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_AcceptComplete()
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
StickyTaskQueue: stickyQueue,
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
Expand Down Expand Up @@ -633,6 +635,197 @@ func (s *integrationSuite) TestUpdateWorkflow_NewWorkflowTask_AcceptComplete() {
14 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_NewWorkflowTask_AcceptComplete_Sticky() {
id := "integration-update-workflow-test-2"
wt := "integration-update-workflow-test-2-type"
tq := "integration-update-workflow-test-2-task-queue"

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tq}
stickyQueue := &taskqueuepb.TaskQueue{Name: tq + "-sticky"}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

we := &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: startResp.GetRunId(),
}

wtHandlerCalls := 0
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
// Completes first WT with update unrelated command.
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: strconv.Itoa(1),
ActivityType: &commonpb.ActivityType{Name: "activity_type_1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour),
}},
}}, nil
case 2:
// Speculative WT, with update.Request message.
// We make sure that this task contains partial history because it is sticky enabled
s.EqualHistory(`
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted`, history)
return nil, nil
case 3:
s.EqualHistory(`
8 WorkflowTaskCompleted
9 WorkflowExecutionUpdateAccepted
10 WorkflowExecutionUpdateCompleted
11 WorkflowTaskScheduled
12 WorkflowTaskStarted`, history)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("done"),
}},
}}, nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
}
}

msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
updRequestMsg := task.Messages[0]
updRequest := unmarshalAny[*updatepb.Request](s, updRequestMsg.GetBody())

s.Equal(payloads.EncodeString("update args"), updRequest.GetInput().GetArgs())
s.Equal("update_handler", updRequest.GetInput().GetName())
s.EqualValues(6, updRequestMsg.GetEventId())

return []*protocolpb.Message{
{
Id: uuid.New(),
ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(),
SequencingId: nil,
Body: marshalAny(s, &updatepb.Acceptance{
AcceptedRequestMessageId: updRequestMsg.GetId(),
AcceptedRequestSequencingEventId: updRequestMsg.GetEventId(),
AcceptedRequest: updRequest,
}),
},
{
Id: uuid.New(),
ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(),
SequencingId: nil,
Body: marshalAny(s, &updatepb.Response{
Meta: updRequest.GetMeta(),
Outcome: &updatepb.Outcome{
Value: &updatepb.Outcome_Success{
Success: payloads.EncodeString("update success"),
},
},
}),
},
}, nil
case 3:
return nil, nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
}
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
StickyTaskQueue: stickyQueue,
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}

// poll from regular task queue, but respond with sticky enabled response, next wft will be sticky
_, err = poller.PollAndProcessWorkflowTaskWithAttempt(false, false, false, true, 1)
s.NoError(err)

type UpdateResult struct {
Response *workflowservice.UpdateWorkflowExecutionResponse
Err error
}
updateResultCh := make(chan UpdateResult)
updateWorkflowFn := func() {
time.Sleep(500 * time.Millisecond) // This is to make sure that sticky poller reach to server first

updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: we,
Request: &updatepb.Request{
Meta: &updatepb.Meta{UpdateId: uuid.New()},
Input: &updatepb.Input{
Name: "update_handler",
Args: payloads.EncodeString("update args"),
},
},
})
s.NoError(err1)
updateResultCh <- UpdateResult{Response: updateResponse, Err: err1}
}
go updateWorkflowFn()

// Process update in workflow task (it is sticky).
_, updateResp, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(false, false, true, false, 1, 5, true, nil)
s.NoError(err)
updateResult := <-updateResultCh
s.NoError(updateResult.Err)
s.EqualValues(payloads.EncodeString("update success"), updateResult.Response.GetOutcome().GetSuccess())
s.EqualValues(0, updateResp.ResetHistoryEventId)

// Complete workflow.
completeWorkflowResp, err := poller.HandlePartialWorkflowTask(updateResp.GetWorkflowTask(), true)
s.NoError(err)
s.NotNil(completeWorkflowResp)
s.Nil(completeWorkflowResp.GetWorkflowTask())
s.EqualValues(0, completeWorkflowResp.ResetHistoryEventId)

s.Equal(3, wtHandlerCalls)
s.Equal(3, msgHandlerCalls)

events := s.getHistory(s.namespace, we)

s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted
8 WorkflowTaskCompleted
9 WorkflowExecutionUpdateAccepted
10 WorkflowExecutionUpdateCompleted
11 WorkflowTaskScheduled
12 WorkflowTaskStarted
13 WorkflowTaskCompleted
14 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_Reject() {
id := "integration-update-workflow-test-3"
wt := "integration-update-workflow-test-3-type"
Expand Down

0 comments on commit f3c04ba

Please sign in to comment.