diff --git a/service/history/api/create_workflow_util.go b/service/history/api/create_workflow_util.go index 07b4d226ad7..49a7a7e0325 100644 --- a/service/history/api/create_workflow_util.go +++ b/service/history/api/create_workflow_util.go @@ -136,6 +136,7 @@ func NewWorkflowWithSignal( startRequest.StartRequest.Identity, nil, nil, + nil, false, ) if err != nil { diff --git a/service/history/api/recordworkflowtaskstarted/api.go b/service/history/api/recordworkflowtaskstarted/api.go index b9fc1663e6c..8c9b191230d 100644 --- a/service/history/api/recordworkflowtaskstarted/api.go +++ b/service/history/api/recordworkflowtaskstarted/api.go @@ -182,6 +182,7 @@ func Invoke( req.PollRequest.Identity, worker_versioning.StampFromCapabilities(req.PollRequest.WorkerVersionCapabilities), req.GetBuildIdRedirectInfo(), + workflowLease.GetContext().UpdateRegistry(ctx), false, ) if err != nil { diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index bf93f1021ea..6fc2da25d76 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -561,6 +561,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( request.Identity, versioningStamp, nil, + workflowLease.GetContext().UpdateRegistry(ctx), false, ) if err != nil { @@ -683,6 +684,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( request.Identity, versioningStamp, nil, + workflowLease.GetContext().UpdateRegistry(ctx), false, ) if err != nil { diff --git a/service/history/api/respondworkflowtaskcompleted/api_test.go b/service/history/api/respondworkflowtaskcompleted/api_test.go index 9ddd20eee56..11e1b91de58 100644 --- a/service/history/api/respondworkflowtaskcompleted/api_test.go +++ b/service/history/api/respondworkflowtaskcompleted/api_test.go @@ -595,6 +595,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) createSentUpdate(tv *testvars.TestVa tv.Any().String(), nil, nil, + nil, false, ) taskToken := &tokenspb.Task{ diff --git a/service/history/api/verifyfirstworkflowtaskscheduled/api_test.go b/service/history/api/verifyfirstworkflowtaskscheduled/api_test.go index 611d2772517..c5972af7449 100644 --- a/service/history/api/verifyfirstworkflowtaskscheduled/api_test.go +++ b/service/history/api/verifyfirstworkflowtaskscheduled/api_test.go @@ -257,6 +257,7 @@ func (s *VerifyFirstWorkflowTaskScheduledSuite) TestVerifyFirstWorkflowTaskSched uuid.New(), nil, nil, + nil, false, ) wt.StartedEventID = workflowTasksStartEvent.GetEventId() diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 2bdbc5a940c..c9bf569c570 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -6332,6 +6332,7 @@ func addWorkflowTaskStartedEventWithRequestID(ms workflow.MutableState, schedule identity, nil, nil, + nil, false, ) diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index d57be0c97a6..5c9514604de 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -525,6 +525,7 @@ func (r *workflowResetterImpl) failWorkflowTask( consts.IdentityHistoryService, nil, nil, + nil, // skipping versioning checks because this task is not actually dispatched but will fail immediately. true, ) diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index 8db93aa55c3..e1ef02cccad 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -400,6 +400,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() { consts.IdentityHistoryService, nil, nil, + nil, true, ).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil) mutableState.EXPECT().AddWorkflowTaskFailedEvent( @@ -1468,6 +1469,7 @@ func (s *workflowResetterSuite) TestWorkflowRestartAfterExecutionTimeout() { consts.IdentityHistoryService, nil, nil, + nil, true, ).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil) diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index d35afae6259..4e87d8c0ce5 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -54,6 +54,7 @@ import ( "go.temporal.io/server/service/history/historybuilder" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow/update" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -182,7 +183,7 @@ type ( AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error) AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error) AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *timestamppb.Timestamp, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error) - AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) + AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, update.Registry, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) AddWorkflowTaskTimedOutEvent(workflowTask *WorkflowTaskInfo) (*historypb.HistoryEvent, error) AddExternalWorkflowExecutionCancelRequested(int64, namespace.Name, namespace.ID, string, string) (*historypb.HistoryEvent, error) AddExternalWorkflowExecutionSignaled(int64, namespace.Name, namespace.ID, string, string, string) (*historypb.HistoryEvent, error) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 47d50047da3..0ceebde9ece 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -86,6 +86,7 @@ import ( "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow/update" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -2585,13 +2586,14 @@ func (ms *MutableStateImpl) AddWorkflowTaskStartedEvent( identity string, versioningStamp *commonpb.WorkerVersionStamp, redirectInfo *taskqueuespb.BuildIdRedirectInfo, + updateReg update.Registry, skipVersioningCheck bool, ) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) { opTag := tag.WorkflowActionWorkflowTaskStarted if err := ms.checkMutability(opTag); err != nil { return nil, nil, err } - return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck) + return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck, updateReg) } func (ms *MutableStateImpl) ApplyWorkflowTaskStartedEvent( diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 2316a1dfe32..6fe70b67880 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -339,6 +339,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Valid() { "", worker_versioning.StampFromBuildId("b2"), &taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"}, + nil, false, ) s.NoError(err) @@ -362,6 +363,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Invalid() { "", worker_versioning.StampFromBuildId("b2"), &taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b0"}, + nil, false, ) expectedErr := &serviceerror2.ObsoleteDispatchBuildId{} @@ -383,6 +385,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyRedirectInfo() { "", worker_versioning.StampFromBuildId("b2"), nil, + nil, false, ) expectedErr := &serviceerror2.ObsoleteDispatchBuildId{} @@ -404,6 +407,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyStamp() { "", nil, &taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"}, + nil, false, ) expectedErr := &serviceerror2.ObsoleteDispatchBuildId{} @@ -427,6 +431,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Sticky() { "", nil, nil, + nil, false, ) s.NoError(err) @@ -452,6 +457,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_StickyInvalid() { "", nil, nil, + nil, false, ) expectedErr := &serviceerror2.ObsoleteDispatchBuildId{} @@ -474,6 +480,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_UnexpectedSticky() { "", nil, nil, + nil, false, ) expectedErr := &serviceerror2.ObsoleteDispatchBuildId{} @@ -505,6 +512,7 @@ func (s *mutableStateSuite) createVersionedMutableStateWithCompletedWFT(tq *task "", worker_versioning.StampFromBuildId("b1"), nil, + nil, false, ) s.NoError(err) @@ -684,6 +692,7 @@ func (s *mutableStateSuite) createMutableStateWithVersioningBehavior( "", nil, nil, + nil, false, ) s.NoError(err) @@ -733,6 +742,7 @@ func (s *mutableStateSuite) TestUnpinnedTransition() { "", nil, nil, + nil, false, ) s.NoError(err) @@ -771,6 +781,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionFailed() { "", nil, nil, + nil, false, ) s.NoError(err) @@ -812,6 +823,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionTimeout() { "", nil, nil, + nil, false, ) s.NoError(err) @@ -969,6 +981,7 @@ func (s *mutableStateSuite) TestOverride_BaseDeploymentUpdatedOnCompletion() { "", nil, nil, + nil, false, ) s.NoError(err) @@ -1265,6 +1278,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged "random identity", nil, nil, + nil, false, ) s.NoError(err) diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 83e600bdb95..124a135f375 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -61,6 +61,7 @@ import ( historybuilder "go.temporal.io/server/service/history/historybuilder" hsm "go.temporal.io/server/service/history/hsm" tasks "go.temporal.io/server/service/history/tasks" + update0 "go.temporal.io/server/service/history/workflow/update" gomock "go.uber.org/mock/gomock" durationpb "google.golang.org/protobuf/types/known/durationpb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -883,9 +884,9 @@ func (mr *MockMutableStateMockRecorder) AddWorkflowTaskScheduledEventAsHeartbeat } // AddWorkflowTaskStartedEvent mocks base method. -func (m *MockMutableState) AddWorkflowTaskStartedEvent(arg0 int64, arg1 string, arg2 *taskqueue.TaskQueue, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *taskqueue0.BuildIdRedirectInfo, arg6 bool) (*history.HistoryEvent, *WorkflowTaskInfo, error) { +func (m *MockMutableState) AddWorkflowTaskStartedEvent(arg0 int64, arg1 string, arg2 *taskqueue.TaskQueue, arg3 string, arg4 *common.WorkerVersionStamp, arg5 *taskqueue0.BuildIdRedirectInfo, arg6 update0.Registry, arg7 bool) (*history.HistoryEvent, *WorkflowTaskInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddWorkflowTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret := m.ctrl.Call(m, "AddWorkflowTaskStartedEvent", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) ret0, _ := ret[0].(*history.HistoryEvent) ret1, _ := ret[1].(*WorkflowTaskInfo) ret2, _ := ret[2].(error) @@ -893,9 +894,9 @@ func (m *MockMutableState) AddWorkflowTaskStartedEvent(arg0 int64, arg1 string, } // AddWorkflowTaskStartedEvent indicates an expected call of AddWorkflowTaskStartedEvent. -func (mr *MockMutableStateMockRecorder) AddWorkflowTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddWorkflowTaskStartedEvent(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskStartedEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskStartedEvent), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) } // AddWorkflowTaskTimedOutEvent mocks base method. diff --git a/service/history/workflow/update/registry.go b/service/history/workflow/update/registry.go index 4d7023e1170..cba2db19e50 100644 --- a/service/history/workflow/update/registry.go +++ b/service/history/workflow/update/registry.go @@ -99,6 +99,10 @@ type ( // FailoverVersion of a Mutable State at the time of Registry creation. FailoverVersion() int64 + + // SuggestContinueAsNew returns true if the Registry is reaching its limit. + // Note that this does not apply to in-flight limits, as these are transient. + SuggestContinueAsNew() bool } registry struct { @@ -485,3 +489,8 @@ func (r *registry) GetSize() int { func (r *registry) FailoverVersion() int64 { return r.failoverVersion } + +func (r *registry) SuggestContinueAsNew() bool { + // TODO: implement me + return false +} diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index d0f7af79959..3c8557e784c 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -27,6 +27,7 @@ package workflow import ( + "cmp" "fmt" "math" "time" @@ -50,6 +51,7 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/tqid" "go.temporal.io/server/common/worker_versioning" + "go.temporal.io/server/service/history/workflow/update" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -469,6 +471,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( versioningStamp *commonpb.WorkerVersionStamp, redirectInfo *taskqueuespb.BuildIdRedirectInfo, skipVersioningCheck bool, + updateReg update.Registry, ) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) { opTag := tag.WorkflowActionWorkflowTaskStarted workflowTask := m.GetWorkflowTaskByID(scheduledEventID) @@ -491,6 +494,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( // consistent between the started event in history and the event that was sent to the SDK // that resulted in the successful completion. suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo() + if updateReg != nil { + suggestContinueAsNew = cmp.Or(suggestContinueAsNew, updateReg.SuggestContinueAsNew()) + } workflowTask, scheduledEventCreatedForRedirect, redirectCounter, err := m.processBuildIdRedirectInfo(versioningStamp, workflowTask, taskQueue, redirectInfo, skipVersioningCheck) if err != nil { diff --git a/service/history/workflow/workflow_test/mutable_state_impl_test.go b/service/history/workflow/workflow_test/mutable_state_impl_test.go index 0487f9109d3..dacee3ec51f 100644 --- a/service/history/workflow/workflow_test/mutable_state_impl_test.go +++ b/service/history/workflow/workflow_test/mutable_state_impl_test.go @@ -157,6 +157,7 @@ func (c *mutationTestCase) startWFT( "", nil, nil, + nil, false, ) if err != nil { @@ -464,6 +465,7 @@ func TestGetNexusCompletion(t *testing.T) { "---", nil, nil, + nil, false, ) require.NoError(t, err)