diff --git a/service/history/api/get_workflow_util.go b/service/history/api/get_workflow_util.go index c21fe5b77ff..360232c0fe0 100644 --- a/service/history/api/get_workflow_util.go +++ b/service/history/api/get_workflow_util.go @@ -209,7 +209,7 @@ func MutableStateToGetResponse( CurrentBranchToken: currentBranchToken, WorkflowState: workflowState, WorkflowStatus: workflowStatus, - IsStickyTaskQueueEnabled: mutableState.IsStickyTaskQueueEnabled(), + IsStickyTaskQueueEnabled: mutableState.IsStickyTaskQueueSet(), VersionHistories: versionhistory.CopyVersionHistories( mutableState.GetExecutionInfo().GetVersionHistories(), ), diff --git a/service/history/api/resetstickytaskqueue/api.go b/service/history/api/resetstickytaskqueue/api.go index 5cd1d0cc8ae..58996a3cded 100644 --- a/service/history/api/resetstickytaskqueue/api.go +++ b/service/history/api/resetstickytaskqueue/api.go @@ -62,7 +62,7 @@ func Invoke( return nil, consts.ErrWorkflowCompleted } - mutableState.ClearStickyness() + mutableState.ClearStickyTaskQueue() return &api.UpdateWorkflowAction{ Noop: true, CreateWorkflowTask: false, diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index 6ff920b50fb..97781b145ab 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -27,7 +27,6 @@ package updateworkflow import ( "context" "fmt" - "time" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" @@ -35,6 +34,7 @@ import ( "go.temporal.io/api/workflowservice/v1" enumspb "go.temporal.io/api/enums/v1" + enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -169,14 +169,7 @@ func addWorkflowTaskToMatching( wt *workflow.WorkflowTaskInfo, nsID namespace.ID, ) error { - // TODO (alex): Timeout calculation is copied from somewhere else. Extract func instead? - var taskScheduleToStartTimeout *time.Duration - if ms.IsStickyTaskQueueEnabled() { - taskScheduleToStartTimeout = ms.GetExecutionInfo().StickyScheduleToStartTimeout - } else { - taskScheduleToStartTimeout = ms.GetExecutionInfo().WorkflowRunTimeout - } - + _, scheduleToStartTimeout := ms.TaskQueueScheduleToStartTimeout(wt.TaskQueue.Name) wfKey := ms.GetWorkflowKey() clock, err := shardCtx.NewVectorClock() if err != nil { @@ -191,7 +184,7 @@ func addWorkflowTaskToMatching( }, TaskQueue: wt.TaskQueue, ScheduledEventId: wt.ScheduledEventID, - ScheduleToStartTimeout: taskScheduleToStartTimeout, + ScheduleToStartTimeout: scheduleToStartTimeout, Clock: clock, }) if err != nil { diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index 9f886ae7a3f..a9fa48a60f5 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -27,7 +27,6 @@ package history import ( "context" "fmt" - "time" "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" @@ -207,18 +206,18 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask( func (t *transferQueueActiveTaskExecutor) processWorkflowTask( ctx context.Context, - task *tasks.WorkflowTask, + transferTask *tasks.WorkflowTask, ) (retError error) { ctx, cancel := context.WithTimeout(ctx, taskTimeout) defer cancel() - weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, task) + weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, transferTask) if err != nil { return err } defer func() { release(retError) }() - mutableState, err := loadMutableStateForTransferTask(ctx, weContext, task, t.metricHandler, t.logger) + mutableState, err := loadMutableStateForTransferTask(ctx, weContext, transferTask, t.metricHandler, t.logger) if err != nil { return err } @@ -226,54 +225,35 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( return nil } - workflowTask := mutableState.GetWorkflowTaskByID(task.ScheduledEventID) + workflowTask := mutableState.GetWorkflowTaskByID(transferTask.ScheduledEventID) if workflowTask == nil { return nil } - err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, task.Version, task) + err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, transferTask.Version, transferTask) if err != nil { return err } - executionInfo := mutableState.GetExecutionInfo() + // Task queue from transfer task (not current one from mutable state) must be used here. + // If current task queue becomes sticky since this transfer task was created, + // it can't be used here, because timeout timer was not created for it, + // because it used to be non-sticky when this transfer task was created . + taskQueue, scheduleToStartTimeout := mutableState.TaskQueueScheduleToStartTimeout(transferTask.TaskQueue) - // NOTE: previously this section check whether mutable state has enabled - // sticky workflowTask, if so convert the workflowTask to a sticky workflowTask. - // that logic has a bug which timer task for that sticky workflowTask is not generated - // the correct logic should check whether the workflow task is a sticky workflowTask - // task or not. - var taskQueue *taskqueuepb.TaskQueue - var taskScheduleToStartTimeout *time.Duration - if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue { - // this workflowTask is an sticky workflowTask - // there shall already be an timer set - taskQueue = &taskqueuepb.TaskQueue{ - Name: task.TaskQueue, - Kind: enumspb.TASK_QUEUE_KIND_STICKY, - } - taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout - } else { - taskQueue = &taskqueuepb.TaskQueue{ - Name: task.TaskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - } - workflowRunTimeout := executionInfo.WorkflowRunTimeout - taskScheduleToStartTimeout = workflowRunTimeout - } + normalTaskQueueName := mutableState.GetExecutionInfo().TaskQueue - originalTaskQueue := mutableState.GetExecutionInfo().TaskQueue - // NOTE: do not access anything related mutable state after this lock release - // release the context lock since we no longer need mutable state and - // the rest of logic is making RPC call, which takes time. + // NOTE: Do not access mutableState after this lock is released. + // It is important to release the workflow lock here, because pushWorkflowTask will call matching, + // which will call history back (with RecordWorkflowTaskStarted), and it will try to get workflow lock again. release(nil) - err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout) + err = t.pushWorkflowTask(ctx, transferTask, taskQueue, scheduleToStartTimeout) if _, ok := err.(*serviceerrors.StickyWorkerUnavailable); ok { - // sticky worker is unavailable, switch to original task queue + // sticky worker is unavailable, switch to original normal task queue taskQueue = &taskqueuepb.TaskQueue{ - // do not use task.TaskQueue which is sticky, use original task queue from mutable state - Name: originalTaskQueue, + // do not use task.TaskQueue which is sticky, use original normal task queue from mutable state + Name: normalTaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, } @@ -282,7 +262,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( // There is no need to reset sticky, because if this task is picked by new worker, the new worker will reset // the sticky queue to a new one. However, if worker is completely down, that schedule_to_start timeout task // will re-create a new non-sticky task and reset sticky. - err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout) + err = t.pushWorkflowTask(ctx, transferTask, taskQueue, scheduleToStartTimeout) } return err } diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go index 6b8c55c22b7..8998005e9ad 100644 --- a/service/history/transferQueueStandbyTaskExecutor.go +++ b/service/history/transferQueueStandbyTaskExecutor.go @@ -181,21 +181,17 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( return nil, nil } - executionInfo := mutableState.GetExecutionInfo() - + _, scheduleToStartTimeout := mutableState.TaskQueueScheduleToStartTimeout(transferTask.TaskQueue) + // Task queue is ignored here because at standby, always use original normal task queue, + // disregards the transferTask.TaskQueue which could be sticky. + // NOTE: scheduleToStart timeout is respected. If workflow was sticky before namespace become standby, + // transferTask.TaskQueue is sticky, and there is timer already created for this timeout. + // Use this sticky timeout as TTL. taskQueue := &taskqueuepb.TaskQueue{ - // at standby, always use original task queue, disregards the task.TaskQueue which could be sticky Name: mutableState.GetExecutionInfo().TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, } - workflowRunTimeout := executionInfo.WorkflowRunTimeout - taskScheduleToStartTimeout := workflowRunTimeout - if mutableState.GetExecutionInfo().TaskQueue != transferTask.TaskQueue { - // Experimental: try to push sticky task as regular task with sticky timeout as TTL. - // workflow might be sticky before namespace become standby - // there shall already be a schedule_to_start timer created - taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout - } + err := CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), wtInfo.Version, transferTask.Version, transferTask) if err != nil { return nil, err @@ -204,7 +200,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( if wtInfo.StartedEventID == common.EmptyEventID { return newWorkflowTaskPostActionInfo( mutableState, - taskScheduleToStartTimeout, + scheduleToStartTimeout, *taskQueue, ) } @@ -569,12 +565,11 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask( } pushwtInfo := postActionInfo.(*workflowTaskPostActionInfo) - timeout := pushwtInfo.workflowTaskScheduleToStartTimeout return t.transferQueueTaskExecutorBase.pushWorkflowTask( ctx, task.(*tasks.WorkflowTask), &pushwtInfo.taskqueue, - timeout, + pushwtInfo.workflowTaskScheduleToStartTimeout, ) } diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index 7145938e1ff..34d2bc1a374 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -149,7 +149,6 @@ type ( AddWorkflowExecutionUpdateAcceptedEvent(protocolInstanceID string, updAcceptance *updatepb.Acceptance) (*historypb.HistoryEvent, error) AddWorkflowExecutionUpdateCompletedEvent(updResp *updatepb.Response) (*historypb.HistoryEvent, error) RejectWorkflowExecutionUpdate(protocolInstanceID string, updRejection *updatepb.Rejection) error - ClearStickyness() CheckResettable() error CloneToProto() *persistencespb.WorkflowMutableState RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error) @@ -212,8 +211,13 @@ type ( IsCancelRequested() bool IsCurrentWorkflowGuaranteed() bool IsSignalRequested(requestID string) bool - IsStickyTaskQueueEnabled() bool - TaskQueue() *taskqueuepb.TaskQueue + + CurrentTaskQueue() *taskqueuepb.TaskQueue + SetStickyTaskQueue(name string, scheduleToStartTimeout *time.Duration) + ClearStickyTaskQueue() + IsStickyTaskQueueSet() bool + TaskQueueScheduleToStartTimeout(name string) (*taskqueuepb.TaskQueue, *time.Duration) + IsWorkflowExecutionRunning() bool IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool IsWorkflowPendingOnWorkflowTaskBackoff() bool diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 4bd093a54a3..92092f866d0 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -632,21 +632,47 @@ func (ms *MutableStateImpl) GetNamespaceEntry() *namespace.Namespace { return ms.namespaceEntry } -func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool { +func (ms *MutableStateImpl) CurrentTaskQueue() *taskqueuepb.TaskQueue { + if ms.IsStickyTaskQueueSet() { + return &taskqueuepb.TaskQueue{ + Name: ms.executionInfo.StickyTaskQueue, + Kind: enumspb.TASK_QUEUE_KIND_STICKY, + } + } + return &taskqueuepb.TaskQueue{ + Name: ms.executionInfo.TaskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + } +} + +func (ms *MutableStateImpl) SetStickyTaskQueue(name string, scheduleToStartTimeout *time.Duration) { + ms.executionInfo.StickyTaskQueue = name + ms.executionInfo.StickyScheduleToStartTimeout = scheduleToStartTimeout +} + +func (ms *MutableStateImpl) ClearStickyTaskQueue() { + ms.executionInfo.StickyTaskQueue = "" + ms.executionInfo.StickyScheduleToStartTimeout = nil +} + +func (ms *MutableStateImpl) IsStickyTaskQueueSet() bool { return ms.executionInfo.StickyTaskQueue != "" } -func (ms *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue { - if ms.IsStickyTaskQueueEnabled() { +// TaskQueueScheduleToStartTimeout returns TaskQueue struct and corresponding StartToClose timeout. +// Task queue kind (sticky or normal) and timeout are set based on comparison of normal task queue name +// in mutable state and provided name. +func (ms *MutableStateImpl) TaskQueueScheduleToStartTimeout(name string) (*taskqueuepb.TaskQueue, *time.Duration) { + if ms.executionInfo.TaskQueue != name { return &taskqueuepb.TaskQueue{ Name: ms.executionInfo.StickyTaskQueue, Kind: enumspb.TASK_QUEUE_KIND_STICKY, - } + }, ms.executionInfo.StickyScheduleToStartTimeout } return &taskqueuepb.TaskQueue{ Name: ms.executionInfo.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - } + }, ms.executionInfo.WorkflowRunTimeout // No WT ScheduleToStart timeout for normal task queue. } func (ms *MutableStateImpl) GetWorkflowType() *commonpb.WorkflowType { @@ -1409,11 +1435,6 @@ func (ms *MutableStateImpl) DeleteWorkflowTask() { ms.workflowTaskManager.DeleteWorkflowTask() } -func (ms *MutableStateImpl) ClearStickyness() { - ms.executionInfo.StickyTaskQueue = "" - ms.executionInfo.StickyScheduleToStartTimeout = timestamp.DurationFromSeconds(0) -} - // GetLastFirstEventIDTxnID returns last first event ID and corresponding transaction ID // first event ID is the ID of a batch of events in a single history events record func (ms *MutableStateImpl) GetLastFirstEventIDTxnID() (int64, int64) { @@ -2537,7 +2558,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionCompletedEvent( ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database ms.executionInfo.NewExecutionRunId = event.GetWorkflowExecutionCompletedEventAttributes().GetNewExecutionRunId() ms.executionInfo.CloseTime = event.GetEventTime() - ms.ClearStickyness() + ms.ClearStickyTaskQueue() ms.writeEventToCache(event) return nil } @@ -2582,7 +2603,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionFailedEvent( ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database ms.executionInfo.NewExecutionRunId = event.GetWorkflowExecutionFailedEventAttributes().GetNewExecutionRunId() ms.executionInfo.CloseTime = event.GetEventTime() - ms.ClearStickyness() + ms.ClearStickyTaskQueue() ms.writeEventToCache(event) return nil } @@ -2626,7 +2647,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionTimedoutEvent( ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database ms.executionInfo.NewExecutionRunId = event.GetWorkflowExecutionTimedOutEventAttributes().GetNewExecutionRunId() ms.executionInfo.CloseTime = event.GetEventTime() - ms.ClearStickyness() + ms.ClearStickyTaskQueue() ms.writeEventToCache(event) return nil } @@ -2706,7 +2727,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionCanceledEvent( ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database ms.executionInfo.NewExecutionRunId = "" ms.executionInfo.CloseTime = event.GetEventTime() - ms.ClearStickyness() + ms.ClearStickyTaskQueue() ms.writeEventToCache(event) return nil } @@ -3326,7 +3347,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionTerminatedEvent( ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database ms.executionInfo.NewExecutionRunId = "" ms.executionInfo.CloseTime = event.GetEventTime() - ms.ClearStickyness() + ms.ClearStickyTaskQueue() ms.writeEventToCache(event) return nil } @@ -3488,7 +3509,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionContinuedAsNewEvent( ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database ms.executionInfo.NewExecutionRunId = continueAsNewEvent.GetWorkflowExecutionContinuedAsNewEventAttributes().GetNewExecutionRunId() ms.executionInfo.CloseTime = continueAsNewEvent.GetEventTime() - ms.ClearStickyness() + ms.ClearStickyTaskQueue() ms.writeEventToCache(continueAsNewEvent) return nil } diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 661098af696..01105d17419 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -847,16 +847,16 @@ func (mr *MockMutableStateMockRecorder) CheckSpeculativeWorkflowTaskTimeoutTask( return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckSpeculativeWorkflowTaskTimeoutTask", reflect.TypeOf((*MockMutableState)(nil).CheckSpeculativeWorkflowTaskTimeoutTask), task) } -// ClearStickyness mocks base method. -func (m *MockMutableState) ClearStickyness() { +// ClearStickyTaskQueue mocks base method. +func (m *MockMutableState) ClearStickyTaskQueue() { m.ctrl.T.Helper() - m.ctrl.Call(m, "ClearStickyness") + m.ctrl.Call(m, "ClearStickyTaskQueue") } -// ClearStickyness indicates an expected call of ClearStickyness. -func (mr *MockMutableStateMockRecorder) ClearStickyness() *gomock.Call { +// ClearStickyTaskQueue indicates an expected call of ClearStickyTaskQueue. +func (mr *MockMutableStateMockRecorder) ClearStickyTaskQueue() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearStickyness", reflect.TypeOf((*MockMutableState)(nil).ClearStickyness)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearStickyTaskQueue", reflect.TypeOf((*MockMutableState)(nil).ClearStickyTaskQueue)) } // ClearTransientWorkflowTask mocks base method. @@ -933,6 +933,20 @@ func (mr *MockMutableStateMockRecorder) ContinueAsNewMinBackoff(backoffDuration return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContinueAsNewMinBackoff", reflect.TypeOf((*MockMutableState)(nil).ContinueAsNewMinBackoff), backoffDuration) } +// CurrentTaskQueue mocks base method. +func (m *MockMutableState) CurrentTaskQueue() *v14.TaskQueue { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CurrentTaskQueue") + ret0, _ := ret[0].(*v14.TaskQueue) + return ret0 +} + +// CurrentTaskQueue indicates an expected call of CurrentTaskQueue. +func (mr *MockMutableStateMockRecorder) CurrentTaskQueue() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CurrentTaskQueue", reflect.TypeOf((*MockMutableState)(nil).CurrentTaskQueue)) +} + // DeleteSignalRequested mocks base method. func (m *MockMutableState) DeleteSignalRequested(requestID string) { m.ctrl.T.Helper() @@ -1794,18 +1808,18 @@ func (mr *MockMutableStateMockRecorder) IsSignalRequested(requestID interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSignalRequested", reflect.TypeOf((*MockMutableState)(nil).IsSignalRequested), requestID) } -// IsStickyTaskQueueEnabled mocks base method. -func (m *MockMutableState) IsStickyTaskQueueEnabled() bool { +// IsStickyTaskQueueSet mocks base method. +func (m *MockMutableState) IsStickyTaskQueueSet() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsStickyTaskQueueEnabled") + ret := m.ctrl.Call(m, "IsStickyTaskQueueSet") ret0, _ := ret[0].(bool) return ret0 } -// IsStickyTaskQueueEnabled indicates an expected call of IsStickyTaskQueueEnabled. -func (mr *MockMutableStateMockRecorder) IsStickyTaskQueueEnabled() *gomock.Call { +// IsStickyTaskQueueSet indicates an expected call of IsStickyTaskQueueSet. +func (mr *MockMutableStateMockRecorder) IsStickyTaskQueueSet() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsStickyTaskQueueEnabled", reflect.TypeOf((*MockMutableState)(nil).IsStickyTaskQueueEnabled)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsStickyTaskQueueSet", reflect.TypeOf((*MockMutableState)(nil).IsStickyTaskQueueSet)) } // IsTransientWorkflowTask mocks base method. @@ -2591,6 +2605,18 @@ func (mr *MockMutableStateMockRecorder) SetSpeculativeWorkflowTaskTimeoutTask(ta return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSpeculativeWorkflowTaskTimeoutTask", reflect.TypeOf((*MockMutableState)(nil).SetSpeculativeWorkflowTaskTimeoutTask), task) } +// SetStickyTaskQueue mocks base method. +func (m *MockMutableState) SetStickyTaskQueue(name string, scheduleToStartTimeout *time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetStickyTaskQueue", name, scheduleToStartTimeout) +} + +// SetStickyTaskQueue indicates an expected call of SetStickyTaskQueue. +func (mr *MockMutableStateMockRecorder) SetStickyTaskQueue(name, scheduleToStartTimeout interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetStickyTaskQueue", reflect.TypeOf((*MockMutableState)(nil).SetStickyTaskQueue), name, scheduleToStartTimeout) +} + // SetUpdateCondition mocks base method. func (m *MockMutableState) SetUpdateCondition(arg0, arg1 int64) { m.ctrl.T.Helper() @@ -2618,18 +2644,19 @@ func (mr *MockMutableStateMockRecorder) StartTransaction(entry interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTransaction", reflect.TypeOf((*MockMutableState)(nil).StartTransaction), entry) } -// TaskQueue mocks base method. -func (m *MockMutableState) TaskQueue() *v14.TaskQueue { +// TaskQueueScheduleToStartTimeout mocks base method. +func (m *MockMutableState) TaskQueueScheduleToStartTimeout(name string) (*v14.TaskQueue, *time.Duration) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TaskQueue") + ret := m.ctrl.Call(m, "TaskQueueScheduleToStartTimeout", name) ret0, _ := ret[0].(*v14.TaskQueue) - return ret0 + ret1, _ := ret[1].(*time.Duration) + return ret0, ret1 } -// TaskQueue indicates an expected call of TaskQueue. -func (mr *MockMutableStateMockRecorder) TaskQueue() *gomock.Call { +// TaskQueueScheduleToStartTimeout indicates an expected call of TaskQueueScheduleToStartTimeout. +func (mr *MockMutableStateMockRecorder) TaskQueueScheduleToStartTimeout(name interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskQueue", reflect.TypeOf((*MockMutableState)(nil).TaskQueue)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskQueueScheduleToStartTimeout", reflect.TypeOf((*MockMutableState)(nil).TaskQueueScheduleToStartTimeout), name) } // UpdateActivity mocks base method. diff --git a/service/history/workflow/mutable_state_rebuilder.go b/service/history/workflow/mutable_state_rebuilder.go index 319410aef18..9cfc4207caa 100644 --- a/service/history/workflow/mutable_state_rebuilder.go +++ b/service/history/workflow/mutable_state_rebuilder.go @@ -138,8 +138,8 @@ func (b *MutableStateRebuilderImpl) applyEvents( taskGenerator := taskGeneratorProvider.NewTaskGenerator(b.shard, b.mutableState) - // need to clear the stickiness since workflow turned to passive - b.mutableState.ClearStickyness() + // Need to clear the sticky task queue because workflow turned to passive. + b.mutableState.ClearStickyTaskQueue() executionInfo := b.mutableState.GetExecutionInfo() executionInfo.LastFirstEventId = firstEvent.GetEventId() diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index bd345f0e8d7..b79b917bca2 100644 --- a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -199,7 +199,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionStarted_No s.mockTaskGenerator.EXPECT().GenerateWorkflowStartTasks( event, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() s.mockMutableState.EXPECT().SetHistoryTree(gomock.Any(), nil, timestamp.DurationFromSeconds(100), tests.RunID).Return(nil) _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) @@ -248,7 +248,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionStarted_Wi s.mockTaskGenerator.EXPECT().GenerateDelayedWorkflowTasks( event, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() s.mockMutableState.EXPECT().SetHistoryTree(gomock.Any(), nil, timestamp.DurationFromSeconds(100), tests.RunID).Return(nil) _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) @@ -281,7 +281,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTimedOut() event, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -313,7 +313,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTerminated event, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) s.Equal(event.TaskId, s.executionInfo.LastEventTaskId) @@ -344,7 +344,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionFailed() { event, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -376,7 +376,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCompleted( event, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -408,7 +408,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCanceled() event, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -506,7 +506,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA continueAsNewEvent, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() // new workflow namespace s.mockNamespaceCache.EXPECT().GetNamespace(tests.ParentNamespace).Return(tests.GlobalParentNamespaceEntry, nil).AnyTimes() @@ -564,7 +564,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA continueAsNewEvent, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() // new workflow namespace s.mockNamespaceCache.EXPECT().GetNamespace(tests.ParentNamespace).Return(tests.GlobalParentNamespaceEntry, nil).AnyTimes() @@ -597,7 +597,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionSignaled() } s.mockUpdateVersion(event) s.mockMutableState.EXPECT().ReplicateWorkflowExecutionSignaled(event).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -625,7 +625,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCancelRequ s.mockMutableState.EXPECT().ReplicateWorkflowExecutionCancelRequestedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -656,7 +656,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeUpsertWorkflowSearchAttribu s.mockMutableState.EXPECT().ReplicateUpsertWorkflowSearchAttributesEvent(event).Return() s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateUpsertVisibilityTask().Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -687,7 +687,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowPropertiesModified( s.mockMutableState.EXPECT().ReplicateWorkflowPropertiesModifiedEvent(event).Return() s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateUpsertVisibilityTask().Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -714,7 +714,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeMarkerRecorded() { Attributes: &historypb.HistoryEvent_MarkerRecordedEventAttributes{MarkerRecordedEventAttributes: &historypb.MarkerRecordedEventAttributes{}}, } s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -767,7 +767,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskScheduled() { wt.ScheduledEventID, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -816,7 +816,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskStarted() { s.mockTaskGenerator.EXPECT().GenerateStartWorkflowTaskTasks( wt.ScheduledEventID, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -862,7 +862,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskTimedOut() { newScheduledEventID, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -907,7 +907,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskFailed() { newScheduledEventID, false, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -940,7 +940,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskCompleted() { } s.mockMutableState.EXPECT().ReplicateWorkflowTaskCompletedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -985,7 +985,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeTimerStarted() { s.mockUpdateVersion(event) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1016,7 +1016,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeTimerFired() { s.mockUpdateVersion(event) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1047,7 +1047,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeTimerCanceled() { s.mockUpdateVersion(event) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1105,7 +1105,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskScheduled() { ).Return(nil) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1148,7 +1148,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskStarted() { s.mockUpdateVersion(startedEvent) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(startedEvent), nil) s.Nil(err) @@ -1180,7 +1180,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskTimedOut() { // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time// assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1211,7 +1211,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskFailed() { s.mockUpdateVersion(event) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1242,7 +1242,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskCompleted() { s.mockUpdateVersion(event) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1270,7 +1270,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskCancelRequested } s.mockMutableState.EXPECT().ReplicateActivityTaskCancelRequestedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1301,7 +1301,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeActivityTaskCanceled() { s.mockUpdateVersion(event) // assertion on timer generated is in `mockUpdateVersion` function, since activity / user timer // need to be refreshed each time - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1354,7 +1354,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeStartChildWorkflowExecution s.mockTaskGenerator.EXPECT().GenerateChildWorkflowTasks( event, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1382,7 +1382,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeStartChildWorkflowExecution } s.mockMutableState.EXPECT().ReplicateStartChildWorkflowExecutionFailedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1410,7 +1410,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeChildWorkflowExecutionStart } s.mockMutableState.EXPECT().ReplicateChildWorkflowExecutionStartedEvent(event, nil).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1438,7 +1438,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeChildWorkflowExecutionTimed } s.mockMutableState.EXPECT().ReplicateChildWorkflowExecutionTimedOutEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1466,7 +1466,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeChildWorkflowExecutionTermi } s.mockMutableState.EXPECT().ReplicateChildWorkflowExecutionTerminatedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1494,7 +1494,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeChildWorkflowExecutionFaile } s.mockMutableState.EXPECT().ReplicateChildWorkflowExecutionFailedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1522,7 +1522,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeChildWorkflowExecutionCompl } s.mockMutableState.EXPECT().ReplicateChildWorkflowExecutionCompletedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1579,7 +1579,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeRequestCancelExternalWorkfl s.mockTaskGenerator.EXPECT().GenerateRequestCancelExternalTasks( event, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1607,7 +1607,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeRequestCancelExternalWorkfl } s.mockMutableState.EXPECT().ReplicateRequestCancelExternalWorkflowExecutionFailedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1635,7 +1635,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeExternalWorkflowExecutionCa } s.mockMutableState.EXPECT().ReplicateExternalWorkflowExecutionCancelRequested(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1663,7 +1663,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeChildWorkflowExecutionCance } s.mockMutableState.EXPECT().ReplicateChildWorkflowExecutionCanceledEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1726,7 +1726,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeSignalExternalWorkflowExecu s.mockTaskGenerator.EXPECT().GenerateSignalExternalTasks( event, ).Return(nil) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1754,7 +1754,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeSignalExternalWorkflowExecu } s.mockMutableState.EXPECT().ReplicateSignalExternalWorkflowExecutionFailedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1782,7 +1782,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeExternalWorkflowExecutionSi } s.mockMutableState.EXPECT().ReplicateExternalWorkflowExecutionSignaled(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.Nil(err) @@ -1814,7 +1814,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionUpdateAcce } s.mockMutableState.EXPECT().ReplicateWorkflowExecutionUpdateAcceptedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.NoError(err) @@ -1844,7 +1844,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionUpdateComp } s.mockMutableState.EXPECT().ReplicateWorkflowExecutionUpdateCompletedEvent(event).Return(nil) s.mockUpdateVersion(event) - s.mockMutableState.EXPECT().ClearStickyness() + s.mockMutableState.EXPECT().ClearStickyTaskQueue() _, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil) s.NoError(err) diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index c9b49d8a87c..44d71c37e51 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -343,7 +343,7 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( return serviceerror.NewInternal(fmt.Sprintf("it could be a bug, cannot get pending workflow task: %v", workflowTaskScheduledEventID)) } - if r.mutableState.IsStickyTaskQueueEnabled() { + if r.mutableState.IsStickyTaskQueueSet() { scheduledTime := timestamp.TimeValue(workflowTask.ScheduledTime) scheduleToStartTimeout := timestamp.DurationValue(r.mutableState.GetExecutionInfo().StickyScheduleToStartTimeout) @@ -370,7 +370,12 @@ func (r *TaskGeneratorImpl) GenerateScheduleWorkflowTaskTasks( r.mutableState.AddTasks(&tasks.WorkflowTask{ // TaskID, VisibilityTimestamp is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), + WorkflowKey: r.mutableState.GetWorkflowKey(), + // Store current task queue to the transfer task. + // If current task queue becomes sticky in between when this transfer task is created and processed, + // it can't be used at process time, because timeout timer was not created for it, + // because it used to be non-sticky when this transfer task was created here. + // In short, task queue that was "current" when transfer task was created must be used when task is processed. TaskQueue: workflowTask.TaskQueue.GetName(), ScheduledEventID: workflowTask.ScheduledEventID, Version: workflowTask.Version, diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 1f0e91d784b..c69095a0826 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -141,9 +141,9 @@ func (m *workflowTaskStateMachine) ReplicateTransientWorkflowTaskScheduled() (*W StartedEventID: common.EmptyEventID, RequestID: emptyUUID, WorkflowTaskTimeout: m.ms.GetExecutionInfo().DefaultWorkflowTaskTimeout, - // Task queue is always of kind NORMAL because transient workflow task is created only for - // failed/timed out workflow task and fail/timeout clears stickiness. - TaskQueue: m.ms.TaskQueue(), + // Task queue is always normal (not sticky) because transient workflow task is created only for + // failed/timed out workflow task and fail/timeout clears sticky task queue. + TaskQueue: m.ms.CurrentTaskQueue(), Attempt: m.ms.GetExecutionInfo().WorkflowTaskAttempt, ScheduledTime: timestamp.TimePtr(m.ms.timeSource.Now()), StartedTime: timestamp.UnixOrZeroTimePtr(0), @@ -250,7 +250,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduleToStartTimeoutEvent( // Create corresponding WorkflowTaskScheduled event for speculative WT. scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( - m.ms.TaskQueue(), + m.ms.CurrentTaskQueue(), workflowTask.WorkflowTaskTimeout, workflowTask.Attempt, timestamp.TimeValue(workflowTask.ScheduledTime).UTC(), @@ -319,7 +319,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( scheduleTime := m.ms.timeSource.Now().UTC() attempt := m.ms.executionInfo.WorkflowTaskAttempt // TaskQueue should already be set from workflow execution started event. - taskQueue := m.ms.TaskQueue() + taskQueue := m.ms.CurrentTaskQueue() // DefaultWorkflowTaskTimeout should already be set from workflow execution started event. startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attempt) @@ -533,7 +533,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( if !workflowTaskScheduledStartedEventsCreated { // Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for transient/speculative workflow tasks. scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( - m.ms.TaskQueue(), + m.ms.CurrentTaskQueue(), workflowTask.WorkflowTaskTimeout, workflowTask.Attempt, timestamp.TimeValue(workflowTask.ScheduledTime).UTC(), @@ -583,7 +583,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( // Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for speculative WT. scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( - m.ms.TaskQueue(), + m.ms.CurrentTaskQueue(), workflowTask.WorkflowTaskTimeout, workflowTask.Attempt, timestamp.TimeValue(workflowTask.ScheduledTime).UTC(), @@ -642,7 +642,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent( // Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for speculative WT. scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( - m.ms.TaskQueue(), + m.ms.CurrentTaskQueue(), workflowTask.WorkflowTaskTimeout, workflowTask.Attempt, timestamp.TimeValue(workflowTask.ScheduledTime).UTC(), @@ -680,10 +680,10 @@ func (m *workflowTaskStateMachine) FailWorkflowTask( incrementAttempt bool, ) { // Increment attempts only if workflow task is failing on non-sticky task queue. - // If it was stick task queue, clear stickiness first and try again before creating transient workflow task. - if m.ms.IsStickyTaskQueueEnabled() { + // If it was sticky task queue, clear sticky task queue first and try again before creating transient workflow task. + if m.ms.IsStickyTaskQueueSet() { incrementAttempt = false - m.ms.ClearStickyness() + m.ms.ClearStickyTaskQueue() } failWorkflowTaskInfo := &WorkflowTaskInfo{ @@ -860,7 +860,7 @@ func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( Version: m.ms.currentVersion, Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{ WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{ - TaskQueue: m.ms.TaskQueue(), + TaskQueue: m.ms.CurrentTaskQueue(), StartToCloseTimeout: workflowTask.WorkflowTaskTimeout, Attempt: workflowTask.Attempt, }, @@ -898,7 +898,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo { Attempt: m.ms.executionInfo.WorkflowTaskAttempt, StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime, ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime, - TaskQueue: m.ms.TaskQueue(), + TaskQueue: m.ms.CurrentTaskQueue(), OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime, Type: m.ms.executionInfo.WorkflowTaskType, SuggestContinueAsNew: m.ms.executionInfo.WorkflowTaskSuggestContinueAsNew, diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 3801eef9d72..6d15e33dc6f 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -233,11 +233,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted( // We will clear the stickiness here when that task is delivered to another worker polling from normal queue. // The stickiness info is used by frontend to decide if it should send down partial history or full history. // Sending down partial history will cost the worker an extra fetch to server for the full history. - if mutableState.IsStickyTaskQueueEnabled() && - mutableState.TaskQueue().GetName() != req.PollRequest.TaskQueue.GetName() { + currentTaskQueue := mutableState.CurrentTaskQueue() + if currentTaskQueue.Kind == enumspb.TASK_QUEUE_KIND_STICKY && + currentTaskQueue.GetName() != req.PollRequest.TaskQueue.GetName() { // req.PollRequest.TaskQueue.GetName() may include partition, but we only check when sticky is enabled, // and sticky queue never has partition, so it does not matter. - mutableState.ClearStickyness() + mutableState.ClearStickyTaskQueue() } _, workflowTask, err = mutableState.AddWorkflowTaskStartedEvent( @@ -407,7 +408,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( weContext := workflowContext.GetContext() ms := workflowContext.GetMutableState() - executionInfo := ms.GetExecutionInfo() executionStats, err := weContext.LoadExecutionStats(ctx) if err != nil { return nil, err @@ -448,7 +448,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( if err != nil { return nil, err } - ms.ClearStickyness() + ms.ClearStickyTaskQueue() } else { completedEvent, err = ms.AddWorkflowTaskCompletedEvent(currentWorkflowTask, request, maxResetPoints) if err != nil { @@ -468,14 +468,12 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( handler.metricsHandler.Counter(metrics.CompleteWorkflowTaskWithStickyDisabledCounter.GetMetricName()).Record( 1, metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope)) - executionInfo.StickyTaskQueue = "" - executionInfo.StickyScheduleToStartTimeout = timestamp.DurationFromSeconds(0) + ms.ClearStickyTaskQueue() } else { handler.metricsHandler.Counter(metrics.CompleteWorkflowTaskWithStickyEnabledCounter.GetMetricName()).Record( 1, metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope)) - executionInfo.StickyTaskQueue = request.StickyAttributes.WorkerTaskQueue.GetName() - executionInfo.StickyScheduleToStartTimeout = request.StickyAttributes.GetScheduleToStartTimeout() + ms.SetStickyTaskQueue(request.StickyAttributes.WorkerTaskQueue.GetName(), request.StickyAttributes.GetScheduleToStartTimeout()) } var ( @@ -840,9 +838,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) createRecordWorkflowTaskStarted // before it was started. response.ScheduledEventId = workflowTask.ScheduledEventID response.StartedEventId = workflowTask.StartedEventID - if ms.IsStickyTaskQueueEnabled() { - response.StickyExecutionEnabled = true - } + response.StickyExecutionEnabled = ms.IsStickyTaskQueueSet() response.NextEventId = ms.GetNextEventID() response.Attempt = workflowTask.Attempt response.WorkflowExecutionTaskQueue = &taskqueuepb.TaskQueue{