Skip to content

Commit

Permalink
Update Registry suggest CAN
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Feb 4, 2025
1 parent 72c6218 commit 299e672
Show file tree
Hide file tree
Showing 14 changed files with 49 additions and 7 deletions.
1 change: 1 addition & 0 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func NewWorkflowWithSignal(
startRequest.StartRequest.Identity,
nil,
nil,
nil,
false,
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func Invoke(
req.PollRequest.Identity,
worker_versioning.StampFromCapabilities(req.PollRequest.WorkerVersionCapabilities),
req.GetBuildIdRedirectInfo(),
workflowLease.GetContext().UpdateRegistry(ctx),
false,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
request.Identity,
versioningStamp,
nil,
nil, // TODO
false,
)
if err != nil {
Expand Down Expand Up @@ -683,6 +684,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
request.Identity,
versioningStamp,
nil,
nil, // TODO
false,
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) createSentUpdate(tv *testvars.TestVa
tv.Any().String(),
nil,
nil,
nil,
false,
)
taskToken := &tokenspb.Task{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func (s *VerifyFirstWorkflowTaskScheduledSuite) TestVerifyFirstWorkflowTaskSched
uuid.New(),
nil,
nil,
nil,
false,
)
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
Expand Down
1 change: 1 addition & 0 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6332,6 +6332,7 @@ func addWorkflowTaskStartedEventWithRequestID(ms workflow.MutableState, schedule
identity,
nil,
nil,
nil,
false,
)

Expand Down
3 changes: 2 additions & 1 deletion service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func (r *workflowResetterImpl) failWorkflowTask(
consts.IdentityHistoryService,
nil,
nil,
nil,
// skipping versioning checks because this task is not actually dispatched but will fail immediately.
true,
)
Expand Down Expand Up @@ -959,7 +960,7 @@ func reapplyEvents(
// This function is intended to pick up all the events for a child that was already initialized before the reset point.
// Re-applying these events is needed to support reconnecting of the child with parent.
func reapplyChildEvents(mutableState workflow.MutableState, event *historypb.HistoryEvent) error { // nolint:revive
switch event.GetEventType() { // nolint:exhaustive
switch event.GetEventType() { // nolint:exhaustive
case enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
childEventAttributes := event.GetStartChildWorkflowExecutionFailedEventAttributes()
_, childExists := mutableState.GetChildExecutionInfo(childEventAttributes.GetInitiatedEventId())
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"go.temporal.io/server/service/history/historybuilder"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -182,7 +183,7 @@ type (
AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *timestamppb.Timestamp, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string, *commonpb.WorkerVersionStamp, *taskqueuespb.BuildIdRedirectInfo, update.Registry, bool) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskTimedOutEvent(workflowTask *WorkflowTaskInfo) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionCancelRequested(int64, namespace.Name, namespace.ID, string, string) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionSignaled(int64, namespace.Name, namespace.ID, string, string, string) (*historypb.HistoryEvent, error)
Expand Down
4 changes: 3 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import (
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -2585,13 +2586,14 @@ func (ms *MutableStateImpl) AddWorkflowTaskStartedEvent(
identity string,
versioningStamp *commonpb.WorkerVersionStamp,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
updateReg update.Registry,
skipVersioningCheck bool,
) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
if err := ms.checkMutability(opTag); err != nil {
return nil, nil, err
}
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck)
return ms.workflowTaskManager.AddWorkflowTaskStartedEvent(scheduledEventID, requestID, taskQueue, identity, versioningStamp, redirectInfo, skipVersioningCheck, updateReg)
}

func (ms *MutableStateImpl) ApplyWorkflowTaskStartedEvent(
Expand Down
14 changes: 14 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Valid() {
"",
worker_versioning.StampFromBuildId("b2"),
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"},
nil,
false,
)
s.NoError(err)
Expand All @@ -362,6 +363,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Invalid() {
"",
worker_versioning.StampFromBuildId("b2"),
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b0"},
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -383,6 +385,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyRedirectInfo() {
"",
worker_versioning.StampFromBuildId("b2"),
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -404,6 +407,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_EmptyStamp() {
"",
nil,
&taskqueuespb.BuildIdRedirectInfo{AssignedBuildId: "b1"},
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -427,6 +431,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_Sticky() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand All @@ -452,6 +457,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_StickyInvalid() {
"",
nil,
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand All @@ -474,6 +480,7 @@ func (s *mutableStateSuite) TestRedirectInfoValidation_UnexpectedSticky() {
"",
nil,
nil,
nil,
false,
)
expectedErr := &serviceerror2.ObsoleteDispatchBuildId{}
Expand Down Expand Up @@ -505,6 +512,7 @@ func (s *mutableStateSuite) createVersionedMutableStateWithCompletedWFT(tq *task
"",
worker_versioning.StampFromBuildId("b1"),
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -684,6 +692,7 @@ func (s *mutableStateSuite) createMutableStateWithVersioningBehavior(
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -733,6 +742,7 @@ func (s *mutableStateSuite) TestUnpinnedTransition() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -771,6 +781,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionFailed() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -812,6 +823,7 @@ func (s *mutableStateSuite) TestUnpinnedTransitionTimeout() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -969,6 +981,7 @@ func (s *mutableStateSuite) TestOverride_BaseDeploymentUpdatedOnCompletion() {
"",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down Expand Up @@ -1265,6 +1278,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
"random identity",
nil,
nil,
nil,
false,
)
s.NoError(err)
Expand Down
9 changes: 5 additions & 4 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions service/history/workflow/update/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type (

// FailoverVersion of a Mutable State at the time of Registry creation.
FailoverVersion() int64

// SuggestContinueAsNew returns true if the Registry is reaching its limit.
SuggestContinueAsNew() bool
}

registry struct {
Expand Down Expand Up @@ -485,3 +488,8 @@ func (r *registry) GetSize() int {
func (r *registry) FailoverVersion() int64 {
return r.failoverVersion
}

func (r *registry) SuggestContinueAsNew() bool {
// TODO: implement me
return false
}
6 changes: 6 additions & 0 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package workflow

import (
"cmp"
"fmt"
"math"
"time"
Expand All @@ -50,6 +51,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/tqid"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -469,6 +471,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
versioningStamp *commonpb.WorkerVersionStamp,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
skipVersioningCheck bool,
updateReg update.Registry,
) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskStarted
workflowTask := m.GetWorkflowTaskByID(scheduledEventID)
Expand All @@ -491,6 +494,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
// consistent between the started event in history and the event that was sent to the SDK
// that resulted in the successful completion.
suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo()
if updateReg != nil {
suggestContinueAsNew = cmp.Or(suggestContinueAsNew, updateReg.SuggestContinueAsNew())
}

workflowTask, scheduledEventCreatedForRedirect, redirectCounter, err := m.processBuildIdRedirectInfo(versioningStamp, workflowTask, taskQueue, redirectInfo, skipVersioningCheck)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (c *mutationTestCase) startWFT(
"",
nil,
nil,
nil,
false,
)
if err != nil {
Expand Down Expand Up @@ -464,6 +465,7 @@ func TestGetNexusCompletion(t *testing.T) {
"---",
nil,
nil,
nil,
false,
)
require.NoError(t, err)
Expand Down

0 comments on commit 299e672

Please sign in to comment.