Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Registry suggest CAN refactor #7230

Merged
merged 1 commit into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func NewWorkflowWithSignal(
startRequest.StartRequest.Identity,
nil,
nil,
nil,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New Workflow, there's no Update registry ...

false,
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func Invoke(
req.PollRequest.Identity,
worker_versioning.StampFromCapabilities(req.PollRequest.WorkerVersionCapabilities),
req.GetBuildIdRedirectInfo(),
workflowLease.GetContext().UpdateRegistry(ctx),
false,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
request.Identity,
versioningStamp,
nil,
workflowLease.GetContext().UpdateRegistry(ctx),
false,
)
if err != nil {
Expand Down Expand Up @@ -683,6 +684,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
request.Identity,
versioningStamp,
nil,
workflowLease.GetContext().UpdateRegistry(ctx),
false,
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) createSentUpdate(tv *testvars.TestVa
tv.Any().String(),
nil,
nil,
nil,
false,
)
taskToken := &tokenspb.Task{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func (s *VerifyFirstWorkflowTaskScheduledSuite) TestVerifyFirstWorkflowTaskSched
uuid.New(),
nil,
nil,
nil,
false,
)
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
Expand Down
1 change: 1 addition & 0 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6332,6 +6332,7 @@ func addWorkflowTaskStartedEventWithRequestID(ms workflow.MutableState, schedule
identity,
nil,
nil,
nil,
false,
)

Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func (r *workflowResetterImpl) failWorkflowTask(
consts.IdentityHistoryService,
nil,
nil,
nil,
// skipping versioning checks because this task is not actually dispatched but will fail immediately.
true,
)
Expand Down
2 changes: 2 additions & 0 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() {
consts.IdentityHistoryService,
nil,
nil,
nil,
true,
).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil)
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
Expand Down Expand Up @@ -1468,6 +1469,7 @@ func (s *workflowResetterSuite) TestWorkflowRestartAfterExecutionTimeout() {
consts.IdentityHistoryService,
nil,
nil,
nil,
true,
).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil)

Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"go.temporal.io/server/service/history/historybuilder"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -182,7 +183,7 @@ type (
AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *timestamppb.Timestamp, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, update.Registry, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskTimedOutEvent(workflowTask *WorkflowTaskInfo) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionCancelRequested(int64, namespace.Name, namespace.ID, string, string) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionSignaled(int64, namespace.Name, namespace.ID, string, string, string) (*historypb.HistoryEvent, error)
Expand Down
4 changes: 3 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import (
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -2585,13 +2586,14 @@ func (ms *MutableStateImpl) AddWorkflowTaskStartedEvent(
identity string,
versioningStamp *commonpb.WorkerVersionStamp,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
updateReg update.Registry,
skipVersioningCheck bool,
) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
if err := ms.checkMutability(opTag); err != nil {
return nil, nil, err
}
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck)
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck, updateReg)
}

func (ms *MutableStateImpl) ApplyWorkflowTaskStartedEvent(
Expand Down
14 changes: 14 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Valid() {
"",
worker_versioning.StampFromBuildId("b2"),
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"},
nil,
false,
)
s.NoError(err)
Expand All @@ -362,6 +363,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Invalid() {
"",
worker_versioning.StampFromBuildId("b2"),
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b0"},
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -383,6 +385,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyRedirectInfo() {
"",
worker_versioning.StampFromBuildId("b2"),
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -404,6 +407,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyStamp() {
"",
nil,
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"},
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -427,6 +431,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Sticky() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand All @@ -452,6 +457,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_StickyInvalid() {
"",
nil,
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -474,6 +480,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_UnexpectedSticky() {
"",
nil,
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand Down Expand Up @@ -505,6 +512,7 @@ func (s *mutableStateSuite) createVersionedMutableStateWithCompletedWFT(tq *task
"",
worker_versioning.StampFromBuildId("b1"),
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -684,6 +692,7 @@ func (s *mutableStateSuite) createMutableStateWithVersioningBehavior(
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -733,6 +742,7 @@ func (s *mutableStateSuite) TestUnpinnedTransition() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -771,6 +781,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionFailed() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -812,6 +823,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionTimeout() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -969,6 +981,7 @@ func (s *mutableStateSuite) TestOverride_BaseDeploymentUpdatedOnCompletion() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -1265,6 +1278,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
"random identity",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down
9 changes: 5 additions & 4 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions service/history/workflow/update/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type (

// FailoverVersion of a Mutable State at the time of Registry creation.
FailoverVersion() int64

// SuggestContinueAsNew returns true if the Registry is reaching its limit.
// Note that this does not apply to in-flight limits, as these are transient.
SuggestContinueAsNew() bool
}

registry struct {
Expand Down Expand Up @@ -485,3 +489,8 @@ func (r *registry) GetSize() int {
func (r *registry) FailoverVersion() int64 {
return r.failoverVersion
}

func (r *registry) SuggestContinueAsNew() bool {
// TODO: implement me
return false
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation to follow ...

}
6 changes: 6 additions & 0 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package workflow

import (
"cmp"
"fmt"
"math"
"time"
Expand All @@ -50,6 +51,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/tqid"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -469,6 +471,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
versioningStamp *commonpb.WorkerVersionStamp,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
skipVersioningCheck bool,
updateReg update.Registry,
) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
workflowTask := m.GetWorkflowTaskByID(scheduledEventID)
Expand All @@ -491,6 +494,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
// consistent between the started event in history and the event that was sent to the SDK
// that resulted in the successful completion.
suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo()
if updateReg != nil {
suggestContinueAsNew = cmp.Or(suggestContinueAsNew, updateReg.SuggestContinueAsNew())
}

workflowTask, scheduledEventCreatedForRedirect, redirectCounter, err := m.processBuildIdRedirectInfo(versioningStamp, workflowTask, taskQueue, redirectInfo, skipVersioningCheck)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (c *mutationTestCase) startWFT(
"",
nil,
nil,
nil,
false,
)
if err != nil {
Expand Down Expand Up @@ -464,6 +465,7 @@ func TestGetNexusCompletion(t *testing.T) {
"---",
nil,
nil,
nil,
false,
)
require.NoError(t, err)
Expand Down
Loading