From d068f4bb7de55fc8ca21317bff764400785f295e Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 10 Mar 2023 18:00:49 -0800 Subject: [PATCH] Rename inflight WT to started WT --- service/history/api/describeworkflow/api.go | 2 +- service/history/api/get_workflow_util.go | 2 +- service/history/api/queryworkflow/api.go | 6 +- service/history/api/reapplyevents/api.go | 2 +- service/history/api/startworkflow/api.go | 39 +++--- service/history/historyEngine_test.go | 34 +++--- service/history/ndc/branch_manager.go | 2 +- service/history/ndc/branch_manager_test.go | 12 +- service/history/ndc/transaction_manager.go | 3 +- .../history/ndc/transaction_manager_test.go | 4 +- service/history/ndc/workflow.go | 6 +- service/history/ndc/workflow_resetter.go | 6 +- service/history/ndc/workflow_resetter_test.go | 8 +- service/history/ndc/workflow_test.go | 6 +- .../history/timerQueueActiveTaskExecutor.go | 6 +- .../timerQueueActiveTaskExecutor_test.go | 8 +- .../history/timerQueueStandbyTaskExecutor.go | 6 +- .../transferQueueActiveTaskExecutor.go | 4 +- .../transferQueueStandbyTaskExecutor.go | 4 +- service/history/workflow/mutable_state.go | 12 +- .../history/workflow/mutable_state_impl.go | 44 ++++--- .../workflow/mutable_state_impl_test.go | 4 +- .../history/workflow/mutable_state_mock.go | 113 +++++++++--------- service/history/workflow/task_generator.go | 10 +- service/history/workflow/task_refresher.go | 6 +- service/history/workflow/util.go | 6 +- .../workflow/workflow_task_state_machine.go | 47 ++++---- .../history/workflowTaskHandlerCallbacks.go | 26 ++-- tests/max_buffered_event_test.go | 2 +- 29 files changed, 211 insertions(+), 219 deletions(-) diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index 9c0becf0fbb..aa36850c398 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -183,7 +183,7 @@ func Invoke( } } - if pendingWorkflowTask, ok := mutableState.GetPendingWorkflowTask(); ok { + if pendingWorkflowTask := mutableState.GetPendingWorkflowTask(); pendingWorkflowTask != nil { result.PendingWorkflowTask = &workflowpb.PendingWorkflowTaskInfo{ State: enumspb.PENDING_WORKFLOW_TASK_STATE_SCHEDULED, ScheduledTime: pendingWorkflowTask.ScheduledTime, diff --git a/service/history/api/get_workflow_util.go b/service/history/api/get_workflow_util.go index 8e0c440544e..0020b7b50de 100644 --- a/service/history/api/get_workflow_util.go +++ b/service/history/api/get_workflow_util.go @@ -195,7 +195,7 @@ func MutableStateToGetResponse( LastFirstEventId: lastFirstEventID, LastFirstEventTxnId: lastFirstEventTxnID, NextEventId: mutableState.GetNextEventID(), - PreviousStartedEventId: mutableState.GetPreviousStartedEventID(), + PreviousStartedEventId: mutableState.GetLastWorkflowTaskStartedEventID(), TaskQueue: &taskqueuepb.TaskQueue{ Name: executionInfo.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, diff --git a/service/history/api/queryworkflow/api.go b/service/history/api/queryworkflow/api.go index 66ce2bf1039..6158d029929 100644 --- a/service/history/api/queryworkflow/api.go +++ b/service/history/api/queryworkflow/api.go @@ -109,7 +109,7 @@ func Invoke( } mutableState := weCtx.GetMutableState() - if !mutableState.HasProcessedOrPendingWorkflowTask() { + if !mutableState.HadOrHasWorkflowTask() { // workflow has no workflow task ever scheduled, this usually is due to firstWorkflowTaskBackoff (cron / retry) // in this case, don't buffer the query, because it is almost certain the query will time out. return nil, consts.ErrWorkflowTaskNotScheduled @@ -122,14 +122,14 @@ func Invoke( // is used to determine if a query can be safely dispatched directly through matching or must be dispatched on a workflow task. // // Precondition to dispatch query directly to matching is workflow has at least one WorkflowTaskStarted event. Otherwise, sdk would panic. - if mutableState.GetPreviousStartedEventID() != common.EmptyEventID { + if mutableState.GetLastWorkflowTaskStartedEventID() != common.EmptyEventID { // There are three cases in which a query can be dispatched directly through matching safely, without violating strong consistency level: // 1. the namespace is not active, in this case history is immutable so a query dispatched at any time is consistent // 2. the workflow is not running, whenever a workflow is not running dispatching query directly is consistent // 3. if there is no pending or started workflow tasks it means no events came before query arrived, so its safe to dispatch directly safeToDispatchDirectly := !nsEntry.ActiveInCluster(shard.GetClusterMetadata().GetCurrentClusterName()) || !mutableState.IsWorkflowExecutionRunning() || - (!mutableState.HasPendingWorkflowTask() && !mutableState.HasInFlightWorkflowTask()) + (!mutableState.HasPendingWorkflowTask() && !mutableState.HasStartedWorkflowTask()) if safeToDispatchDirectly { msResp, err := api.MutableStateToGetResponse(mutableState) if err != nil { diff --git a/service/history/api/reapplyevents/api.go b/service/history/api/reapplyevents/api.go index d58a90fb184..df3c31bd798 100644 --- a/service/history/api/reapplyevents/api.go +++ b/service/history/api/reapplyevents/api.go @@ -109,7 +109,7 @@ func Invoke( // to accept events to be reapplied baseRunID := mutableState.GetExecutionState().GetRunId() resetRunID := uuid.New() - baseRebuildLastEventID := mutableState.GetPreviousStartedEventID() + baseRebuildLastEventID := mutableState.GetLastWorkflowTaskStartedEventID() // TODO when https://github.com/uber/cadence/issues/2420 is finished, remove this block, // since cannot reapply event to a finished workflow which had no workflow tasks started diff --git a/service/history/api/startworkflow/api.go b/service/history/api/startworkflow/api.go index f5f1e7006ec..0638a6650d1 100644 --- a/service/history/api/startworkflow/api.go +++ b/service/history/api/startworkflow/api.go @@ -33,6 +33,7 @@ import ( historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" + tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/api/historyservice/v1" @@ -78,10 +79,9 @@ type creationParams struct { // mutableStateInfo is a container for the relevant mutable state information to generate a start response with an eager // workflow task. type mutableStateInfo struct { - branchToken []byte - lastEventID int64 - workflowTaskInfo *workflow.WorkflowTaskInfo - hasInflight bool + branchToken []byte + lastEventID int64 + workflowTask *workflow.WorkflowTaskInfo } // NewStarter creates a new starter, fails if getting the active namespace fails. @@ -196,9 +196,9 @@ func (s *Starter) createNewMutableState(ctx context.Context, workflowID string, now := s.shardCtx.GetTimeSource().Now() mutableState := workflowContext.GetMutableState() - workflowTaskInfo, hasInflight := mutableState.GetInFlightWorkflowTask() - if s.requestEagerStart() && !hasInflight { - return nil, serviceerror.NewInternal("unexpected error: mutable state did not have an inflight workflow task") + workflowTaskInfo := mutableState.GetStartedWorkflowTask() + if s.requestEagerStart() && workflowTaskInfo == nil { + return nil, serviceerror.NewInternal("unexpected error: mutable state did not have a started workflow task") } workflowSnapshot, eventBatches, err := mutableState.CloseTransactionAsSnapshot( now, @@ -367,7 +367,7 @@ func (s *Starter) applyWorkflowIDReusePolicy( if err != nil { return nil, err } - return s.generateResponse(creationParams.runID, mutableStateInfo.workflowTaskInfo, events) + return s.generateResponse(creationParams.runID, mutableStateInfo.workflowTask, events) case consts.ErrWorkflowCompleted: // previous workflow already closed // fallthough to the logic for only creating the new workflow below @@ -394,9 +394,9 @@ func (s *Starter) respondToRetriedRequest( return nil, err } - // The current workflow task is not inflight or not the first task or we exceeded the first attempt and fell back to + // The current workflow task is not started or not the first task or we exceeded the first attempt and fell back to // matching based dispatch. - if !mutableStateInfo.hasInflight || mutableStateInfo.workflowTaskInfo.StartedEventID != 3 || mutableStateInfo.workflowTaskInfo.Attempt > 1 { + if mutableStateInfo.workflowTask == nil || mutableStateInfo.workflowTask.StartedEventID != 3 || mutableStateInfo.workflowTask.Attempt > 1 { s.recordEagerDenied(eagerStartDeniedReasonTaskAlreadyDispatched) return &historyservice.StartWorkflowExecutionResponse{ RunId: runID, @@ -408,7 +408,7 @@ func (s *Starter) respondToRetriedRequest( return nil, err } - return s.generateResponse(runID, mutableStateInfo.workflowTaskInfo, events) + return s.generateResponse(runID, mutableStateInfo.workflowTask, events) } // getMutableStateInfo gets the relevant mutable state information while getting the state for the given run from the @@ -443,19 +443,18 @@ func extractMutableStateInfo(mutableState workflow.MutableState) (*mutableStateI } // Future work for the request retry path: extend the task timeout (by failing / timing out the current task). - workflowTaskInfoSource, hasInflight := mutableState.GetInFlightWorkflowTask() - // The workflowTaskInfo returned from the mutable state call is generated on the fly and technically doesn't require + workflowTaskSource := mutableState.GetStartedWorkflowTask() + // The workflowTask returned from the mutable state call is generated on the fly and technically doesn't require // cloning. We clone here just in case that changes. - var workflowTaskInfo workflow.WorkflowTaskInfo - if hasInflight { - workflowTaskInfo = *workflowTaskInfoSource + var workflowTask workflow.WorkflowTaskInfo + if workflowTaskSource != nil { + workflowTask = *workflowTaskSource } return &mutableStateInfo{ - branchToken: branchToken, - lastEventID: mutableState.GetNextEventID() - 1, - workflowTaskInfo: &workflowTaskInfo, - hasInflight: hasInflight, + branchToken: branchToken, + lastEventID: mutableState.GetNextEventID() - 1, + workflowTask: &workflowTask, }, nil } diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index ce6bbc8d132..26667ac4fe8 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -2830,8 +2830,8 @@ func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() { s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt, ok := ms2.GetWorkflowTaskInfo(int64(8)) - s.True(ok) + wt = ms2.GetWorkflowTaskByID(int64(8)) + s.NotNil(wt) s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds()) s.Equal(int64(8), wt.ScheduledEventID) s.Equal(common.EmptyEventID, wt.StartedEventID) @@ -2891,8 +2891,8 @@ func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() { s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt, ok := ms2.GetWorkflowTaskInfo(int64(8)) - s.True(ok) + wt := ms2.GetWorkflowTaskByID(int64(8)) + s.NotNil(wt) s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds()) s.Equal(int64(8), wt.ScheduledEventID) s.Equal(common.EmptyEventID, wt.StartedEventID) @@ -3301,8 +3301,8 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() { s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt, ok := ms2.GetWorkflowTaskInfo(int64(8)) - s.True(ok) + wt = ms2.GetWorkflowTaskByID(int64(8)) + s.NotNil(wt) s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds()) s.Equal(int64(8), wt.ScheduledEventID) s.Equal(common.EmptyEventID, wt.StartedEventID) @@ -3365,8 +3365,8 @@ func (s *engineSuite) TestRespondActivityTaskFailedWithHeartbeatSuccess() { s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt, ok := ms2.GetWorkflowTaskInfo(int64(8)) - s.True(ok) + wt = ms2.GetWorkflowTaskByID(int64(8)) + s.NotNil(wt) s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds()) s.Equal(int64(8), wt.ScheduledEventID) s.Equal(common.EmptyEventID, wt.StartedEventID) @@ -3428,8 +3428,8 @@ func (s *engineSuite) TestRespondActivityTaskFailedByIdSuccess() { s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt, ok := ms2.GetWorkflowTaskInfo(int64(8)) - s.True(ok) + wt := ms2.GetWorkflowTaskByID(int64(8)) + s.NotNil(wt) s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds()) s.Equal(int64(8), wt.ScheduledEventID) s.Equal(common.EmptyEventID, wt.StartedEventID) @@ -3683,8 +3683,8 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Started() { s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt, ok := ms2.GetWorkflowTaskInfo(int64(9)) - s.True(ok) + wt = ms2.GetWorkflowTaskByID(int64(9)) + s.NotNil(wt) s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds()) s.Equal(int64(9), wt.ScheduledEventID) s.Equal(common.EmptyEventID, wt.StartedEventID) @@ -3744,8 +3744,8 @@ func (s *engineSuite) TestRespondActivityTaskCanceledById_Started() { s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt, ok := ms2.GetWorkflowTaskInfo(int64(9)) - s.True(ok) + wt := ms2.GetWorkflowTaskByID(int64(9)) + s.NotNil(wt) s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds()) s.Equal(int64(9), wt.ScheduledEventID) s.Equal(common.EmptyEventID, wt.StartedEventID) @@ -3972,8 +3972,8 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_Scheduled() s.Equal(int64(7), ms2.GetExecutionInfo().LastWorkflowTaskStartedEventId) s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State) s.True(ms2.HasPendingWorkflowTask()) - wt2, ok := ms2.GetWorkflowTaskInfo(ms2.GetNextEventID() - 1) - s.True(ok) + wt2 = ms2.GetWorkflowTaskByID(ms2.GetNextEventID() - 1) + s.NotNil(wt2) s.Equal(ms2.GetNextEventID()-1, wt2.ScheduledEventID) s.Equal(int32(1), wt2.Attempt) } @@ -5306,7 +5306,7 @@ func addWorkflowTaskStartedEventWithRequestID(ms workflow.MutableState, schedule } func addWorkflowTaskCompletedEvent(s *suite.Suite, ms workflow.MutableState, scheduledEventID, startedEventID int64, identity string) *historypb.HistoryEvent { - workflowTask, _ := ms.GetWorkflowTaskInfo(scheduledEventID) + workflowTask := ms.GetWorkflowTaskByID(scheduledEventID) s.NotNil(workflowTask) s.Equal(startedEventID, workflowTask.StartedEventID) diff --git a/service/history/ndc/branch_manager.go b/service/history/ndc/branch_manager.go index 3047fbf7dbf..abd439d1055 100644 --- a/service/history/ndc/branch_manager.go +++ b/service/history/ndc/branch_manager.go @@ -170,7 +170,7 @@ func (r *BranchMgrImpl) flushBufferedEvents( // check whether there are buffered events, if so, flush it // NOTE: buffered events does not show in version history or next event id if !r.mutableState.HasBufferedEvents() { - if r.mutableState.HasInFlightWorkflowTask() && r.mutableState.IsTransientWorkflowTask() { + if r.mutableState.HasStartedWorkflowTask() && r.mutableState.IsTransientWorkflowTask() { if err := r.mutableState.ClearTransientWorkflowTask(); err != nil { return nil, 0, err } diff --git a/service/history/ndc/branch_manager_test.go b/service/history/ndc/branch_manager_test.go index 136e68fa2a8..f92de683d62 100644 --- a/service/history/ndc/branch_manager_test.go +++ b/service/history/ndc/branch_manager_test.go @@ -192,7 +192,7 @@ func (s *branchMgrSuite) TestClearTransientWorkflowTask() { s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastWriteVersion, nil).AnyTimes() s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes() - s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes() + s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes() s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(true).AnyTimes() s.mockMutableState.EXPECT().ClearTransientWorkflowTask().Return(nil).AnyTimes() @@ -239,7 +239,7 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() { ScheduledEventID: 1234, StartedEventID: 2345, } - s.mockMutableState.EXPECT().GetInFlightWorkflowTask().Return(workflowTask, true) + s.mockMutableState.EXPECT().GetStartedWorkflowTask().Return(workflowTask) s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ VersionHistories: versionHistories, }).AnyTimes() @@ -284,7 +284,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchAppendable_NoMissingEve s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes() s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes() - s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes() + s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes() s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes() doContinue, index, err := s.nDCBranchMgr.prepareVersionHistory( @@ -316,7 +316,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchAppendable_MissingEvent s.NoError(err) s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes() - s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes() + s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes() s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ NamespaceId: s.namespaceID, @@ -358,7 +358,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_NoMissing newBranchToken := []byte("some random new branch token") s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes() - s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes() + s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes() s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ NamespaceId: s.namespaceID, @@ -418,7 +418,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_MissingEv }) s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes() - s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes() + s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes() s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ NamespaceId: s.namespaceID, diff --git a/service/history/ndc/transaction_manager.go b/service/history/ndc/transaction_manager.go index c2a86d87ce9..f3bc061b851 100644 --- a/service/history/ndc/transaction_manager.go +++ b/service/history/ndc/transaction_manager.go @@ -34,6 +34,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -319,7 +320,7 @@ func (r *transactionMgrImpl) backfillWorkflowEventsReapply( workflowID := baseMutableState.GetExecutionInfo().WorkflowId baseRunID := baseMutableState.GetExecutionState().GetRunId() resetRunID := uuid.New() - baseRebuildLastEventID := baseMutableState.GetPreviousStartedEventID() + baseRebuildLastEventID := baseMutableState.GetLastWorkflowTaskStartedEventID() baseVersionHistories := baseMutableState.GetExecutionInfo().GetVersionHistories() baseCurrentVersionHistory, err := versionhistory.GetCurrentVersionHistory(baseVersionHistories) if err != nil { diff --git a/service/history/ndc/transaction_manager_test.go b/service/history/ndc/transaction_manager_test.go index 48b26221af3..5771d1164ac 100644 --- a/service/history/ndc/transaction_manager_test.go +++ b/service/history/ndc/transaction_manager_test.go @@ -220,7 +220,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Closed RunId: runID, }).AnyTimes() mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes() - mutableState.EXPECT().GetPreviousStartedEventID().Return(lastWorkflowTaskStartedEventID) + mutableState.EXPECT().GetLastWorkflowTaskStartedEventID().Return(lastWorkflowTaskStartedEventID) s.mockWorkflowResetter.EXPECT().ResetWorkflow( ctx, @@ -298,7 +298,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Closed_ResetF RunId: runID, }).AnyTimes() mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes() - mutableState.EXPECT().GetPreviousStartedEventID().Return(lastWorkflowTaskStartedEventID) + mutableState.EXPECT().GetLastWorkflowTaskStartedEventID().Return(lastWorkflowTaskStartedEventID) s.mockWorkflowResetter.EXPECT().ResetWorkflow( ctx, diff --git a/service/history/ndc/workflow.go b/service/history/ndc/workflow.go index 37399b71e1c..e89496b1fb0 100644 --- a/service/history/ndc/workflow.go +++ b/service/history/ndc/workflow.go @@ -146,7 +146,7 @@ func (r *WorkflowImpl) Revive() error { // workflow is in zombie state, need to set the state correctly accordingly state = enumsspb.WORKFLOW_EXECUTION_STATE_CREATED - if r.mutableState.HasProcessedOrPendingWorkflowTask() { + if r.mutableState.HadOrHasWorkflowTask() { state = enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING } return r.mutableState.UpdateWorkflowStateStatus( @@ -232,8 +232,8 @@ func (r *WorkflowImpl) failWorkflowTask( return err } - workflowTask, ok := r.mutableState.GetInFlightWorkflowTask() - if !ok { + workflowTask := r.mutableState.GetStartedWorkflowTask() + if workflowTask == nil { return nil } diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index 0a9631490c3..9d323243ed0 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -475,9 +475,9 @@ func (r *workflowResetterImpl) failWorkflowTask( resetReason string, ) error { - workflowTask, ok := resetMutableState.GetPendingWorkflowTask() - if !ok { - // TODO if resetMutableState.HasProcessedOrPendingWorkflowTask() == true + workflowTask := resetMutableState.GetPendingWorkflowTask() + if workflowTask == nil { + // TODO if resetMutableState.HadOrHasWorkflowTask() == true // meaning workflow history has NO workflow task ever // should also allow workflow reset, the only remaining issues are // * what if workflow is a cron workflow, e.g. should add a workflow task directly or still respect the cron job diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index e87c455fd8e..f2287d755f9 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -348,7 +348,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_NoWorkflowTask() { resetReason := "some random reset reason" mutableState := workflow.NewMockMutableState(s.controller) - mutableState.EXPECT().GetPendingWorkflowTask().Return(nil, false).AnyTimes() + mutableState.EXPECT().GetPendingWorkflowTask().Return(nil).AnyTimes() err := s.workflowResetter.failWorkflowTask( mutableState, @@ -384,7 +384,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() { RequestID: workflowTaskSchedule.RequestID, TaskQueue: workflowTaskSchedule.TaskQueue, } - mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTaskSchedule, true).AnyTimes() + mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTaskSchedule).AnyTimes() mutableState.EXPECT().AddWorkflowTaskStartedEvent( workflowTaskSchedule.ScheduledEventID, workflowTaskSchedule.RequestID, @@ -430,7 +430,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskStarted() { Kind: enumspb.TASK_QUEUE_KIND_NORMAL, }, } - mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTask, true).AnyTimes() + mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTask).AnyTimes() mutableState.EXPECT().AddWorkflowTaskFailedEvent( workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, @@ -530,7 +530,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() { mutableState := workflow.NewMockMutableState(s.controller) mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes() - mutableState.EXPECT().GetInFlightWorkflowTask().Return(workflowTask, true) + mutableState.EXPECT().GetStartedWorkflowTask().Return(workflowTask) mutableState.EXPECT().AddWorkflowTaskFailedEvent( workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, diff --git a/service/history/ndc/workflow_test.go b/service/history/ndc/workflow_test.go index 5dc794a526c..5c72bb1dccf 100644 --- a/service/history/ndc/workflow_test.go +++ b/service/history/ndc/workflow_test.go @@ -279,14 +279,14 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() { s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockMutableState.EXPECT().UpdateCurrentVersion(lastEventVersion, true).Return(nil).AnyTimes() - inFlightWorkflowTask := &workflow.WorkflowTaskInfo{ + startedWorkflowTask := &workflow.WorkflowTaskInfo{ Version: 1234, ScheduledEventID: 5678, StartedEventID: 9012, } - s.mockMutableState.EXPECT().GetInFlightWorkflowTask().Return(inFlightWorkflowTask, true) + s.mockMutableState.EXPECT().GetStartedWorkflowTask().Return(startedWorkflowTask) s.mockMutableState.EXPECT().AddWorkflowTaskFailedEvent( - inFlightWorkflowTask, + startedWorkflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND, nil, consts.IdentityHistoryService, diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index eee952b713a..2d7c8aaa0b6 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -310,8 +310,8 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTaskTimeoutTask( return nil } - workflowTask, ok := mutableState.GetWorkflowTaskInfo(task.EventID) - if !ok { + workflowTask := mutableState.GetWorkflowTaskByID(task.EventID) + if workflowTask == nil { return nil } err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, task.Version, task) @@ -392,7 +392,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowBackoffTimerTask( ) } - if mutableState.HasProcessedOrPendingWorkflowTask() { + if mutableState.HadOrHasWorkflowTask() { // already has workflow task return nil } diff --git a/service/history/timerQueueActiveTaskExecutor_test.go b/service/history/timerQueueActiveTaskExecutor_test.go index ba62fa68c6f..62698ff5648 100644 --- a/service/history/timerQueueActiveTaskExecutor_test.go +++ b/service/history/timerQueueActiveTaskExecutor_test.go @@ -901,8 +901,8 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTaskTimeout_Fire() { _, _, err = s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(err) - workflowTask, ok := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetPendingWorkflowTask() - s.True(ok) + workflowTask := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetPendingWorkflowTask() + s.NotNil(workflowTask) s.True(workflowTask.ScheduledEventID != common.EmptyEventID) s.Equal(common.EmptyEventID, workflowTask.StartedEventID) s.Equal(int32(2), workflowTask.Attempt) @@ -999,8 +999,8 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowBackoffTimer_Fire() { _, _, err = s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(err) - workflowTask, ok := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetPendingWorkflowTask() - s.True(ok) + workflowTask := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetPendingWorkflowTask() + s.NotNil(workflowTask) s.True(workflowTask.ScheduledEventID != common.EmptyEventID) s.Equal(common.EmptyEventID, workflowTask.StartedEventID) s.Equal(int32(1), workflowTask.Attempt) diff --git a/service/history/timerQueueStandbyTaskExecutor.go b/service/history/timerQueueStandbyTaskExecutor.go index 5efa7502b06..d089fb3aa30 100644 --- a/service/history/timerQueueStandbyTaskExecutor.go +++ b/service/history/timerQueueStandbyTaskExecutor.go @@ -319,8 +319,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask( } actionFn := func(_ context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) { - workflowTask, isPending := mutableState.GetWorkflowTaskInfo(timerTask.EventID) - if !isPending { + workflowTask := mutableState.GetWorkflowTaskByID(timerTask.EventID) + if workflowTask == nil { return nil, nil } @@ -352,7 +352,7 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowBackoffTimerTask( timerTask *tasks.WorkflowBackoffTimerTask, ) error { actionFn := func(_ context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) { - if mutableState.HasProcessedOrPendingWorkflowTask() { + if mutableState.HadOrHasWorkflowTask() { // if there is one workflow task already been processed // or has pending workflow task, meaning workflow has already running return nil, nil diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index 907515fee91..488a224cebd 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -223,8 +223,8 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( return nil } - workflowTask, found := mutableState.GetWorkflowTaskInfo(task.ScheduledEventID) - if !found { + workflowTask := mutableState.GetWorkflowTaskByID(task.ScheduledEventID) + if workflowTask == nil { return nil } err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, task.Version, task) diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go index 46c89cdf650..dfd8c97499c 100644 --- a/service/history/transferQueueStandbyTaskExecutor.go +++ b/service/history/transferQueueStandbyTaskExecutor.go @@ -174,8 +174,8 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( ) error { processTaskIfClosed := false actionFn := func(_ context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) { - wtInfo, ok := mutableState.GetWorkflowTaskInfo(transferTask.ScheduledEventID) - if !ok { + wtInfo := mutableState.GetWorkflowTaskByID(transferTask.ScheduledEventID) + if wtInfo == nil { return nil, nil } diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index 3c374d11256..b87a3c5e196 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -166,7 +166,7 @@ type ( GetChildExecutionInitiatedEvent(context.Context, int64) (*historypb.HistoryEvent, error) GetCompletionEvent(context.Context) (*historypb.HistoryEvent, error) GetWorkflowCloseTime(ctx context.Context) (*time.Time, error) - GetWorkflowTaskInfo(int64) (*WorkflowTaskInfo, bool) + GetWorkflowTaskByID(scheduledEventID int64) *WorkflowTaskInfo GetNamespaceEntry() *namespace.Namespace GetStartEvent(context.Context) (*historypb.HistoryEvent, error) GetSignalExternalInitiatedEvent(context.Context, int64) (*historypb.HistoryEvent, error) @@ -175,12 +175,12 @@ type ( GetCurrentVersion() int64 GetExecutionInfo() *persistencespb.WorkflowExecutionInfo GetExecutionState() *persistencespb.WorkflowExecutionState - GetInFlightWorkflowTask() (*WorkflowTaskInfo, bool) - GetPendingWorkflowTask() (*WorkflowTaskInfo, bool) + GetStartedWorkflowTask() *WorkflowTaskInfo + GetPendingWorkflowTask() *WorkflowTaskInfo GetLastFirstEventIDTxnID() (int64, int64) GetLastWriteVersion() (int64, error) GetNextEventID() int64 - GetPreviousStartedEventID() int64 + GetLastWorkflowTaskStartedEventID() int64 GetPendingActivityInfos() map[int64]*persistencespb.ActivityInfo GetPendingTimerInfos() map[string]*persistencespb.TimerInfo GetPendingChildExecutionInfos() map[int64]*persistencespb.ChildExecutionInfo @@ -199,10 +199,10 @@ type ( IsTransientWorkflowTask() bool ClearTransientWorkflowTask() error HasBufferedEvents() bool - HasInFlightWorkflowTask() bool + HasStartedWorkflowTask() bool HasParentExecution() bool HasPendingWorkflowTask() bool - HasProcessedOrPendingWorkflowTask() bool + HadOrHasWorkflowTask() bool IsCancelRequested() bool IsCurrentWorkflowGuaranteed() bool IsSignalRequested(requestID string) bool diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 524abe5e9fc..4b2d88b0793 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -480,7 +480,7 @@ func (ms *MutableStateImpl) GetExecutionState() *persistencespb.WorkflowExecutio } func (ms *MutableStateImpl) FlushBufferedEvents() { - if ms.HasInFlightWorkflowTask() { + if ms.HasStartedWorkflowTask() { return } ms.updatePendingEventIDs(ms.hBuilder.FlushBufferToCurrentBatch()) @@ -1249,11 +1249,9 @@ func (ms *MutableStateImpl) DeleteUserTimer( return nil } -// GetWorkflowTaskInfo returns details about the in-progress workflow task -func (ms *MutableStateImpl) GetWorkflowTaskInfo( - scheduledEventID int64, -) (*WorkflowTaskInfo, bool) { - return ms.workflowTaskManager.GetWorkflowTaskInfo(scheduledEventID) +// GetWorkflowTaskByID returns details about the current workflow task by scheduled event ID. +func (ms *MutableStateImpl) GetWorkflowTaskByID(scheduledEventID int64) *WorkflowTaskInfo { + return ms.workflowTaskManager.GetWorkflowTaskByID(scheduledEventID) } func (ms *MutableStateImpl) GetPendingActivityInfos() map[int64]*persistencespb.ActivityInfo { @@ -1276,24 +1274,24 @@ func (ms *MutableStateImpl) GetPendingSignalExternalInfos() map[int64]*persisten return ms.pendingSignalInfoIDs } -func (ms *MutableStateImpl) HasProcessedOrPendingWorkflowTask() bool { - return ms.workflowTaskManager.HasProcessedOrPendingWorkflowTask() +func (ms *MutableStateImpl) HadOrHasWorkflowTask() bool { + return ms.workflowTaskManager.HadOrHasWorkflowTask() } func (ms *MutableStateImpl) HasPendingWorkflowTask() bool { return ms.workflowTaskManager.HasPendingWorkflowTask() } -func (ms *MutableStateImpl) GetPendingWorkflowTask() (*WorkflowTaskInfo, bool) { +func (ms *MutableStateImpl) GetPendingWorkflowTask() *WorkflowTaskInfo { return ms.workflowTaskManager.GetPendingWorkflowTask() } -func (ms *MutableStateImpl) HasInFlightWorkflowTask() bool { - return ms.workflowTaskManager.HasInFlightWorkflowTask() +func (ms *MutableStateImpl) HasStartedWorkflowTask() bool { + return ms.workflowTaskManager.HasStartedWorkflowTask() } -func (ms *MutableStateImpl) GetInFlightWorkflowTask() (*WorkflowTaskInfo, bool) { - return ms.workflowTaskManager.GetInFlightWorkflowTask() +func (ms *MutableStateImpl) GetStartedWorkflowTask() *WorkflowTaskInfo { + return ms.workflowTaskManager.GetStartedWorkflowTask() } func (ms *MutableStateImpl) IsTransientWorkflowTask() bool { @@ -1301,7 +1299,7 @@ func (ms *MutableStateImpl) IsTransientWorkflowTask() bool { } func (ms *MutableStateImpl) ClearTransientWorkflowTask() error { - if !ms.HasInFlightWorkflowTask() { + if !ms.HasStartedWorkflowTask() { return serviceerror.NewInternal("cannot clear transient workflow task when task is missing") } if !ms.IsTransientWorkflowTask() { @@ -1358,8 +1356,8 @@ func (ms *MutableStateImpl) GetNextEventID() int64 { return ms.hBuilder.NextEventID() } -// GetPreviousStartedEventID returns last started workflow task event ID -func (ms *MutableStateImpl) GetPreviousStartedEventID() int64 { +// GetLastWorkflowTaskStartedEventID returns last started workflow task event ID +func (ms *MutableStateImpl) GetLastWorkflowTaskStartedEventID() int64 { return ms.executionInfo.LastWorkflowTaskStartedEventId } @@ -1397,7 +1395,7 @@ func (ms *MutableStateImpl) IsSignalRequested( func (ms *MutableStateImpl) IsWorkflowPendingOnWorkflowTaskBackoff() bool { workflowTaskBackoff := timestamp.TimeValue(ms.executionInfo.GetExecutionTime()).After(timestamp.TimeValue(ms.executionInfo.GetStartTime())) - if workflowTaskBackoff && !ms.HasProcessedOrPendingWorkflowTask() { + if workflowTaskBackoff && !ms.HadOrHasWorkflowTask() { return true } return false @@ -4165,7 +4163,7 @@ func (ms *MutableStateImpl) prepareEventsAndReplicationTasks( return nil, nil, false, err } - historyMutation, err := ms.hBuilder.Finish(!ms.HasInFlightWorkflowTask()) + historyMutation, err := ms.hBuilder.Finish(!ms.HasStartedWorkflowTask()) if err != nil { return nil, nil, false, err } @@ -4368,8 +4366,8 @@ func (ms *MutableStateImpl) startTransactionHandleNamespaceMigration( return nil, err } - // local namespace -> global namespace && with inflight workflow task - if lastWriteVersion == common.EmptyVersion && namespaceEntry.FailoverVersion() > common.EmptyVersion && ms.HasInFlightWorkflowTask() { + // local namespace -> global namespace && with started workflow task + if lastWriteVersion == common.EmptyVersion && namespaceEntry.FailoverVersion() > common.EmptyVersion && ms.HasStartedWorkflowTask() { localNamespaceMutation := namespace.NewPretendAsLocalNamespace( ms.clusterMetadata.GetCurrentClusterName(), ) @@ -4389,8 +4387,8 @@ func (ms *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool, // all events ending in the buffer should have the same version // Handling mutable state turn from standby to active, while having a workflow task on the fly - workflowTask, ok := ms.GetInFlightWorkflowTask() - if !ok || workflowTask.Version >= ms.GetCurrentVersion() { + workflowTask := ms.GetStartedWorkflowTask() + if workflowTask == nil || workflowTask.Version >= ms.GetCurrentVersion() { // no pending workflow tasks, no buffered events // or workflow task has higher / equal version return false, nil @@ -4498,7 +4496,7 @@ func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit( } // Handling buffered events size issue - if workflowTask, ok := ms.GetInFlightWorkflowTask(); ok { + if workflowTask := ms.GetStartedWorkflowTask(); workflowTask != nil { // we have a workflow task on the fly with a lower version, fail it if err := failWorkflowTask( ms, diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 411eeb0bd3f..2df9128b713 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -161,7 +161,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica newWorkflowTaskScheduleEvent, _ := s.prepareTransientWorkflowTaskCompletionFirstBatchReplicated(version, runID) - newWorkflowTask, _ := s.mutableState.GetWorkflowTaskInfo(newWorkflowTaskScheduleEvent.GetEventId()) + newWorkflowTask := s.mutableState.GetWorkflowTaskByID(newWorkflowTaskScheduleEvent.GetEventId()) s.NotNil(newWorkflowTask) _, err := s.mutableState.AddWorkflowTaskTimedOutEvent( @@ -184,7 +184,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica newWorkflowTaskScheduleEvent, _ := s.prepareTransientWorkflowTaskCompletionFirstBatchReplicated(version, runID) - newWorkflowTask, _ := s.mutableState.GetWorkflowTaskInfo(newWorkflowTaskScheduleEvent.GetEventId()) + newWorkflowTask := s.mutableState.GetWorkflowTaskByID(newWorkflowTaskScheduleEvent.GetEventId()) s.NotNil(newWorkflowTask) _, err := s.mutableState.AddWorkflowTaskFailedEvent( diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 465a5a07c36..4eaee7ce4e6 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -1161,21 +1161,6 @@ func (mr *MockMutableStateMockRecorder) GetFirstRunID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstRunID", reflect.TypeOf((*MockMutableState)(nil).GetFirstRunID)) } -// GetInFlightWorkflowTask mocks base method. -func (m *MockMutableState) GetInFlightWorkflowTask() (*WorkflowTaskInfo, bool) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetInFlightWorkflowTask") - ret0, _ := ret[0].(*WorkflowTaskInfo) - ret1, _ := ret[1].(bool) - return ret0, ret1 -} - -// GetInFlightWorkflowTask indicates an expected call of GetInFlightWorkflowTask. -func (mr *MockMutableStateMockRecorder) GetInFlightWorkflowTask() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInFlightWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).GetInFlightWorkflowTask)) -} - // GetLastFirstEventIDTxnID mocks base method. func (m *MockMutableState) GetLastFirstEventIDTxnID() (int64, int64) { m.ctrl.T.Helper() @@ -1191,6 +1176,20 @@ func (mr *MockMutableStateMockRecorder) GetLastFirstEventIDTxnID() *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastFirstEventIDTxnID", reflect.TypeOf((*MockMutableState)(nil).GetLastFirstEventIDTxnID)) } +// GetLastWorkflowTaskStartedEventID mocks base method. +func (m *MockMutableState) GetLastWorkflowTaskStartedEventID() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLastWorkflowTaskStartedEventID") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetLastWorkflowTaskStartedEventID indicates an expected call of GetLastWorkflowTaskStartedEventID. +func (mr *MockMutableStateMockRecorder) GetLastWorkflowTaskStartedEventID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastWorkflowTaskStartedEventID", reflect.TypeOf((*MockMutableState)(nil).GetLastWorkflowTaskStartedEventID)) +} + // GetLastWriteVersion mocks base method. func (m *MockMutableState) GetLastWriteVersion() (int64, error) { m.ctrl.T.Helper() @@ -1305,12 +1304,11 @@ func (mr *MockMutableStateMockRecorder) GetPendingTimerInfos() *gomock.Call { } // GetPendingWorkflowTask mocks base method. -func (m *MockMutableState) GetPendingWorkflowTask() (*WorkflowTaskInfo, bool) { +func (m *MockMutableState) GetPendingWorkflowTask() *WorkflowTaskInfo { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetPendingWorkflowTask") ret0, _ := ret[0].(*WorkflowTaskInfo) - ret1, _ := ret[1].(bool) - return ret0, ret1 + return ret0 } // GetPendingWorkflowTask indicates an expected call of GetPendingWorkflowTask. @@ -1319,20 +1317,6 @@ func (mr *MockMutableStateMockRecorder) GetPendingWorkflowTask() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPendingWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).GetPendingWorkflowTask)) } -// GetPreviousStartedEventID mocks base method. -func (m *MockMutableState) GetPreviousStartedEventID() int64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPreviousStartedEventID") - ret0, _ := ret[0].(int64) - return ret0 -} - -// GetPreviousStartedEventID indicates an expected call of GetPreviousStartedEventID. -func (mr *MockMutableStateMockRecorder) GetPreviousStartedEventID() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPreviousStartedEventID", reflect.TypeOf((*MockMutableState)(nil).GetPreviousStartedEventID)) -} - // GetQueryRegistry mocks base method. func (m *MockMutableState) GetQueryRegistry() QueryRegistry { m.ctrl.T.Helper() @@ -1452,6 +1436,20 @@ func (mr *MockMutableStateMockRecorder) GetStartVersion() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStartVersion", reflect.TypeOf((*MockMutableState)(nil).GetStartVersion)) } +// GetStartedWorkflowTask mocks base method. +func (m *MockMutableState) GetStartedWorkflowTask() *WorkflowTaskInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetStartedWorkflowTask") + ret0, _ := ret[0].(*WorkflowTaskInfo) + return ret0 +} + +// GetStartedWorkflowTask indicates an expected call of GetStartedWorkflowTask. +func (mr *MockMutableStateMockRecorder) GetStartedWorkflowTask() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStartedWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).GetStartedWorkflowTask)) +} + // GetTransientWorkflowTaskInfo mocks base method. func (m *MockMutableState) GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *v110.TransientWorkflowTaskInfo { m.ctrl.T.Helper() @@ -1555,19 +1553,18 @@ func (mr *MockMutableStateMockRecorder) GetWorkflowStateStatus() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowStateStatus", reflect.TypeOf((*MockMutableState)(nil).GetWorkflowStateStatus)) } -// GetWorkflowTaskInfo mocks base method. -func (m *MockMutableState) GetWorkflowTaskInfo(arg0 int64) (*WorkflowTaskInfo, bool) { +// GetWorkflowTaskByID mocks base method. +func (m *MockMutableState) GetWorkflowTaskByID(scheduledEventID int64) *WorkflowTaskInfo { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetWorkflowTaskInfo", arg0) + ret := m.ctrl.Call(m, "GetWorkflowTaskByID", scheduledEventID) ret0, _ := ret[0].(*WorkflowTaskInfo) - ret1, _ := ret[1].(bool) - return ret0, ret1 + return ret0 } -// GetWorkflowTaskInfo indicates an expected call of GetWorkflowTaskInfo. -func (mr *MockMutableStateMockRecorder) GetWorkflowTaskInfo(arg0 interface{}) *gomock.Call { +// GetWorkflowTaskByID indicates an expected call of GetWorkflowTaskByID. +func (mr *MockMutableStateMockRecorder) GetWorkflowTaskByID(scheduledEventID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowTaskInfo", reflect.TypeOf((*MockMutableState)(nil).GetWorkflowTaskInfo), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowTaskByID", reflect.TypeOf((*MockMutableState)(nil).GetWorkflowTaskByID), scheduledEventID) } // GetWorkflowType mocks base method. @@ -1584,32 +1581,32 @@ func (mr *MockMutableStateMockRecorder) GetWorkflowType() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowType", reflect.TypeOf((*MockMutableState)(nil).GetWorkflowType)) } -// HasBufferedEvents mocks base method. -func (m *MockMutableState) HasBufferedEvents() bool { +// HadOrHasWorkflowTask mocks base method. +func (m *MockMutableState) HadOrHasWorkflowTask() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HasBufferedEvents") + ret := m.ctrl.Call(m, "HadOrHasWorkflowTask") ret0, _ := ret[0].(bool) return ret0 } -// HasBufferedEvents indicates an expected call of HasBufferedEvents. -func (mr *MockMutableStateMockRecorder) HasBufferedEvents() *gomock.Call { +// HadOrHasWorkflowTask indicates an expected call of HadOrHasWorkflowTask. +func (mr *MockMutableStateMockRecorder) HadOrHasWorkflowTask() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasBufferedEvents", reflect.TypeOf((*MockMutableState)(nil).HasBufferedEvents)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HadOrHasWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).HadOrHasWorkflowTask)) } -// HasInFlightWorkflowTask mocks base method. -func (m *MockMutableState) HasInFlightWorkflowTask() bool { +// HasBufferedEvents mocks base method. +func (m *MockMutableState) HasBufferedEvents() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HasInFlightWorkflowTask") + ret := m.ctrl.Call(m, "HasBufferedEvents") ret0, _ := ret[0].(bool) return ret0 } -// HasInFlightWorkflowTask indicates an expected call of HasInFlightWorkflowTask. -func (mr *MockMutableStateMockRecorder) HasInFlightWorkflowTask() *gomock.Call { +// HasBufferedEvents indicates an expected call of HasBufferedEvents. +func (mr *MockMutableStateMockRecorder) HasBufferedEvents() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasInFlightWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).HasInFlightWorkflowTask)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasBufferedEvents", reflect.TypeOf((*MockMutableState)(nil).HasBufferedEvents)) } // HasParentExecution mocks base method. @@ -1640,18 +1637,18 @@ func (mr *MockMutableStateMockRecorder) HasPendingWorkflowTask() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasPendingWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).HasPendingWorkflowTask)) } -// HasProcessedOrPendingWorkflowTask mocks base method. -func (m *MockMutableState) HasProcessedOrPendingWorkflowTask() bool { +// HasStartedWorkflowTask mocks base method. +func (m *MockMutableState) HasStartedWorkflowTask() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HasProcessedOrPendingWorkflowTask") + ret := m.ctrl.Call(m, "HasStartedWorkflowTask") ret0, _ := ret[0].(bool) return ret0 } -// HasProcessedOrPendingWorkflowTask indicates an expected call of HasProcessedOrPendingWorkflowTask. -func (mr *MockMutableStateMockRecorder) HasProcessedOrPendingWorkflowTask() *gomock.Call { +// HasStartedWorkflowTask indicates an expected call of HasStartedWorkflowTask. +func (mr *MockMutableStateMockRecorder) HasStartedWorkflowTask() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasProcessedOrPendingWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).HasProcessedOrPendingWorkflowTask)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasStartedWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).HasStartedWorkflowTask)) } // IsCancelRequested mocks base method. diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 5f23601c2b9..86b59b40def 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -334,10 +334,10 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( workflowTaskScheduledEventID int64, ) error { - workflowTask, ok := r.mutableState.GetWorkflowTaskInfo( + workflowTask := r.mutableState.GetWorkflowTaskByID( workflowTaskScheduledEventID, ) - if !ok { + if workflowTask == nil { return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending workflow task: %v", workflowTaskScheduledEventID)) } @@ -371,11 +371,11 @@ func (r *TaskGeneratorImpl) GenerateStartWorkflowTaskTasks( workflowTaskScheduledEventID int64, ) error { - workflowTask, ok := r.mutableState.GetWorkflowTaskInfo( + workflowTask := r.mutableState.GetWorkflowTaskByID( workflowTaskScheduledEventID, ) - if !ok { - return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending workflowTaskInfo: %v", workflowTaskScheduledEventID)) + if workflowTask == nil { + return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending workflow task: %v", workflowTaskScheduledEventID)) } startedTime := timestamp.TimeValue(workflowTask.StartedTime) diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 0d4ea773b0b..a53ca915add 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -182,7 +182,7 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowStart( } startAttr := startEvent.GetWorkflowExecutionStartedEventAttributes() - if !mutableState.HasProcessedOrPendingWorkflowTask() && timestamp.DurationValue(startAttr.GetFirstWorkflowTaskBackoff()) > 0 { + if !mutableState.HadOrHasWorkflowTask() && timestamp.DurationValue(startAttr.GetFirstWorkflowTaskBackoff()) > 0 { if err := taskGenerator.GenerateDelayedWorkflowTasks( startEvent, ); err != nil { @@ -248,8 +248,8 @@ func (r *TaskRefresherImpl) refreshWorkflowTaskTasks( return nil } - workflowTask, ok := mutableState.GetPendingWorkflowTask() - if !ok { + workflowTask := mutableState.GetPendingWorkflowTask() + if workflowTask == nil { return serviceerror.NewInternal("it could be a bug, cannot get pending workflow task") } diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index a15e3e4bce5..0ff5dc2403a 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -87,7 +87,7 @@ func RetryWorkflow( continueAsNewAttributes *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes, ) (MutableState, error) { - if workflowTask, ok := mutableState.GetInFlightWorkflowTask(); ok { + if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil { if err := failWorkflowTask( mutableState, workflowTask, @@ -117,7 +117,7 @@ func TimeoutWorkflow( continuedRunID string, ) error { - if workflowTask, ok := mutableState.GetInFlightWorkflowTask(); ok { + if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil { if err := failWorkflowTask( mutableState, workflowTask, @@ -144,7 +144,7 @@ func TerminateWorkflow( deleteAfterTerminate bool, ) error { - if workflowTask, ok := mutableState.GetInFlightWorkflowTask(); ok { + if workflowTask := mutableState.GetStartedWorkflowTask(); workflowTask != nil { if err := failWorkflowTask( mutableState, workflowTask, diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index f00341ccd4c..134d89824b5 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -170,9 +170,8 @@ func (m *workflowTaskStateMachine) ReplicateWorkflowTaskStartedEvent( // When this function is called from ApplyEvents, workflowTask is nil. // It is safe to look up the workflow task as it does not have to deal with transient workflow task case. if workflowTask == nil { - var ok bool - workflowTask, ok = m.GetWorkflowTaskInfo(scheduledEventID) - if !ok { + workflowTask = m.GetWorkflowTaskByID(scheduledEventID) + if workflowTask == nil { return nil, serviceerror.NewInternal(fmt.Sprintf("unable to find workflow task: %v", scheduledEventID)) } // Transient workflow task events are not replicated but attempt count in mutable state @@ -409,8 +408,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( identity string, ) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) { opTag := tag.WorkflowActionWorkflowTaskStarted - workflowTask, ok := m.GetWorkflowTaskInfo(scheduledEventID) - if !ok || workflowTask.StartedEventID != common.EmptyEventID { + workflowTask := m.GetWorkflowTaskByID(scheduledEventID) + if workflowTask == nil || workflowTask.StartedEventID != common.EmptyEventID { m.ms.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.ms.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, @@ -719,48 +718,46 @@ func (m *workflowTaskStateMachine) HasPendingWorkflowTask() bool { return m.ms.executionInfo.WorkflowTaskScheduledEventId != common.EmptyEventID } -func (m *workflowTaskStateMachine) GetPendingWorkflowTask() (*WorkflowTaskInfo, bool) { - if m.ms.executionInfo.WorkflowTaskScheduledEventId == common.EmptyEventID { - return nil, false +func (m *workflowTaskStateMachine) GetPendingWorkflowTask() *WorkflowTaskInfo { + if !m.HasPendingWorkflowTask() { + return nil } workflowTask := m.getWorkflowTaskInfo() - return workflowTask, true + return workflowTask } -func (m *workflowTaskStateMachine) HasInFlightWorkflowTask() bool { - return m.ms.executionInfo.WorkflowTaskStartedEventId != common.EmptyEventID +func (m *workflowTaskStateMachine) HasStartedWorkflowTask() bool { + return m.ms.executionInfo.WorkflowTaskScheduledEventId != common.EmptyEventID && + m.ms.executionInfo.WorkflowTaskStartedEventId != common.EmptyEventID } -func (m *workflowTaskStateMachine) GetInFlightWorkflowTask() (*WorkflowTaskInfo, bool) { - if m.ms.executionInfo.WorkflowTaskScheduledEventId == common.EmptyEventID || - m.ms.executionInfo.WorkflowTaskStartedEventId == common.EmptyEventID { - return nil, false +func (m *workflowTaskStateMachine) GetStartedWorkflowTask() *WorkflowTaskInfo { + if !m.HasStartedWorkflowTask() { + return nil } workflowTask := m.getWorkflowTaskInfo() - return workflowTask, true + return workflowTask } -func (m *workflowTaskStateMachine) HasProcessedOrPendingWorkflowTask() bool { - return m.HasPendingWorkflowTask() || m.ms.GetPreviousStartedEventID() != common.EmptyEventID +func (m *workflowTaskStateMachine) HadOrHasWorkflowTask() bool { + return m.HasPendingWorkflowTask() || m.ms.GetLastWorkflowTaskStartedEventID() != common.EmptyEventID } -// GetWorkflowTaskInfo returns details about the in-progress workflow task -func (m *workflowTaskStateMachine) GetWorkflowTaskInfo( - scheduledEventID int64, -) (*WorkflowTaskInfo, bool) { +// GetWorkflowTaskByID returns details about the current workflow task by scheduled event ID. +func (m *workflowTaskStateMachine) GetWorkflowTaskByID(scheduledEventID int64) *WorkflowTaskInfo { workflowTask := m.getWorkflowTaskInfo() if scheduledEventID == workflowTask.ScheduledEventID { - return workflowTask, true + return workflowTask } workflowTask = m.tryRestoreSpeculativeWorkflowTask(scheduledEventID) if workflowTask != nil { - return workflowTask, true + return workflowTask } - return nil, false + return nil } func (m *workflowTaskStateMachine) tryRestoreSpeculativeWorkflowTask( diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 84aa2ea7980..3fef41d2c93 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -137,7 +137,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled( return nil, consts.ErrWorkflowCompleted } - if req.IsFirstWorkflowTask && mutableState.HasProcessedOrPendingWorkflowTask() { + if req.IsFirstWorkflowTask && mutableState.HadOrHasWorkflowTask() { return &api.UpdateWorkflowAction{ Noop: true, }, nil @@ -188,12 +188,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( return nil, consts.ErrWorkflowCompleted } - workflowTask, isRunning := mutableState.GetWorkflowTaskInfo(scheduledEventID) + workflowTask := mutableState.GetWorkflowTaskByID(scheduledEventID) metricsScope := handler.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryRecordWorkflowTaskStartedScope)) // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in // some extreme cassandra failure cases. - if !isRunning && scheduledEventID >= mutableState.GetNextEventID() { + if workflowTask == nil && scheduledEventID >= mutableState.GetNextEventID() { metricsScope.Counter(metrics.StaleMutableStateCounter.GetMetricName()).Record(1) // Reload workflow execution history // ErrStaleState will trigger updateWorkflow function to reload the mutable state @@ -202,7 +202,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( // Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If // task is not outstanding than it is most probably a duplicate and complete the task. - if !isRunning { + if workflowTask == nil { // Looks like WorkflowTask already completed as a result of another call. // It is OK to drop the task at this point. return nil, serviceerror.NewNotFound("Workflow task not found.") @@ -299,10 +299,10 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed( } scheduledEventID := token.GetScheduledEventId() - workflowTask, isRunning := mutableState.GetWorkflowTaskInfo(scheduledEventID) + workflowTask := mutableState.GetWorkflowTaskByID(scheduledEventID) // TODO (alex-update): call mutableState.SetSpeculativeWorkflowTaskStartedEventID(mutableState) here to set StartEventID. - if !isRunning || workflowTask.Attempt != token.Attempt || workflowTask.StartedEventID == common.EmptyEventID { + if workflowTask == nil || workflowTask.Attempt != token.Attempt || workflowTask.StartedEventID == common.EmptyEventID { return nil, serviceerror.NewNotFound("Workflow task not found.") } @@ -351,8 +351,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( ctx, token.Clock, func(mutableState workflow.MutableState) bool { - _, ok := mutableState.GetWorkflowTaskInfo(token.GetScheduledEventId()) - if !ok && token.GetScheduledEventId() >= mutableState.GetNextEventID() { + workflowTask := mutableState.GetWorkflowTaskByID(token.GetScheduledEventId()) + if workflowTask == nil && token.GetScheduledEventId() >= mutableState.GetNextEventID() { handler.metricsHandler.Counter(metrics.StaleMutableStateCounter.GetMetricName()).Record( 1, metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope)) @@ -373,8 +373,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( weContext := workflowContext.GetContext() ms := workflowContext.GetMutableState() - currentWorkflowTask, currentWorkflowTaskRunning := ms.GetWorkflowTaskInfo(token.GetScheduledEventId()) - // TODO (alex-update): call mutableState.SetSpeculativeWorkflowTaskStartedEventID(mutableState) here to set StartEventID. executionInfo := ms.GetExecutionInfo() executionStats, err := weContext.LoadExecutionStats(ctx) @@ -382,7 +380,9 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( return nil, err } - if !ms.IsWorkflowExecutionRunning() || !currentWorkflowTaskRunning || currentWorkflowTask.Attempt != token.Attempt || + currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId()) + // TODO (alex-update): call mutableState.SetSpeculativeWorkflowTaskStartedEventID(mutableState) here to set StartEventID. + if !ms.IsWorkflowExecutionRunning() || currentWorkflowTask == nil || currentWorkflowTask.Attempt != token.Attempt || currentWorkflowTask.StartedEventID == common.EmptyEventID { return nil, serviceerror.NewNotFound("Workflow task not found.") } @@ -690,7 +690,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( resp := &historyservice.RespondWorkflowTaskCompletedResponse{} if request.GetReturnNewWorkflowTask() && createNewWorkflowTask { - workflowTask, _ := ms.GetWorkflowTaskInfo(newWorkflowTaskScheduledEventID) + workflowTask := ms.GetWorkflowTaskByID(newWorkflowTaskScheduledEventID) resp.StartedResponse, err = handler.createRecordWorkflowTaskStartedResponse(ms, weContext.UpdateRegistry(), workflowTask, request.GetIdentity()) if err != nil { return nil, err @@ -746,7 +746,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) verifyFirstWorkflowTaskSchedule return nil } - if !mutableState.HasProcessedOrPendingWorkflowTask() { + if !mutableState.HadOrHasWorkflowTask() { return consts.ErrWorkflowNotReady } diff --git a/tests/max_buffered_event_test.go b/tests/max_buffered_event_test.go index 8f7d57c86ed..363ac80c5b3 100644 --- a/tests/max_buffered_event_test.go +++ b/tests/max_buffered_event_test.go @@ -100,7 +100,7 @@ func (s *clientIntegrationSuite) TestMaxBufferedEventsLimit() { s.NoError(err) } - // send 101 signal, this will fail the inflight workflow task + // send 101 signal, this will fail the started workflow task err := s.sdkClient.SignalWorkflow(testCtx, wid, "", "test-signal", 100) s.NoError(err)