diff --git a/service/history/api/deleteworkflow/api.go b/service/history/api/deleteworkflow/api.go index 368bd269605..b608153aa1a 100644 --- a/service/history/api/deleteworkflow/api.go +++ b/service/history/api/deleteworkflow/api.go @@ -89,11 +89,9 @@ func Invoke( weCtx, func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) { mutableState := workflowContext.GetMutableState() - eventBatchFirstEventID := mutableState.GetNextEventID() return api.UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow( mutableState, - eventBatchFirstEventID, "Delete workflow execution", nil, consts.IdentityHistoryService, diff --git a/service/history/api/terminateworkflow/api.go b/service/history/api/terminateworkflow/api.go index ec85ae07fb9..17f4e1044c6 100644 --- a/service/history/api/terminateworkflow/api.go +++ b/service/history/api/terminateworkflow/api.go @@ -89,11 +89,8 @@ func Invoke( } } - eventBatchFirstEventID := mutableState.GetNextEventID() - return api.UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow( mutableState, - eventBatchFirstEventID, request.GetReason(), request.GetDetails(), request.GetIdentity(), diff --git a/service/history/api/workflow_id_reuse_policy.go b/service/history/api/workflow_id_reuse_policy.go index 0ab12d1d4e7..b712949e057 100644 --- a/service/history/api/workflow_id_reuse_policy.go +++ b/service/history/api/workflow_id_reuse_policy.go @@ -65,7 +65,6 @@ func ApplyWorkflowIDReusePolicy( return UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow( mutableState, - mutableState.GetNextEventID(), "TerminateIfRunning WorkflowIdReusePolicy Policy", payloads.EncodeString( fmt.Sprintf("terminated by new runID: %s", runID), diff --git a/service/history/ndc/workflow.go b/service/history/ndc/workflow.go index e89496b1fb0..0518a088191 100644 --- a/service/history/ndc/workflow.go +++ b/service/history/ndc/workflow.go @@ -31,6 +31,7 @@ import ( "fmt" enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" enumsspb "go.temporal.io/server/api/enums/v1" @@ -220,24 +221,25 @@ func (r *WorkflowImpl) FlushBufferedEvents() error { return serviceerror.NewInternal("Workflow encountered workflow with buffered events but last write not from current cluster") } - return r.failWorkflowTask(lastWriteVersion) + _, err = r.failWorkflowTask(lastWriteVersion) + return err } func (r *WorkflowImpl) failWorkflowTask( lastWriteVersion int64, -) error { +) (*historypb.HistoryEvent, error) { // do not persist the change right now, Workflow requires transaction if err := r.mutableState.UpdateCurrentVersion(lastWriteVersion, true); err != nil { - return err + return nil, err } workflowTask := r.mutableState.GetStartedWorkflowTask() if workflowTask == nil { - return nil + return nil, nil } - if _, err := r.mutableState.AddWorkflowTaskFailedEvent( + wtFailedEvent, err := r.mutableState.AddWorkflowTaskFailedEvent( workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND, nil, @@ -246,12 +248,13 @@ func (r *WorkflowImpl) failWorkflowTask( "", "", 0, - ); err != nil { - return err + ) + if err != nil { + return nil, err } r.mutableState.FlushBufferedEvents() - return nil + return wtFailedEvent, nil } func (r *WorkflowImpl) terminateWorkflow( @@ -260,16 +263,21 @@ func (r *WorkflowImpl) terminateWorkflow( ) error { eventBatchFirstEventID := r.GetMutableState().GetNextEventID() - if err := r.failWorkflowTask(lastWriteVersion); err != nil { + wtFailedEvent, err := r.failWorkflowTask(lastWriteVersion) + if err != nil { return err } + if wtFailedEvent != nil { + eventBatchFirstEventID = wtFailedEvent.GetEventId() + } + // do not persist the change right now, Workflow requires transaction - if err := r.mutableState.UpdateCurrentVersion(lastWriteVersion, true); err != nil { + if err = r.mutableState.UpdateCurrentVersion(lastWriteVersion, true); err != nil { return err } - _, err := r.mutableState.AddWorkflowExecutionTerminatedEvent( + _, err = r.mutableState.AddWorkflowExecutionTerminatedEvent( eventBatchFirstEventID, workflowTerminationReason, payloads.EncodeString(fmt.Sprintf("terminated by version: %v", incomingLastWriteVersion)), diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index ec4625c8700..941743c8261 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -576,10 +576,8 @@ func (r *workflowResetterImpl) terminateWorkflow( terminateReason string, ) error { - eventBatchFirstEventID := mutableState.GetNextEventID() return workflow.TerminateWorkflow( mutableState, - eventBatchFirstEventID, terminateReason, nil, consts.IdentityResetter, diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index 631ab97b907..5cb91ee5e72 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -528,12 +528,13 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() { ScheduledEventID: 1234, StartedEventID: 5678, } - nextEventID := int64(666) + wtFailedEventID := int64(666) terminateReason := "some random terminate reason" mutableState := workflow.NewMockMutableState(s.controller) - mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes() + randomEventID := int64(2208) + mutableState.EXPECT().GetNextEventID().Return(randomEventID).AnyTimes() // This doesn't matter, GetNextEventID is not used if there is started WT. mutableState.EXPECT().GetStartedWorkflowTask().Return(workflowTask) mutableState.EXPECT().AddWorkflowTaskFailedEvent( workflowTask, @@ -544,10 +545,10 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() { "", "", int64(0), - ).Return(&historypb.HistoryEvent{}, nil) + ).Return(&historypb.HistoryEvent{EventId: wtFailedEventID}, nil) mutableState.EXPECT().FlushBufferedEvents() mutableState.EXPECT().AddWorkflowExecutionTerminatedEvent( - nextEventID, + wtFailedEventID, terminateReason, nil, consts.IdentityResetter, diff --git a/service/history/ndc/workflow_test.go b/service/history/ndc/workflow_test.go index 5c72bb1dccf..14f0153ae46 100644 --- a/service/history/ndc/workflow_test.go +++ b/service/history/ndc/workflow_test.go @@ -231,10 +231,11 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Error() { } func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() { - lastEventID := int64(2) + randomEventID := int64(2208) + wtFailedEventID := int64(2) lastEventTaskID := int64(144) lastEventVersion := int64(12) - s.mockMutableState.EXPECT().GetNextEventID().Return(lastEventID + 1).AnyTimes() + s.mockMutableState.EXPECT().GetNextEventID().Return(randomEventID).AnyTimes() // This doesn't matter, GetNextEventID is not used if there is started WT. s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastEventVersion, nil).AnyTimes() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ NamespaceId: s.namespaceID, @@ -294,11 +295,11 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() { "", "", int64(0), - ).Return(&historypb.HistoryEvent{}, nil) + ).Return(&historypb.HistoryEvent{EventId: wtFailedEventID}, nil) s.mockMutableState.EXPECT().FlushBufferedEvents() s.mockMutableState.EXPECT().AddWorkflowExecutionTerminatedEvent( - lastEventID+1, workflowTerminationReason, gomock.Any(), workflowTerminationIdentity, false, + wtFailedEventID, workflowTerminationReason, gomock.Any(), workflowTerminationIdentity, false, ).Return(&historypb.HistoryEvent{}, nil) // if workflow is in zombie or finished state, keep as is diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index 371028d01e2..6f41abf346d 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -509,7 +509,6 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTimeoutTask( return err } - eventBatchFirstEventID := mutableState.GetNextEventID() timeoutFailure := failure.NewTimeoutFailure("workflow timeout", enumspb.TIMEOUT_TYPE_START_TO_CLOSE) backoffInterval := backoff.NoBackoff retryState := enumspb.RETRY_STATE_TIMEOUT @@ -535,7 +534,6 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTimeoutTask( // First add timeout workflow event, no matter what we're doing next. if err := workflow.TimeoutWorkflow( mutableState, - eventBatchFirstEventID, retryState, newRunID, ); err != nil { diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 8f10347a9a6..468d89b693e 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -991,12 +991,8 @@ func (c *ContextImpl) forceTerminateWorkflow( return err } - // Terminate workflow is written as a separate batch and might result in more than one event as we close the - // outstanding workflow task before terminating the workflow - eventBatchFirstEventID := mutableState.GetNextEventID() return TerminateWorkflow( mutableState, - eventBatchFirstEventID, failureReason, nil, consts.IdentityHistoryService, diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index c62776ba2d2..9901f8a0d3d 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4762,7 +4762,7 @@ func (ms *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool, } // we have a workflow task with buffered events on the fly with a lower version, fail it - if err := failWorkflowTask( + if _, err := failWorkflowTask( ms, workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND, @@ -4827,7 +4827,7 @@ func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit( // Handling buffered events size issue if workflowTask := ms.GetStartedWorkflowTask(); workflowTask != nil { // we have a workflow task on the fly with a lower version, fail it - if err := failWorkflowTask( + if _, err := failWorkflowTask( ms, workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index ca7a82e026a..e23ce35eff6 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -30,6 +30,7 @@ import ( commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" @@ -46,9 +47,9 @@ func failWorkflowTask( mutableState MutableState, workflowTask *WorkflowTaskInfo, workflowTaskFailureCause enumspb.WorkflowTaskFailedCause, -) error { +) (*historypb.HistoryEvent, error) { - if _, err := mutableState.AddWorkflowTaskFailedEvent( + wtFailedEvent, err := mutableState.AddWorkflowTaskFailedEvent( workflowTask, workflowTaskFailureCause, nil, @@ -57,12 +58,13 @@ func failWorkflowTask( "", "", 0, - ); err != nil { - return err + ) + if err != nil { + return nil, err } mutableState.FlushBufferedEvents() - return nil + return wtFailedEvent, nil } func ScheduleWorkflowTask( @@ -83,19 +85,22 @@ func ScheduleWorkflowTask( func RetryWorkflow( ctx context.Context, mutableState MutableState, - eventBatchFirstEventID int64, parentNamespace namespace.Name, continueAsNewAttributes *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes, ) (MutableState, error) { + // Check TerminateWorkflow comment bellow. + eventBatchFirstEventID := mutableState.GetNextEventID() if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil { - if err := failWorkflowTask( + wtFailedEvent, err := failWorkflowTask( mutableState, workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, - ); err != nil { + ) + if err != nil { return nil, err } + eventBatchFirstEventID = wtFailedEvent.GetEventId() } _, newMutableState, err := mutableState.AddContinueAsNewEvent( @@ -113,19 +118,22 @@ func RetryWorkflow( func TimeoutWorkflow( mutableState MutableState, - eventBatchFirstEventID int64, retryState enumspb.RetryState, continuedRunID string, ) error { + // Check TerminateWorkflow comment bellow. + eventBatchFirstEventID := mutableState.GetNextEventID() if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil { - if err := failWorkflowTask( + wtFailedEvent, err := failWorkflowTask( mutableState, workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, - ); err != nil { + ) + if err != nil { return err } + eventBatchFirstEventID = wtFailedEvent.GetEventId() } _, err := mutableState.AddTimeoutWorkflowEvent( @@ -138,21 +146,29 @@ func TimeoutWorkflow( func TerminateWorkflow( mutableState MutableState, - eventBatchFirstEventID int64, terminateReason string, terminateDetails *commonpb.Payloads, terminateIdentity string, deleteAfterTerminate bool, ) error { + // Terminate workflow is written as a separate batch and might result in more than one event + // if there is started WT which needs to be failed before. + // Failing speculative WT creates 3 events: WTScheduled, WTStarted, and WTFailed. + // First 2 goes to separate batch and eventBatchFirstEventID has to point to WTFailed event. + // If there is no started WT, then eventBatchFirstEventID points to TerminateWorkflow event (which is next event). + eventBatchFirstEventID := mutableState.GetNextEventID() + if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil { - if err := failWorkflowTask( + wtFailedEvent, err := failWorkflowTask( mutableState, workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, - ); err != nil { + ) + if err != nil { return err } + eventBatchFirstEventID = wtFailedEvent.GetEventId() } _, err := mutableState.AddWorkflowExecutionTerminatedEvent( diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index a6af161dbe6..915a5e80fb9 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -692,10 +692,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( return nil, err } - eventBatchFirstEventID := ms.GetNextEventID() if err := workflow.TerminateWorkflow( ms, - eventBatchFirstEventID, common.FailureReasonTransactionSizeExceedsLimit, payloads.EncodeString(updateErr.Error()), consts.IdentityHistoryService, @@ -980,7 +978,7 @@ func failWorkflowTask( if err != nil { return nil, common.EmptyEventID, err } - if _, err = mutableState.AddWorkflowTaskFailedEvent( + wtFailedEvent, err := mutableState.AddWorkflowTaskFailedEvent( workflowTask, wtFailedCause.failedCause, failure.NewServerFailure(wtFailedCause.Message(), true), @@ -988,13 +986,13 @@ func failWorkflowTask( request.GetBinaryChecksum(), "", "", - 0); err != nil { + 0) + if err != nil { return nil, common.EmptyEventID, err } - nextEventBatchId := mutableState.GetNextEventID() - 1 // Return new mutable state back to the caller for further updates - return mutableState, nextEventBatchId, nil + return mutableState, wtFailedEvent.GetEventId(), nil } // Filter function to be passed to mutable_state.HasAnyBufferedEvent diff --git a/tests/update_workflow_test.go b/tests/update_workflow_test.go index af142d415ba..629beef0346 100644 --- a/tests/update_workflow_test.go +++ b/tests/update_workflow_test.go @@ -42,6 +42,7 @@ import ( updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/primitives/timestamp" @@ -2441,6 +2442,14 @@ func (s *integrationSuite) TestUpdateWorkflow_StartedSpeculativeWorkflowTask_Ter 7 WorkflowTaskStarted 8 WorkflowTaskFailed 9 WorkflowExecutionTerminated`, events) + + msResp, err := s.adminClient.DescribeMutableState(NewContext(), &adminservice.DescribeMutableStateRequest{ + Namespace: s.namespace, + Execution: tv.WorkflowExecution(), + }) + s.NoError(err) + // completion_event_batch_id should point to WTFailed event. + s.EqualValues(8, msResp.GetDatabaseMutableState().GetExecutionInfo().GetCompletionEventBatchId()) } func (s *integrationSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_TerminateWorkflow() { @@ -2541,4 +2550,12 @@ func (s *integrationSuite) TestUpdateWorkflow_ScheduledSpeculativeWorkflowTask_T 4 WorkflowTaskCompleted 5 ActivityTaskScheduled 6 WorkflowExecutionTerminated`, events) + + msResp, err := s.adminClient.DescribeMutableState(NewContext(), &adminservice.DescribeMutableStateRequest{ + Namespace: s.namespace, + Execution: tv.WorkflowExecution(), + }) + s.NoError(err) + // completion_event_batch_id should point to WFTerminated event. + s.EqualValues(6, msResp.GetDatabaseMutableState().GetExecutionInfo().GetCompletionEventBatchId()) }