From f0f931dee0d841fc7d507c326464082b69770c58 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 18 May 2023 13:04:51 -0700 Subject: [PATCH 1/2] Add functional test to cover conversion of scheduled speculative workflow task to normal --- tests/update_workflow_test.go | 188 +++++++++++++++++++++++++++++++++- 1 file changed, 187 insertions(+), 1 deletion(-) diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index 22d60294f3a..8ab59524c14 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -2114,7 +2114,7 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() { 8 WorkflowTaskFailed`, events) } -func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNormal_BecauseOfBufferedSignal() { +func (s *integrationSuite) TestUpdateWorkflow_ConvertStartedSpeculativeWorkflowTaskToNormal_BecauseOfBufferedSignal() { id := "integration-update-workflow-test-8" wt := "integration-update-workflow-test-8-type" tq := "integration-update-workflow-test-8-task-queue" @@ -2297,6 +2297,192 @@ func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNo 13 WorkflowExecutionCompleted`, events) } +func (s *integrationSuite) TestUpdateWorkflow_ConvertScheduledSpeculativeWorkflowTaskToNormal_BecauseOfSignal() { + id := "integration-update-workflow-test-csswtn" + wt := "integration-update-workflow-test-csswtn-type" + tq := "integration-update-workflow-test-csswtn-task-queue" + + workflowType := &commonpb.WorkflowType{Name: wt} + taskQueue := &taskqueuepb.TaskQueue{Name: tq} + + request := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: s.namespace, + WorkflowId: id, + WorkflowType: workflowType, + TaskQueue: taskQueue, + } + + startResp, err := s.engine.StartWorkflowExecution(NewContext(), request) + s.NoError(err) + + we := &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: startResp.GetRunId(), + } + + wtHandlerCalls := 0 + wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + wtHandlerCalls++ + switch wtHandlerCalls { + case 1: + // Completes first WT with update unrelated command. + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: strconv.Itoa(1), + ActivityType: &commonpb.ActivityType{Name: "activity_type_1"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: tq}, + ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour), + }}, + }}, nil + case 2: + // Speculative WT was already converted to normal because of the signal. + s.EqualHistory(` + 1 WorkflowExecutionStarted + 2 WorkflowTaskScheduled + 3 WorkflowTaskStarted + 4 WorkflowTaskCompleted + 5 ActivityTaskScheduled + 6 WorkflowTaskScheduled + 7 WorkflowExecutionSignaled + 8 WorkflowTaskStarted`, history) + return nil, nil + case 3: + s.EqualHistory(` + 9 WorkflowTaskCompleted + 10 WorkflowTaskScheduled + 11 WorkflowTaskStarted`, history) + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("done"), + }}, + }}, nil + default: + s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls) + return nil, nil + } + } + + msgHandlerCalls := 0 + msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) { + msgHandlerCalls++ + switch msgHandlerCalls { + case 1: + return nil, nil + case 2: + updRequestMsg := task.Messages[0] + updRequest := unmarshalAny[*updatepb.Request](s, updRequestMsg.GetBody()) + + s.Equal(payloads.EncodeString("update args"), updRequest.GetInput().GetArgs()) + s.Equal("update_handler", updRequest.GetInput().GetName()) + s.EqualValues(7, updRequestMsg.GetEventId()) + + // Update is rejected but corresponding speculative WT was already converted to normal, + // and will be in the history anyway. + return []*protocolpb.Message{ + { + Id: uuid.New(), + ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(), + SequencingId: nil, + Body: marshalAny(s, &updatepb.Rejection{ + RejectedRequestMessageId: updRequestMsg.GetId(), + RejectedRequestSequencingEventId: updRequestMsg.GetEventId(), + RejectedRequest: updRequest, + Failure: &failurepb.Failure{ + Message: "update rejected", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{}}, + }, + }), + }, + }, nil + case 3: + return nil, nil + default: + s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls) + return nil, nil + } + } + + poller := &TaskPoller{ + Engine: s.engine, + Namespace: s.namespace, + TaskQueue: taskQueue, + WorkflowTaskHandler: wtHandler, + MessageHandler: msgHandler, + Logger: s.Logger, + T: s.T(), + } + + // Start activity using existing workflow task. + _, err = poller.PollAndProcessWorkflowTask(true, false) + s.NoError(err) + + type UpdateResult struct { + Response *workflowservice.UpdateWorkflowExecutionResponse + Err error + } + updateResultCh := make(chan UpdateResult) + updateWorkflowFn := func() { + updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: we, + Request: &updatepb.Request{ + Meta: &updatepb.Meta{UpdateId: uuid.New()}, + Input: &updatepb.Input{ + Name: "update_handler", + Args: payloads.EncodeString("update args"), + }, + }, + }) + assert.NoError(s.T(), err1) + updateResultCh <- UpdateResult{Response: updateResponse, Err: err1} + } + go updateWorkflowFn() + time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server and speculative WT was created. + + // Send signal which will NOT be buffered because speculative WT is not started yet (only scheduled). + // This will persist MS and speculative WT must be converted to normal. + err = s.sendSignal(s.namespace, we, "SignalName", payloads.EncodeString("signal_data"), "worker_identity") + s.NoError(err) + + // Process update in workflow. + _, updateResp, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(false, false, false, false, 1, 5, true, nil) + s.NoError(err) + updateResult := <-updateResultCh + s.NoError(updateResult.Err) + s.Equal("update rejected", updateResult.Response.GetOutcome().GetFailure().GetMessage()) + s.EqualValues(0, updateResp.ResetHistoryEventId) + + // Complete workflow. + completeWorkflowResp, err := poller.HandlePartialWorkflowTask(updateResp.GetWorkflowTask(), true) + s.NoError(err) + s.NotNil(completeWorkflowResp) + s.Nil(completeWorkflowResp.GetWorkflowTask()) + s.EqualValues(0, completeWorkflowResp.ResetHistoryEventId) + + s.Equal(3, wtHandlerCalls) + s.Equal(3, msgHandlerCalls) + + events := s.getHistory(s.namespace, we) + + s.EqualHistoryEvents(` + 1 WorkflowExecutionStarted + 2 WorkflowTaskScheduled + 3 WorkflowTaskStarted + 4 WorkflowTaskCompleted + 5 ActivityTaskScheduled + 6 WorkflowTaskScheduled + 7 WorkflowExecutionSignaled + 8 WorkflowTaskStarted + 9 WorkflowTaskCompleted + 10 WorkflowTaskScheduled + 11 WorkflowTaskStarted + 12 WorkflowTaskCompleted + 13 WorkflowExecutionCompleted`, events) +} + func (s *integrationSuite) TestUpdateWorkflow_TimeoutSpeculativeWorkflowTask() { id := "integration-update-workflow-test-9" wt := "integration-update-workflow-test-9-type" From 63ca52763f4135abc185702b7daa6806ff9593c1 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 18 May 2023 18:30:18 -0700 Subject: [PATCH 2/2] Update comment --- service/history/workflow/mutable_state_impl.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 3ac41d01ba4..bcd6b2e712b 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4121,6 +4121,9 @@ func (ms *MutableStateImpl) CloseTransactionAsMutation( return nil, nil, err } + // It is important to convert speculative WT to normal before prepareEventsAndReplicationTasks, + // because prepareEventsAndReplicationTasks will move internal buffered events to the history, + // and WT related events (WTScheduled, in particular) need to go first. if err := ms.workflowTaskManager.convertSpeculativeWorkflowTaskToNormal(); err != nil { return nil, nil, err }