Skip to content

Commit

Permalink
Use WTFailed.EventId as completion_event_batch_id when workflow is te…
Browse files Browse the repository at this point in the history
…rminated or timed out (#4392)
  • Loading branch information
alexshtin authored May 26, 2023
1 parent 9aa4ade commit 0059b83
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 55 deletions.
2 changes: 0 additions & 2 deletions service/history/api/deleteworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,9 @@ func Invoke(
weCtx,
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
mutableState := workflowContext.GetMutableState()
eventBatchFirstEventID := mutableState.GetNextEventID()

return api.UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
mutableState,
eventBatchFirstEventID,
"Delete workflow execution",
nil,
consts.IdentityHistoryService,
Expand Down
3 changes: 0 additions & 3 deletions service/history/api/terminateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ func Invoke(
}
}

eventBatchFirstEventID := mutableState.GetNextEventID()

return api.UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
mutableState,
eventBatchFirstEventID,
request.GetReason(),
request.GetDetails(),
request.GetIdentity(),
Expand Down
1 change: 0 additions & 1 deletion service/history/api/workflow_id_reuse_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func ApplyWorkflowIDReusePolicy(

return UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
mutableState,
mutableState.GetNextEventID(),
"TerminateIfRunning WorkflowIdReusePolicy Policy",
payloads.EncodeString(
fmt.Sprintf("terminated by new runID: %s", runID),
Expand Down
30 changes: 19 additions & 11 deletions service/history/ndc/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"fmt"

enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
Expand Down Expand Up @@ -220,24 +221,25 @@ func (r *WorkflowImpl) FlushBufferedEvents() error {
return serviceerror.NewInternal("Workflow encountered workflow with buffered events but last write not from current cluster")
}

return r.failWorkflowTask(lastWriteVersion)
_, err = r.failWorkflowTask(lastWriteVersion)
return err
}

func (r *WorkflowImpl) failWorkflowTask(
lastWriteVersion int64,
) error {
) (*historypb.HistoryEvent, error) {

// do not persist the change right now, Workflow requires transaction
if err := r.mutableState.UpdateCurrentVersion(lastWriteVersion, true); err != nil {
return err
return nil, err
}

workflowTask := r.mutableState.GetStartedWorkflowTask()
if workflowTask == nil {
return nil
return nil, nil
}

if _, err := r.mutableState.AddWorkflowTaskFailedEvent(
wtFailedEvent, err := r.mutableState.AddWorkflowTaskFailedEvent(
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
nil,
Expand All @@ -246,12 +248,13 @@ func (r *WorkflowImpl) failWorkflowTask(
"",
"",
0,
); err != nil {
return err
)
if err != nil {
return nil, err
}

r.mutableState.FlushBufferedEvents()
return nil
return wtFailedEvent, nil
}

func (r *WorkflowImpl) terminateWorkflow(
Expand All @@ -260,16 +263,21 @@ func (r *WorkflowImpl) terminateWorkflow(
) error {

eventBatchFirstEventID := r.GetMutableState().GetNextEventID()
if err := r.failWorkflowTask(lastWriteVersion); err != nil {
wtFailedEvent, err := r.failWorkflowTask(lastWriteVersion)
if err != nil {
return err
}

if wtFailedEvent != nil {
eventBatchFirstEventID = wtFailedEvent.GetEventId()
}

// do not persist the change right now, Workflow requires transaction
if err := r.mutableState.UpdateCurrentVersion(lastWriteVersion, true); err != nil {
if err = r.mutableState.UpdateCurrentVersion(lastWriteVersion, true); err != nil {
return err
}

_, err := r.mutableState.AddWorkflowExecutionTerminatedEvent(
_, err = r.mutableState.AddWorkflowExecutionTerminatedEvent(
eventBatchFirstEventID,
workflowTerminationReason,
payloads.EncodeString(fmt.Sprintf("terminated by version: %v", incomingLastWriteVersion)),
Expand Down
2 changes: 0 additions & 2 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,8 @@ func (r *workflowResetterImpl) terminateWorkflow(
terminateReason string,
) error {

eventBatchFirstEventID := mutableState.GetNextEventID()
return workflow.TerminateWorkflow(
mutableState,
eventBatchFirstEventID,
terminateReason,
nil,
consts.IdentityResetter,
Expand Down
9 changes: 5 additions & 4 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,12 +528,13 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() {
ScheduledEventID: 1234,
StartedEventID: 5678,
}
nextEventID := int64(666)
wtFailedEventID := int64(666)
terminateReason := "some random terminate reason"

mutableState := workflow.NewMockMutableState(s.controller)

mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes()
randomEventID := int64(2208)
mutableState.EXPECT().GetNextEventID().Return(randomEventID).AnyTimes() // This doesn't matter, GetNextEventID is not used if there is started WT.
mutableState.EXPECT().GetStartedWorkflowTask().Return(workflowTask)
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
workflowTask,
Expand All @@ -544,10 +545,10 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() {
"",
"",
int64(0),
).Return(&historypb.HistoryEvent{}, nil)
).Return(&historypb.HistoryEvent{EventId: wtFailedEventID}, nil)
mutableState.EXPECT().FlushBufferedEvents()
mutableState.EXPECT().AddWorkflowExecutionTerminatedEvent(
nextEventID,
wtFailedEventID,
terminateReason,
nil,
consts.IdentityResetter,
Expand Down
9 changes: 5 additions & 4 deletions service/history/ndc/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,11 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Error() {
}

func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
lastEventID := int64(2)
randomEventID := int64(2208)
wtFailedEventID := int64(2)
lastEventTaskID := int64(144)
lastEventVersion := int64(12)
s.mockMutableState.EXPECT().GetNextEventID().Return(lastEventID + 1).AnyTimes()
s.mockMutableState.EXPECT().GetNextEventID().Return(randomEventID).AnyTimes() // This doesn't matter, GetNextEventID is not used if there is started WT.
s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastEventVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
Expand Down Expand Up @@ -294,11 +295,11 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
"",
"",
int64(0),
).Return(&historypb.HistoryEvent{}, nil)
).Return(&historypb.HistoryEvent{EventId: wtFailedEventID}, nil)
s.mockMutableState.EXPECT().FlushBufferedEvents()

s.mockMutableState.EXPECT().AddWorkflowExecutionTerminatedEvent(
lastEventID+1, workflowTerminationReason, gomock.Any(), workflowTerminationIdentity, false,
wtFailedEventID, workflowTerminationReason, gomock.Any(), workflowTerminationIdentity, false,
).Return(&historypb.HistoryEvent{}, nil)

// if workflow is in zombie or finished state, keep as is
Expand Down
2 changes: 0 additions & 2 deletions service/history/timerQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTimeoutTask(
return err
}

eventBatchFirstEventID := mutableState.GetNextEventID()
timeoutFailure := failure.NewTimeoutFailure("workflow timeout", enumspb.TIMEOUT_TYPE_START_TO_CLOSE)
backoffInterval := backoff.NoBackoff
retryState := enumspb.RETRY_STATE_TIMEOUT
Expand All @@ -535,7 +534,6 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTimeoutTask(
// First add timeout workflow event, no matter what we're doing next.
if err := workflow.TimeoutWorkflow(
mutableState,
eventBatchFirstEventID,
retryState,
newRunID,
); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,12 +991,8 @@ func (c *ContextImpl) forceTerminateWorkflow(
return err
}

// Terminate workflow is written as a separate batch and might result in more than one event as we close the
// outstanding workflow task before terminating the workflow
eventBatchFirstEventID := mutableState.GetNextEventID()
return TerminateWorkflow(
mutableState,
eventBatchFirstEventID,
failureReason,
nil,
consts.IdentityHistoryService,
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4762,7 +4762,7 @@ func (ms *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool,
}

// we have a workflow task with buffered events on the fly with a lower version, fail it
if err := failWorkflowTask(
if _, err := failWorkflowTask(
ms,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
Expand Down Expand Up @@ -4827,7 +4827,7 @@ func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
// Handling buffered events size issue
if workflowTask := ms.GetStartedWorkflowTask(); workflowTask != nil {
// we have a workflow task on the fly with a lower version, fail it
if err := failWorkflowTask(
if _, err := failWorkflowTask(
ms,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
Expand Down
44 changes: 30 additions & 14 deletions service/history/workflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"

Expand All @@ -46,9 +47,9 @@ func failWorkflowTask(
mutableState MutableState,
workflowTask *WorkflowTaskInfo,
workflowTaskFailureCause enumspb.WorkflowTaskFailedCause,
) error {
) (*historypb.HistoryEvent, error) {

if _, err := mutableState.AddWorkflowTaskFailedEvent(
wtFailedEvent, err := mutableState.AddWorkflowTaskFailedEvent(
workflowTask,
workflowTaskFailureCause,
nil,
Expand All @@ -57,12 +58,13 @@ func failWorkflowTask(
"",
"",
0,
); err != nil {
return err
)
if err != nil {
return nil, err
}

mutableState.FlushBufferedEvents()
return nil
return wtFailedEvent, nil
}

func ScheduleWorkflowTask(
Expand All @@ -83,19 +85,22 @@ func ScheduleWorkflowTask(
func RetryWorkflow(
ctx context.Context,
mutableState MutableState,
eventBatchFirstEventID int64,
parentNamespace namespace.Name,
continueAsNewAttributes *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes,
) (MutableState, error) {

// Check TerminateWorkflow comment below.
eventBatchFirstEventID := mutableState.GetNextEventID()
if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil {
if err := failWorkflowTask(
wtFailedEvent, err := failWorkflowTask(
mutableState,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
); err != nil {
)
if err != nil {
return nil, err
}
eventBatchFirstEventID = wtFailedEvent.GetEventId()
}

_, newMutableState, err := mutableState.AddContinueAsNewEvent(
Expand All @@ -113,19 +118,22 @@ func RetryWorkflow(

func TimeoutWorkflow(
mutableState MutableState,
eventBatchFirstEventID int64,
retryState enumspb.RetryState,
continuedRunID string,
) error {

// Check TerminateWorkflow comment below.
eventBatchFirstEventID := mutableState.GetNextEventID()
if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil {
if err := failWorkflowTask(
wtFailedEvent, err := failWorkflowTask(
mutableState,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
); err != nil {
)
if err != nil {
return err
}
eventBatchFirstEventID = wtFailedEvent.GetEventId()
}

_, err := mutableState.AddTimeoutWorkflowEvent(
Expand All @@ -138,21 +146,29 @@ func TimeoutWorkflow(

func TerminateWorkflow(
mutableState MutableState,
eventBatchFirstEventID int64,
terminateReason string,
terminateDetails *commonpb.Payloads,
terminateIdentity string,
deleteAfterTerminate bool,
) error {

// Terminate workflow is written as a separate batch and might result in more than one event
// if there is a started WT which needs to be failed first.
// Failing a speculative WT creates 3 events: WTScheduled, WTStarted, and WTFailed.
// The first 2 go to a separate batch, and eventBatchFirstEventID has to point to the WTFailed event.
// If there is no started WT, then eventBatchFirstEventID points to the TerminateWorkflow event (which is the next event).
eventBatchFirstEventID := mutableState.GetNextEventID()

if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil {
if err := failWorkflowTask(
wtFailedEvent, err := failWorkflowTask(
mutableState,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
); err != nil {
)
if err != nil {
return err
}
eventBatchFirstEventID = wtFailedEvent.GetEventId()
}

_, err := mutableState.AddWorkflowExecutionTerminatedEvent(
Expand Down
10 changes: 4 additions & 6 deletions service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
return nil, err
}

eventBatchFirstEventID := ms.GetNextEventID()
if err := workflow.TerminateWorkflow(
ms,
eventBatchFirstEventID,
common.FailureReasonTransactionSizeExceedsLimit,
payloads.EncodeString(updateErr.Error()),
consts.IdentityHistoryService,
Expand Down Expand Up @@ -980,21 +978,21 @@ func failWorkflowTask(
if err != nil {
return nil, common.EmptyEventID, err
}
if _, err = mutableState.AddWorkflowTaskFailedEvent(
wtFailedEvent, err := mutableState.AddWorkflowTaskFailedEvent(
workflowTask,
wtFailedCause.failedCause,
failure.NewServerFailure(wtFailedCause.Message(), true),
request.GetIdentity(),
request.GetBinaryChecksum(),
"",
"",
0); err != nil {
0)
if err != nil {
return nil, common.EmptyEventID, err
}

nextEventBatchId := mutableState.GetNextEventID() - 1
// Return new mutable state back to the caller for further updates
return mutableState, nextEventBatchId, nil
return mutableState, wtFailedEvent.GetEventId(), nil
}

// Filter function to be passed to mutable_state.HasAnyBufferedEvent
Expand Down
Loading

0 comments on commit 0059b83

Please sign in to comment.