Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename inflight WT to started WT #4043

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/api/describeworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func Invoke(
}
}

if pendingWorkflowTask, ok := mutableState.GetPendingWorkflowTask(); ok {
if pendingWorkflowTask := mutableState.GetPendingWorkflowTask(); pendingWorkflowTask != nil {
result.PendingWorkflowTask = &workflowpb.PendingWorkflowTaskInfo{
State: enumspb.PENDING_WORKFLOW_TASK_STATE_SCHEDULED,
ScheduledTime: pendingWorkflowTask.ScheduledTime,
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/get_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func MutableStateToGetResponse(
LastFirstEventId: lastFirstEventID,
LastFirstEventTxnId: lastFirstEventTxnID,
NextEventId: mutableState.GetNextEventID(),
PreviousStartedEventId: mutableState.GetPreviousStartedEventID(),
PreviousStartedEventId: mutableState.GetLastWorkflowTaskStartedEventID(),
TaskQueue: &taskqueuepb.TaskQueue{
Name: executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
Expand Down
6 changes: 3 additions & 3 deletions service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func Invoke(
}

mutableState := weCtx.GetMutableState()
if !mutableState.HasProcessedOrPendingWorkflowTask() {
if !mutableState.HadOrHasWorkflowTask() {
// workflow has no workflow task ever scheduled, this usually is due to firstWorkflowTaskBackoff (cron / retry)
// in this case, don't buffer the query, because it is almost certain the query will time out.
return nil, consts.ErrWorkflowTaskNotScheduled
Expand All @@ -122,14 +122,14 @@ func Invoke(
// is used to determine if a query can be safely dispatched directly through matching or must be dispatched on a workflow task.
//
// Precondition to dispatch query directly to matching is workflow has at least one WorkflowTaskStarted event. Otherwise, sdk would panic.
if mutableState.GetPreviousStartedEventID() != common.EmptyEventID {
if mutableState.GetLastWorkflowTaskStartedEventID() != common.EmptyEventID {
// There are three cases in which a query can be dispatched directly through matching safely, without violating strong consistency level:
// 1. the namespace is not active, in this case history is immutable so a query dispatched at any time is consistent
// 2. the workflow is not running, whenever a workflow is not running dispatching query directly is consistent
// 3. if there is no pending or started workflow tasks it means no events came before query arrived, so its safe to dispatch directly
safeToDispatchDirectly := !nsEntry.ActiveInCluster(shard.GetClusterMetadata().GetCurrentClusterName()) ||
!mutableState.IsWorkflowExecutionRunning() ||
(!mutableState.HasPendingWorkflowTask() && !mutableState.HasInFlightWorkflowTask())
(!mutableState.HasPendingWorkflowTask() && !mutableState.HasStartedWorkflowTask())
if safeToDispatchDirectly {
msResp, err := api.MutableStateToGetResponse(mutableState)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/reapplyevents/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func Invoke(
// to accept events to be reapplied
baseRunID := mutableState.GetExecutionState().GetRunId()
resetRunID := uuid.New()
baseRebuildLastEventID := mutableState.GetPreviousStartedEventID()
baseRebuildLastEventID := mutableState.GetLastWorkflowTaskStartedEventID()

// TODO when https://github.com/uber/cadence/issues/2420 is finished, remove this block,
// since cannot reapply event to a finished workflow which had no workflow tasks started
Expand Down
39 changes: 19 additions & 20 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"

tokenspb "go.temporal.io/server/api/token/v1"

"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -78,10 +79,9 @@ type creationParams struct {
// mutableStateInfo is a container for the relevant mutable state information to generate a start response with an eager
// workflow task.
type mutableStateInfo struct {
branchToken []byte
lastEventID int64
workflowTaskInfo *workflow.WorkflowTaskInfo
hasInflight bool
branchToken []byte
lastEventID int64
workflowTask *workflow.WorkflowTaskInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably startedWorkflowTask

}

// NewStarter creates a new starter, fails if getting the active namespace fails.
Expand Down Expand Up @@ -196,9 +196,9 @@ func (s *Starter) createNewMutableState(ctx context.Context, workflowID string,

now := s.shardCtx.GetTimeSource().Now()
mutableState := workflowContext.GetMutableState()
workflowTaskInfo, hasInflight := mutableState.GetInFlightWorkflowTask()
if s.requestEagerStart() && !hasInflight {
return nil, serviceerror.NewInternal("unexpected error: mutable state did not have an inflight workflow task")
workflowTaskInfo := mutableState.GetStartedWorkflowTask()
if s.requestEagerStart() && workflowTaskInfo == nil {
return nil, serviceerror.NewInternal("unexpected error: mutable state did not have a started workflow task")
}
workflowSnapshot, eventBatches, err := mutableState.CloseTransactionAsSnapshot(
now,
Expand Down Expand Up @@ -367,7 +367,7 @@ func (s *Starter) applyWorkflowIDReusePolicy(
if err != nil {
return nil, err
}
return s.generateResponse(creationParams.runID, mutableStateInfo.workflowTaskInfo, events)
return s.generateResponse(creationParams.runID, mutableStateInfo.workflowTask, events)
case consts.ErrWorkflowCompleted:
// previous workflow already closed
// fallthough to the logic for only creating the new workflow below
Expand All @@ -394,9 +394,9 @@ func (s *Starter) respondToRetriedRequest(
return nil, err
}

// The current workflow task is not inflight or not the first task or we exceeded the first attempt and fell back to
// The current workflow task is not started or not the first task or we exceeded the first attempt and fell back to
// matching based dispatch.
if !mutableStateInfo.hasInflight || mutableStateInfo.workflowTaskInfo.StartedEventID != 3 || mutableStateInfo.workflowTaskInfo.Attempt > 1 {
if mutableStateInfo.workflowTask == nil || mutableStateInfo.workflowTask.StartedEventID != 3 || mutableStateInfo.workflowTask.Attempt > 1 {
s.recordEagerDenied(eagerStartDeniedReasonTaskAlreadyDispatched)
return &historyservice.StartWorkflowExecutionResponse{
RunId: runID,
Expand All @@ -408,7 +408,7 @@ func (s *Starter) respondToRetriedRequest(
return nil, err
}

return s.generateResponse(runID, mutableStateInfo.workflowTaskInfo, events)
return s.generateResponse(runID, mutableStateInfo.workflowTask, events)
}

// getMutableStateInfo gets the relevant mutable state information while getting the state for the given run from the
Expand Down Expand Up @@ -443,19 +443,18 @@ func extractMutableStateInfo(mutableState workflow.MutableState) (*mutableStateI
}

// Future work for the request retry path: extend the task timeout (by failing / timing out the current task).
workflowTaskInfoSource, hasInflight := mutableState.GetInFlightWorkflowTask()
// The workflowTaskInfo returned from the mutable state call is generated on the fly and technically doesn't require
workflowTaskSource := mutableState.GetStartedWorkflowTask()
// The workflowTask returned from the mutable state call is generated on the fly and technically doesn't require
// cloning. We clone here just in case that changes.
var workflowTaskInfo workflow.WorkflowTaskInfo
if hasInflight {
workflowTaskInfo = *workflowTaskInfoSource
var workflowTask workflow.WorkflowTaskInfo
if workflowTaskSource != nil {
workflowTask = *workflowTaskSource
}

return &mutableStateInfo{
branchToken: branchToken,
lastEventID: mutableState.GetNextEventID() - 1,
workflowTaskInfo: &workflowTaskInfo,
hasInflight: hasInflight,
branchToken: branchToken,
lastEventID: mutableState.GetNextEventID() - 1,
workflowTask: &workflowTask,
}, nil
}

Expand Down
34 changes: 17 additions & 17 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2830,8 +2830,8 @@ func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() {
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)

s.True(ms2.HasPendingWorkflowTask())
wt, ok := ms2.GetWorkflowTaskInfo(int64(8))
s.True(ok)
wt = ms2.GetWorkflowTaskByID(int64(8))
s.NotNil(wt)
s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds())
s.Equal(int64(8), wt.ScheduledEventID)
s.Equal(common.EmptyEventID, wt.StartedEventID)
Expand Down Expand Up @@ -2891,8 +2891,8 @@ func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() {
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)

s.True(ms2.HasPendingWorkflowTask())
wt, ok := ms2.GetWorkflowTaskInfo(int64(8))
s.True(ok)
wt := ms2.GetWorkflowTaskByID(int64(8))
s.NotNil(wt)
s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds())
s.Equal(int64(8), wt.ScheduledEventID)
s.Equal(common.EmptyEventID, wt.StartedEventID)
Expand Down Expand Up @@ -3301,8 +3301,8 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() {
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)

s.True(ms2.HasPendingWorkflowTask())
wt, ok := ms2.GetWorkflowTaskInfo(int64(8))
s.True(ok)
wt = ms2.GetWorkflowTaskByID(int64(8))
s.NotNil(wt)
s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds())
s.Equal(int64(8), wt.ScheduledEventID)
s.Equal(common.EmptyEventID, wt.StartedEventID)
Expand Down Expand Up @@ -3365,8 +3365,8 @@ func (s *engineSuite) TestRespondActivityTaskFailedWithHeartbeatSuccess() {
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)

s.True(ms2.HasPendingWorkflowTask())
wt, ok := ms2.GetWorkflowTaskInfo(int64(8))
s.True(ok)
wt = ms2.GetWorkflowTaskByID(int64(8))
s.NotNil(wt)
s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds())
s.Equal(int64(8), wt.ScheduledEventID)
s.Equal(common.EmptyEventID, wt.StartedEventID)
Expand Down Expand Up @@ -3428,8 +3428,8 @@ func (s *engineSuite) TestRespondActivityTaskFailedByIdSuccess() {
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)

s.True(ms2.HasPendingWorkflowTask())
wt, ok := ms2.GetWorkflowTaskInfo(int64(8))
s.True(ok)
wt := ms2.GetWorkflowTaskByID(int64(8))
s.NotNil(wt)
s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds())
s.Equal(int64(8), wt.ScheduledEventID)
s.Equal(common.EmptyEventID, wt.StartedEventID)
Expand Down Expand Up @@ -3683,8 +3683,8 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Started() {
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)

s.True(ms2.HasPendingWorkflowTask())
wt, ok := ms2.GetWorkflowTaskInfo(int64(9))
s.True(ok)
wt = ms2.GetWorkflowTaskByID(int64(9))
s.NotNil(wt)
s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds())
s.Equal(int64(9), wt.ScheduledEventID)
s.Equal(common.EmptyEventID, wt.StartedEventID)
Expand Down Expand Up @@ -3744,8 +3744,8 @@ func (s *engineSuite) TestRespondActivityTaskCanceledById_Started() {
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)

s.True(ms2.HasPendingWorkflowTask())
wt, ok := ms2.GetWorkflowTaskInfo(int64(9))
s.True(ok)
wt := ms2.GetWorkflowTaskByID(int64(9))
s.NotNil(wt)
s.EqualValues(int64(100), wt.WorkflowTaskTimeout.Seconds())
s.Equal(int64(9), wt.ScheduledEventID)
s.Equal(common.EmptyEventID, wt.StartedEventID)
Expand Down Expand Up @@ -3972,8 +3972,8 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_Scheduled()
s.Equal(int64(7), ms2.GetExecutionInfo().LastWorkflowTaskStartedEventId)
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, ms2.GetExecutionState().State)
s.True(ms2.HasPendingWorkflowTask())
wt2, ok := ms2.GetWorkflowTaskInfo(ms2.GetNextEventID() - 1)
s.True(ok)
wt2 = ms2.GetWorkflowTaskByID(ms2.GetNextEventID() - 1)
s.NotNil(wt2)
s.Equal(ms2.GetNextEventID()-1, wt2.ScheduledEventID)
s.Equal(int32(1), wt2.Attempt)
}
Expand Down Expand Up @@ -5306,7 +5306,7 @@ func addWorkflowTaskStartedEventWithRequestID(ms workflow.MutableState, schedule
}

func addWorkflowTaskCompletedEvent(s *suite.Suite, ms workflow.MutableState, scheduledEventID, startedEventID int64, identity string) *historypb.HistoryEvent {
workflowTask, _ := ms.GetWorkflowTaskInfo(scheduledEventID)
workflowTask := ms.GetWorkflowTaskByID(scheduledEventID)
s.NotNil(workflowTask)
s.Equal(startedEventID, workflowTask.StartedEventID)

Expand Down
2 changes: 1 addition & 1 deletion service/history/ndc/branch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (r *BranchMgrImpl) flushBufferedEvents(
// check whether there are buffered events, if so, flush it
// NOTE: buffered events does not show in version history or next event id
if !r.mutableState.HasBufferedEvents() {
if r.mutableState.HasInFlightWorkflowTask() && r.mutableState.IsTransientWorkflowTask() {
if r.mutableState.HasStartedWorkflowTask() && r.mutableState.IsTransientWorkflowTask() {
if err := r.mutableState.ClearTransientWorkflowTask(); err != nil {
return nil, 0, err
}
Expand Down
12 changes: 6 additions & 6 deletions service/history/ndc/branch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (s *branchMgrSuite) TestClearTransientWorkflowTask() {

s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastWriteVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().ClearTransientWorkflowTask().Return(nil).AnyTimes()

Expand Down Expand Up @@ -239,7 +239,7 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() {
ScheduledEventID: 1234,
StartedEventID: 2345,
}
s.mockMutableState.EXPECT().GetInFlightWorkflowTask().Return(workflowTask, true)
s.mockMutableState.EXPECT().GetStartedWorkflowTask().Return(workflowTask)
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
VersionHistories: versionHistories,
}).AnyTimes()
Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchAppendable_NoMissingEve

s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes()
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()

doContinue, index, err := s.nDCBranchMgr.prepareVersionHistory(
Expand Down Expand Up @@ -316,7 +316,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchAppendable_MissingEvent
s.NoError(err)

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
Expand Down Expand Up @@ -358,7 +358,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_NoMissing
newBranchToken := []byte("some random new branch token")

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
Expand Down Expand Up @@ -418,7 +418,7 @@ func (s *branchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_MissingEv
})

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasInFlightWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().HasStartedWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().IsTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
Expand Down
3 changes: 2 additions & 1 deletion service/history/ndc/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -319,7 +320,7 @@ func (r *transactionMgrImpl) backfillWorkflowEventsReapply(
workflowID := baseMutableState.GetExecutionInfo().WorkflowId
baseRunID := baseMutableState.GetExecutionState().GetRunId()
resetRunID := uuid.New()
baseRebuildLastEventID := baseMutableState.GetPreviousStartedEventID()
baseRebuildLastEventID := baseMutableState.GetLastWorkflowTaskStartedEventID()
baseVersionHistories := baseMutableState.GetExecutionInfo().GetVersionHistories()
baseCurrentVersionHistory, err := versionhistory.GetCurrentVersionHistory(baseVersionHistories)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions service/history/ndc/transaction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Closed
RunId: runID,
}).AnyTimes()
mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes()
mutableState.EXPECT().GetPreviousStartedEventID().Return(lastWorkflowTaskStartedEventID)
mutableState.EXPECT().GetLastWorkflowTaskStartedEventID().Return(lastWorkflowTaskStartedEventID)

s.mockWorkflowResetter.EXPECT().ResetWorkflow(
ctx,
Expand Down Expand Up @@ -298,7 +298,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Closed_ResetF
RunId: runID,
}).AnyTimes()
mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes()
mutableState.EXPECT().GetPreviousStartedEventID().Return(lastWorkflowTaskStartedEventID)
mutableState.EXPECT().GetLastWorkflowTaskStartedEventID().Return(lastWorkflowTaskStartedEventID)

s.mockWorkflowResetter.EXPECT().ResetWorkflow(
ctx,
Expand Down
6 changes: 3 additions & 3 deletions service/history/ndc/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *WorkflowImpl) Revive() error {

// workflow is in zombie state, need to set the state correctly accordingly
state = enumsspb.WORKFLOW_EXECUTION_STATE_CREATED
if r.mutableState.HasProcessedOrPendingWorkflowTask() {
if r.mutableState.HadOrHasWorkflowTask() {
state = enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING
}
return r.mutableState.UpdateWorkflowStateStatus(
Expand Down Expand Up @@ -232,8 +232,8 @@ func (r *WorkflowImpl) failWorkflowTask(
return err
}

workflowTask, ok := r.mutableState.GetInFlightWorkflowTask()
if !ok {
workflowTask := r.mutableState.GetStartedWorkflowTask()
if workflowTask == nil {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,9 @@ func (r *workflowResetterImpl) failWorkflowTask(
resetReason string,
) error {

workflowTask, ok := resetMutableState.GetPendingWorkflowTask()
if !ok {
// TODO if resetMutableState.HasProcessedOrPendingWorkflowTask() == true
workflowTask := resetMutableState.GetPendingWorkflowTask()
if workflowTask == nil {
// TODO if resetMutableState.HadOrHasWorkflowTask() == true
// meaning workflow history has NO workflow task ever
// should also allow workflow reset, the only remaining issues are
// * what if workflow is a cron workflow, e.g. should add a workflow task directly or still respect the cron job
Expand Down
Loading