Skip to content

Commit

Permalink
Update Registry suggest CAN refactor (temporalio#7230)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

Refactoring to allow Update registry to allow suggesting CAN.

## Why?
<!-- Tell your future self why have you made these changes -->

Preparation for implementation. "Make the change easy, then make the
easy change".

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Feb 5, 2025
1 parent 1b11ed2 commit de8f006
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 6 deletions.
1 change: 1 addition & 0 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func NewWorkflowWithSignal(
startRequest.StartRequest.Identity,
nil,
nil,
nil,
false,
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func Invoke(
req.PollRequest.Identity,
worker_versioning.StampFromCapabilities(req.PollRequest.WorkerVersionCapabilities),
req.GetBuildIdRedirectInfo(),
workflowLease.GetContext().UpdateRegistry(ctx),
false,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
request.Identity,
versioningStamp,
nil,
workflowLease.GetContext().UpdateRegistry(ctx),
false,
)
if err != nil {
Expand Down Expand Up @@ -683,6 +684,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
request.Identity,
versioningStamp,
nil,
workflowLease.GetContext().UpdateRegistry(ctx),
false,
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) createSentUpdate(tv *testvars.TestVa
tv.Any().String(),
nil,
nil,
nil,
false,
)
taskToken := &tokenspb.Task{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func (s *VerifyFirstWorkflowTaskScheduledSuite) TestVerifyFirstWorkflowTaskSched
uuid.New(),
nil,
nil,
nil,
false,
)
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
Expand Down
1 change: 1 addition & 0 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6332,6 +6332,7 @@ func addWorkflowTaskStartedEventWithRequestID(ms workflow.MutableState, schedule
identity,
nil,
nil,
nil,
false,
)

Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func (r *workflowResetterImpl) failWorkflowTask(
consts.IdentityHistoryService,
nil,
nil,
nil,
// skipping versioning checks because this task is not actually dispatched but will fail immediately.
true,
)
Expand Down
2 changes: 2 additions & 0 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() {
consts.IdentityHistoryService,
nil,
nil,
nil,
true,
).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil)
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
Expand Down Expand Up @@ -1469,6 +1470,7 @@ func (s *workflowResetterSuite) TestWorkflowRestartAfterExecutionTimeout() {
consts.IdentityHistoryService,
nil,
nil,
nil,
true,
).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil)

Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"go.temporal.io/server/service/history/historybuilder"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -182,7 +183,7 @@ type (
AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *timestamppb.Timestamp, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, update.Registry, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskTimedOutEvent(workflowTask *WorkflowTaskInfo) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionCancelRequested(int64, namespace.Name, namespace.ID, string, string) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionSignaled(int64, namespace.Name, namespace.ID, string, string, string) (*historypb.HistoryEvent, error)
Expand Down
4 changes: 3 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import (
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -2607,13 +2608,14 @@ func (ms *MutableStateImpl) AddWorkflowTaskStartedEvent(
identity string,
versioningStamp *commonpb.WorkerVersionStamp,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
updateReg update.Registry,
skipVersioningCheck bool,
) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
if err := ms.checkMutability(opTag); err != nil {
return nil, nil, err
}
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck)
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck, updateReg)
}

func (ms *MutableStateImpl) ApplyWorkflowTaskStartedEvent(
Expand Down
14 changes: 14 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Valid() {
"",
worker_versioning.StampFromBuildId("b2"),
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"},
nil,
false,
)
s.NoError(err)
Expand All @@ -362,6 +363,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Invalid() {
"",
worker_versioning.StampFromBuildId("b2"),
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b0"},
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -383,6 +385,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyRedirectInfo() {
"",
worker_versioning.StampFromBuildId("b2"),
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -404,6 +407,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyStamp() {
"",
nil,
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"},
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -427,6 +431,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Sticky() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand All @@ -452,6 +457,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_StickyInvalid() {
"",
nil,
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -474,6 +480,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_UnexpectedSticky() {
"",
nil,
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand Down Expand Up @@ -505,6 +512,7 @@ func (s *mutableStateSuite) createVersionedMutableStateWithCompletedWFT(tq *task
"",
worker_versioning.StampFromBuildId("b1"),
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -684,6 +692,7 @@ func (s *mutableStateSuite) createMutableStateWithVersioningBehavior(
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -733,6 +742,7 @@ func (s *mutableStateSuite) TestUnpinnedTransition() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -771,6 +781,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionFailed() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -812,6 +823,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionTimeout() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -969,6 +981,7 @@ func (s *mutableStateSuite) TestOverride_BaseDeploymentUpdatedOnCompletion() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -1265,6 +1278,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
"random identity",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down
9 changes: 5 additions & 4 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions service/history/workflow/update/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type (

// FailoverVersion of a Mutable State at the time of Registry creation.
FailoverVersion() int64

// SuggestContinueAsNew returns true if the Registry is reaching its limit.
// Note that this does not apply to in-flight limits, as these are transient.
SuggestContinueAsNew() bool
}

registry struct {
Expand Down Expand Up @@ -485,3 +489,8 @@ func (r *registry) GetSize() int {
func (r *registry) FailoverVersion() int64 {
return r.failoverVersion
}

func (r *registry) SuggestContinueAsNew() bool {
// TODO: implement me
return false
}
6 changes: 6 additions & 0 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package workflow

import (
"cmp"
"fmt"
"math"
"time"
Expand All @@ -50,6 +51,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/tqid"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -469,6 +471,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
versioningStamp *commonpb.WorkerVersionStamp,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
skipVersioningCheck bool,
updateReg update.Registry,
) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
workflowTask := m.GetWorkflowTaskByID(scheduledEventID)
Expand All @@ -491,6 +494,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
// consistent between the started event in history and the event that was sent to the SDK
// that resulted in the successful completion.
suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo()
if updateReg != nil {
suggestContinueAsNew = cmp.Or(suggestContinueAsNew, updateReg.SuggestContinueAsNew())
}

workflowTask, scheduledEventCreatedForRedirect, redirectCounter, err := m.processBuildIdRedirectInfo(versioningStamp, workflowTask, taskQueue, redirectInfo, skipVersioningCheck)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (c *mutationTestCase) startWFT(
"",
nil,
nil,
nil,
false,
)
if err != nil {
Expand Down Expand Up @@ -464,6 +465,7 @@ func TestGetNexusCompletion(t *testing.T) {
"---",
nil,
nil,
nil,
false,
)
require.NoError(t, err)
Expand Down

0 comments on commit de8f006

Please sign in to comment.