diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 6df7a4a2c3e..e6ddce94b6a 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -3940,6 +3940,10 @@ func (ms *MutableStateImpl) CloseTransactionAsMutation( return nil, nil, err } + if err := ms.workflowTaskManager.convertSpeculativeWorkflowTaskToNormal(); err != nil { + return nil, nil, err + } + workflowEventsSeq, bufferEvents, clearBuffer, err := ms.prepareEventsAndReplicationTasks(transactionPolicy) if err != nil { return nil, nil, err @@ -4017,6 +4021,10 @@ func (ms *MutableStateImpl) CloseTransactionAsSnapshot( return nil, nil, err } + if err := ms.workflowTaskManager.convertSpeculativeWorkflowTaskToNormal(); err != nil { + return nil, nil, err + } + workflowEventsSeq, bufferEvents, _, err := ms.prepareEventsAndReplicationTasks(transactionPolicy) if err != nil { return nil, nil, err diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 1c8ee4896cc..5ace7b21547 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -705,6 +705,7 @@ func (s *mutableStateSuite) buildWorkflowMutableState() *persistencespb.Workflow WorkflowTaskStartedEventId: 102, WorkflowTaskTimeout: timestamp.DurationFromSeconds(100), WorkflowTaskAttempt: 1, + WorkflowTaskType: enumsspb.WORKFLOW_TASK_TYPE_NORMAL, VersionHistories: &historyspb.VersionHistories{ Histories: []*historyspb.VersionHistory{ { @@ -919,3 +920,65 @@ func (s *mutableStateSuite) TestTotalEntitiesCount() { s.Equal(int64(1), mutation.ExecutionInfo.SignalExternalCount) s.Equal(int64(1), mutation.ExecutionInfo.SignalCount) } + +func (s *mutableStateSuite) TestSpeculativeWorkflowTaskNotPersisted() { + testCases := []struct { + name string + enableBufferedEvents bool + closeTxFunc func(ms *MutableStateImpl) (*persistencespb.WorkflowExecutionInfo, error) + }{ + { + name: "CloseTransactionAsSnapshot", + closeTxFunc: func(ms *MutableStateImpl) (*persistencespb.WorkflowExecutionInfo, error) { + snapshot, _, err := ms.CloseTransactionAsSnapshot(TransactionPolicyPassive) + if err != nil { + return nil, err + } + return snapshot.ExecutionInfo, err + }, + }, + { + name: "CloseTransactionAsMutation", + enableBufferedEvents: true, + closeTxFunc: func(ms *MutableStateImpl) (*persistencespb.WorkflowExecutionInfo, error) { + mutation, _, err := ms.CloseTransactionAsMutation(TransactionPolicyPassive) + if err != nil { + return nil, err + } + return mutation.ExecutionInfo, err + }, + }, + } + + for _, tc := range testCases { + s.T().Run(tc.name, func(t *testing.T) { + dbState := s.buildWorkflowMutableState() + if !tc.enableBufferedEvents { + dbState.BufferedEvents = nil + } + + var err error + s.mutableState, err = newMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, tests.LocalNamespaceEntry, dbState, 123) + s.NoError(err) + + s.mutableState.executionInfo.WorkflowTaskScheduledEventId = s.mutableState.GetNextEventID() + s.mutableState.executionInfo.WorkflowTaskStartedEventId = s.mutableState.GetNextEventID() + 1 + + // Normal WT is persisted as is. + execInfo, err := tc.closeTxFunc(s.mutableState) + s.Nil(err) + s.Equal(enumsspb.WORKFLOW_TASK_TYPE_NORMAL, execInfo.WorkflowTaskType) + s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskScheduledEventId) + s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskStartedEventId) + + s.mutableState.executionInfo.WorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE + + // Speculative WT is converted to normal. + execInfo, err = tc.closeTxFunc(s.mutableState) + s.Nil(err) + s.Equal(enumsspb.WORKFLOW_TASK_TYPE_NORMAL, execInfo.WorkflowTaskType) + s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskScheduledEventId) + s.NotEqual(common.EmptyEventID, execInfo.WorkflowTaskStartedEventId) + }) + } +} diff --git a/service/history/workflow/mutable_state_rebuilder.go b/service/history/workflow/mutable_state_rebuilder.go index c4e729ab07e..7581b8988eb 100644 --- a/service/history/workflow/mutable_state_rebuilder.go +++ b/service/history/workflow/mutable_state_rebuilder.go @@ -235,6 +235,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( // NOTE: at the beginning of the loop, stickyness is cleared if err := taskGenerator.GenerateScheduleWorkflowTaskTasks( workflowTask.ScheduledEventID, + false, ); err != nil { return nil, err } @@ -287,6 +288,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( // NOTE: at the beginning of the loop, stickyness is cleared if err := taskGenerator.GenerateScheduleWorkflowTaskTasks( workflowTask.ScheduledEventID, + false, ); err != nil { return nil, err } @@ -309,6 +311,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( // NOTE: at the beginning of the loop, stickyness is cleared if err := taskGenerator.GenerateScheduleWorkflowTaskTasks( workflowTask.ScheduledEventID, + false, ); err != nil { return nil, err } diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index a3ab339e5f9..bb63553dcaf 100644 --- a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -519,6 +519,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA ).Return(nil) s.mockTaskGeneratorForNew.EXPECT().GenerateScheduleWorkflowTaskTasks( newRunWorkflowTaskEvent.GetEventId(), + false, ).Return(nil) s.mockTaskGeneratorForNew.EXPECT().GenerateActivityTimerTasks().Return(nil) s.mockTaskGeneratorForNew.EXPECT().GenerateUserTimerTasks().Return(nil) @@ -764,6 +765,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskScheduled() { s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateScheduleWorkflowTaskTasks( wt.ScheduledEventID, + false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyness() @@ -858,6 +860,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskTimedOut() { s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateScheduleWorkflowTaskTasks( newScheduledEventID, + false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyness() @@ -902,6 +905,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskFailed() { s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateScheduleWorkflowTaskTasks( newScheduledEventID, + false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyness() diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 93572d501e9..7d73c99e6bb 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -71,6 +71,7 @@ type ( ) error GenerateScheduleWorkflowTaskTasks( workflowTaskScheduledEventID int64, + generateTimeoutTaskOnly bool, ) error GenerateStartWorkflowTaskTasks( workflowTaskScheduledEventID int64, @@ -332,6 +333,7 @@ func (r *TaskGeneratorImpl) GenerateRecordWorkflowStartedTasks( func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( workflowTaskScheduledEventID int64, + generateTimeoutTaskOnly bool, // Only generate SCHEDULE_TO_START timeout timer task, but not a transfer task which push WT to matching. ) error { workflowTask := r.mutableState.GetWorkflowTaskByID( @@ -341,14 +343,6 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending workflow task: %v", workflowTaskScheduledEventID)) } - r.mutableState.AddTasks(&tasks.WorkflowTask{ - // TaskID, VisibilityTimestamp is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - TaskQueue: workflowTask.TaskQueue.GetName(), - ScheduledEventID: workflowTask.ScheduledEventID, - Version: workflowTask.Version, - }) - if r.mutableState.IsStickyTaskQueueEnabled() { scheduledTime := timestamp.TimeValue(workflowTask.ScheduledTime) scheduleToStartTimeout := timestamp.DurationValue(r.mutableState.GetExecutionInfo().StickyScheduleToStartTimeout) @@ -364,6 +358,18 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( }) } + if generateTimeoutTaskOnly { + return nil + } + + r.mutableState.AddTasks(&tasks.WorkflowTask{ + // TaskID, VisibilityTimestamp is set by shard + WorkflowKey: r.mutableState.GetWorkflowKey(), + TaskQueue: workflowTask.TaskQueue.GetName(), + ScheduledEventID: workflowTask.ScheduledEventID, + Version: workflowTask.Version, + }) + return nil } diff --git a/service/history/workflow/task_generator_mock.go b/service/history/workflow/task_generator_mock.go index 9af984d5df4..cea22af279a 100644 --- a/service/history/workflow/task_generator_mock.go +++ b/service/history/workflow/task_generator_mock.go @@ -217,17 +217,17 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateRequestCancelExternalTasks(even } // GenerateScheduleWorkflowTaskTasks mocks base method. -func (m *MockTaskGenerator) GenerateScheduleWorkflowTaskTasks(workflowTaskScheduledEventID int64) error { +func (m *MockTaskGenerator) GenerateScheduleWorkflowTaskTasks(workflowTaskScheduledEventID int64, generateTimeoutTaskOnly bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GenerateScheduleWorkflowTaskTasks", workflowTaskScheduledEventID) + ret := m.ctrl.Call(m, "GenerateScheduleWorkflowTaskTasks", workflowTaskScheduledEventID, generateTimeoutTaskOnly) ret0, _ := ret[0].(error) return ret0 } // GenerateScheduleWorkflowTaskTasks indicates an expected call of GenerateScheduleWorkflowTaskTasks. -func (mr *MockTaskGeneratorMockRecorder) GenerateScheduleWorkflowTaskTasks(workflowTaskScheduledEventID interface{}) *gomock.Call { +func (mr *MockTaskGeneratorMockRecorder) GenerateScheduleWorkflowTaskTasks(workflowTaskScheduledEventID, generateTimeoutTaskOnly interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateScheduleWorkflowTaskTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateScheduleWorkflowTaskTasks), workflowTaskScheduledEventID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateScheduleWorkflowTaskTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateScheduleWorkflowTaskTasks), workflowTaskScheduledEventID, generateTimeoutTaskOnly) } // GenerateSignalExternalTasks mocks base method. diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 884bc70d413..5b9bb7b9519 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -268,6 +268,7 @@ func (r *TaskRefresherImpl) refreshWorkflowTaskTasks( // workflowTask only scheduled return taskGenerator.GenerateScheduleWorkflowTaskTasks( workflowTask.ScheduledEventID, + false, ) } diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 134d89824b5..91afe7c6366 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -42,7 +42,6 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" historyspb "go.temporal.io/server/api/history/v1" - "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/log/tag" @@ -345,6 +344,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( if !bypassTaskGeneration && workflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { if err := m.ms.taskGenerator.GenerateScheduleWorkflowTaskTasks( scheduledEventID, + false, ); err != nil { return nil, err } @@ -362,7 +362,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEvent( return m.AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration, timestamp.TimePtr(m.ms.timeSource.Now()), workflowTaskType) } -// AddFirstWorkflowTaskScheduled adds the first workflow task scehduled event unless it should be delayed as indicated +// AddFirstWorkflowTaskScheduled adds the first workflow task scheduled event unless it should be delayed as indicated // by the startEvent's FirstWorkflowTaskBackoff. // If bypassTaskGeneration is specified, a transfer task will not be created. // Returns the workflow task's scheduled event ID if a task was scheduled, 0 otherwise. @@ -936,3 +936,64 @@ func (m *workflowTaskStateMachine) getHistorySizeInfo() (bool, int64) { suggestContinueAsNew := historySize >= sizeLimit || historyCount >= countLimit return suggestContinueAsNew, historySize } + +func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() error { + if m.ms.executionInfo.WorkflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + return nil + } + + // Workflow task can't be persisted as Speculative, because when it is completed, + // it gets deleted from memory only but not from the database. + // If execution info in mutable state has speculative workflow task, then + // convert it to normal workflow task before persisting. + m.ms.executionInfo.WorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL + + wt := m.getWorkflowTaskInfo() + + // TODO (alex-update): cancel in-memory timer for this speculative WT. + + scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( + wt.TaskQueue, + wt.WorkflowTaskTimeout, + wt.Attempt, + timestamp.TimeValue(wt.ScheduledTime), + ) + + if scheduledEvent.EventId != wt.ScheduledEventID { + return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, scheduled event Id: %d for normal workflow task doesn't match the one from speculative workflow task: %d", scheduledEvent.EventId, wt.ScheduledEventID)) + } + + if wt.StartedEventID != common.EmptyEventID { + // If WT is has started then started event is written to the history and + // timeout timer task (for START_TO_CLOSE timeout) is created. + + _ = m.ms.hBuilder.AddWorkflowTaskStartedEvent( + scheduledEvent.EventId, + wt.RequestID, + "", // identity is not stored in the mutable state. + timestamp.TimeValue(wt.StartedTime), + wt.SuggestContinueAsNew, + wt.HistorySizeBytes, + ) + m.ms.hBuilder.FlushAndCreateNewBatch() + + if err := m.ms.taskGenerator.GenerateStartWorkflowTaskTasks( + scheduledEvent.EventId, + ); err != nil { + return err + } + } else { + // If WT was only scheduled but not started yet, then SCHEDULE_TO_START timeout timer task is created only if using sticky task queue. + // Normal task queue doesn't have a timeout. + if m.ms.IsStickyTaskQueueEnabled() { + if err := m.ms.taskGenerator.GenerateScheduleWorkflowTaskTasks( + scheduledEvent.EventId, + true, // Only generate SCHEDULE_TO_START timeout timer task, but not a transfer task which push WT to matching because WT was already pushed to matching. + ); err != nil { + return err + } + } + } + + return nil +} diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index e09b418cf7e..fe881fa079f 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -250,6 +250,10 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( return nil, err } + if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + updateAction.Noop = true + } + workflowScheduleToStartLatency := workflowTask.StartedTime.Sub(*workflowTask.ScheduledTime) namespaceName := namespaceEntry.Name() taskQueue := workflowTask.TaskQueue @@ -582,40 +586,38 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( } else if ms.IsWorkflowExecutionRunning() && hasPendingUpdates { newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE } - createNewWorkflowTask := newWorkflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED - - var newWorkflowTaskScheduledEventID int64 - if createNewWorkflowTask { - // TODO (alex-update): Need to support case when ReturnNewWorkflowTask=false and WT.Type=Speculative. - // In this case WT needs to be added directly to matching. - // Current implementation will create normal WT. - bypassTaskGeneration := request.GetReturnNewWorkflowTask() && wtFailedCause == nil - if !bypassTaskGeneration { - // If task generation can't be bypassed workflow task must be of Normal type because Speculative workflow task always skip task generation. - newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL - } - var newWorkflowTask *workflow.WorkflowTaskInfo - var err error + bypassTaskGeneration := request.GetReturnNewWorkflowTask() && wtFailedCause == nil + // TODO (alex-update): Need to support case when ReturnNewWorkflowTask=false and WT.Type=Speculative. + // In this case WT needs to be added directly to matching. + // Current implementation will create normal WT. + if newWorkflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE && !bypassTaskGeneration { + // If task generation can't be bypassed workflow task must be of Normal type because Speculative workflow task always skip task generation. + newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL + } + + var newWorkflowTask *workflow.WorkflowTaskInfo + // Speculative workflow task will be created after mutable state is persisted. + if newWorkflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_NORMAL { + var newWTErr error if workflowTaskHeartbeating && !workflowTaskHeartbeatTimeout { - newWorkflowTask, err = ms.AddWorkflowTaskScheduledEventAsHeartbeat( + newWorkflowTask, newWTErr = ms.AddWorkflowTaskScheduledEventAsHeartbeat( bypassTaskGeneration, currentWorkflowTask.OriginalScheduledTime, enumsspb.WORKFLOW_TASK_TYPE_NORMAL, // Heartbeat workflow task is always of Normal type. ) } else { - newWorkflowTask, err = ms.AddWorkflowTaskScheduledEvent(bypassTaskGeneration, newWorkflowTaskType) + newWorkflowTask, newWTErr = ms.AddWorkflowTaskScheduledEvent(bypassTaskGeneration, newWorkflowTaskType) } - if err != nil { - return nil, err + if newWTErr != nil { + return nil, newWTErr } - newWorkflowTaskScheduledEventID = newWorkflowTask.ScheduledEventID // skip transfer task for workflow task if request asking to return new workflow task if bypassTaskGeneration { // start the new workflow task if request asked to do so // TODO: replace the poll request - _, _, err := ms.AddWorkflowTaskStartedEvent( + _, newWorkflowTask, err = ms.AddWorkflowTaskStartedEvent( newWorkflowTask.ScheduledEventID, "request-from-RespondWorkflowTaskCompleted", newWorkflowTask.TaskQueue, @@ -645,7 +647,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( newMutableState, ) } else { - updateErr = weContext.UpdateWorkflowExecutionAsActive(ctx) + // If completedEvent is not nil (which it means that WT wasn't speculative) + // OR new WT is normal, then mutable state is persisted. + // Otherwise, (both old and new WT are speculative) mutable state is updated in memory only but not persisted. + if completedEvent != nil || newWorkflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_NORMAL { + updateErr = weContext.UpdateWorkflowExecutionAsActive(ctx) + } } if updateErr != nil { @@ -686,13 +693,30 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( return nil, updateErr } + // Create speculative workflow task after mutable state is persisted. + if newWorkflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + newWorkflowTask, err = ms.AddWorkflowTaskScheduledEvent(bypassTaskGeneration, newWorkflowTaskType) + if err != nil { + return nil, err + } + _, newWorkflowTask, err = ms.AddWorkflowTaskStartedEvent( + newWorkflowTask.ScheduledEventID, + "request-from-RespondWorkflowTaskCompleted", + newWorkflowTask.TaskQueue, + request.Identity, + ) + if err != nil { + return nil, err + } + } + // Send update results to gRPC API callers. err = weContext.UpdateRegistry().ProcessIncomingMessages(req.GetCompleteRequest().GetMessages()) if err != nil { return nil, err } - handler.handleBufferedQueries(ms, req.GetCompleteRequest().GetQueryResults(), createNewWorkflowTask, namespaceEntry, workflowTaskHeartbeating) + handler.handleBufferedQueries(ms, req.GetCompleteRequest().GetQueryResults(), newWorkflowTask != nil, namespaceEntry, workflowTaskHeartbeating) if workflowTaskHeartbeatTimeout { // at this point, update is successful, but we still return an error to client so that the worker will give up this workflow @@ -704,9 +728,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( } resp := &historyservice.RespondWorkflowTaskCompletedResponse{} - if request.GetReturnNewWorkflowTask() && createNewWorkflowTask { - workflowTask := ms.GetWorkflowTaskByID(newWorkflowTaskScheduledEventID) - resp.StartedResponse, err = handler.createRecordWorkflowTaskStartedResponse(ms, weContext.UpdateRegistry(), workflowTask, request.GetIdentity()) + if request.GetReturnNewWorkflowTask() && newWorkflowTask != nil { + resp.StartedResponse, err = handler.createRecordWorkflowTaskStartedResponse(ms, weContext.UpdateRegistry(), newWorkflowTask, request.GetIdentity()) if err != nil { return nil, err } diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index eaf1f11975c..731449d4335 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -1532,3 +1532,186 @@ func (s *integrationSuite) TestUpdateWorkflow_FailWorkflowTask() { 11 WorkflowTaskCompleted 12 WorkflowExecutionCompleted`, events) } + +func (s *integrationSuite) TestUpdateWorkflow_ConvertSpeculativeWorkflowTaskToNormal_BecauseOfBufferedSignal() { + id := "integration-update-workflow-test-8" + wt := "integration-update-workflow-test-8-type" + tq := "integration-update-workflow-test-8-task-queue" + + workflowType := &commonpb.WorkflowType{Name: wt} + taskQueue := &taskqueuepb.TaskQueue{Name: tq} + + request := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: s.namespace, + WorkflowId: id, + WorkflowType: workflowType, + TaskQueue: taskQueue, + } + + startResp, err := s.engine.StartWorkflowExecution(NewContext(), request) + s.NoError(err) + + we := &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: startResp.GetRunId(), + } + + wtHandlerCalls := 0 + wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + wtHandlerCalls++ + switch wtHandlerCalls { + case 1: + // Completes first WT with update unrelated command. + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: strconv.Itoa(1), + ActivityType: &commonpb.ActivityType{Name: "activity_type_1"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: tq}, + ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour), + }}, + }}, nil + case 2: + // Speculative WT, with update.Request message. + s.EqualHistory(` + 1 WorkflowExecutionStarted + 2 WorkflowTaskScheduled + 3 WorkflowTaskStarted + 4 WorkflowTaskCompleted + 5 ActivityTaskScheduled + 6 WorkflowTaskScheduled + 7 WorkflowTaskStarted`, history) + // Send signal which will be buffered. This will persist MS and speculative WT must be converted to normal. + err = s.sendSignal(s.namespace, we, "SignalName", payloads.EncodeString("signal_data"), "worker_identity") + s.NoError(err) + return nil, nil + case 3: + s.EqualHistory(` + 8 WorkflowTaskCompleted + 9 WorkflowExecutionSignaled + 10 WorkflowTaskScheduled + 11 WorkflowTaskStarted`, history) + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("done"), + }}, + }}, nil + default: + s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls) + return nil, nil + } + } + + msgHandlerCalls := 0 + msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) { + msgHandlerCalls++ + switch msgHandlerCalls { + case 1: + return nil, nil + case 2: + updRequestMsg := task.Messages[0] + updRequest := unmarshalAny[*updatepb.Request](s, updRequestMsg.GetBody()) + + s.Equal(payloads.EncodeString("update args"), updRequest.GetInput().GetArgs()) + s.Equal("update_handler", updRequest.GetInput().GetName()) + s.EqualValues(6, updRequestMsg.GetEventId()) + + // Update is rejected but corresponding speculative WT will be in the history anyway, because it was converted to normal due to buffered signal. + return []*protocolpb.Message{ + { + Id: uuid.New(), + ProtocolInstanceId: updRequest.GetMeta().GetUpdateId(), + SequencingId: nil, + Body: marshalAny(s, &updatepb.Rejection{ + RejectedRequestMessageId: updRequestMsg.GetId(), + RejectedRequestSequencingEventId: updRequestMsg.GetEventId(), + RejectedRequest: updRequest, + Failure: &failurepb.Failure{ + Message: "update rejected", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{}}, + }, + }), + }, + }, nil + case 3: + return nil, nil + default: + s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls) + return nil, nil + } + } + + poller := &TaskPoller{ + Engine: s.engine, + Namespace: s.namespace, + TaskQueue: taskQueue, + WorkflowTaskHandler: wtHandler, + MessageHandler: msgHandler, + Logger: s.Logger, + T: s.T(), + } + + // Start activity using existing workflow task. + _, err = poller.PollAndProcessWorkflowTask(true, false) + s.NoError(err) + + type UpdateResult struct { + Response *workflowservice.UpdateWorkflowExecutionResponse + Err error + } + updateResultCh := make(chan UpdateResult) + updateWorkflowFn := func() { + updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: we, + Request: &updatepb.Request{ + Meta: &updatepb.Meta{UpdateId: uuid.New()}, + Input: &updatepb.Input{ + Name: "update_handler", + Args: payloads.EncodeString("update args"), + }, + }, + }) + s.NoError(err1) + updateResultCh <- UpdateResult{Response: updateResponse, Err: err1} + } + go updateWorkflowFn() + time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server. + + // Process update in workflow. + _, updateResp, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(false, false, false, false, 1, 5, true, nil) + s.NoError(err) + updateResult := <-updateResultCh + s.NoError(updateResult.Err) + s.Equal("update rejected", updateResult.Response.GetOutcome().GetFailure().GetMessage()) + s.EqualValues(0, updateResp.ResetHistoryEventId) + + // Complete workflow. + completeWorkflowResp, err := poller.HandlePartialWorkflowTask(updateResp.GetWorkflowTask(), true) + s.NoError(err) + s.NotNil(completeWorkflowResp) + s.Nil(completeWorkflowResp.GetWorkflowTask()) + s.EqualValues(0, completeWorkflowResp.ResetHistoryEventId) + + s.Equal(3, wtHandlerCalls) + s.Equal(3, msgHandlerCalls) + + events := s.getHistory(s.namespace, we) + + s.EqualHistoryEvents(` + 1 WorkflowExecutionStarted + 2 WorkflowTaskScheduled + 3 WorkflowTaskStarted + 4 WorkflowTaskCompleted + 5 ActivityTaskScheduled + 6 WorkflowTaskScheduled + 7 WorkflowTaskStarted + 8 WorkflowTaskCompleted + 9 WorkflowExecutionSignaled + 10 WorkflowTaskScheduled + 11 WorkflowTaskStarted + 12 WorkflowTaskCompleted + 13 WorkflowExecutionCompleted`, events) +}