diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a5115bbec1c..d2298cc4ac3 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -197,6 +197,9 @@ const ( ReachabilityQueryBuildIdLimit = "limit.reachabilityQueryBuildIds" // TaskQueuesPerBuildIdLimit limits the number of task queue names that can be mapped to a single build id. TaskQueuesPerBuildIdLimit = "limit.taskQueuesPerBuildId" + // RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its containing + // set for it to be considered for removal, used by the build id scavenger. + RemovableBuildIdDurationSinceDefault = "worker.removableBuildIdDurationSinceDefault" // keys for frontend diff --git a/service/worker/scanner/build_ids/scavenger.go b/service/worker/scanner/build_ids/scavenger.go index c003e321af1..fbd99eb1341 100644 --- a/service/worker/scanner/build_ids/scavenger.go +++ b/service/worker/scanner/build_ids/scavenger.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/api/matchingservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/clock/hybrid_logical_clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -72,14 +73,19 @@ type ( } Activities struct { - logger log.Logger - taskManager persistence.TaskManager - metadataManager persistence.MetadataManager - visibilityManager manager.VisibilityManager - namespaceRegistry namespace.Registry - matchingClient matchingservice.MatchingServiceClient - currentClusterName string - removableBuildIdMinAge dynamicconfig.DurationPropertyFn + logger log.Logger + taskManager persistence.TaskManager + metadataManager persistence.MetadataManager + visibilityManager manager.VisibilityManager + namespaceRegistry namespace.Registry + matchingClient matchingservice.MatchingServiceClient + currentClusterName string + // Minimum duration 
since a build id was last default in its containing set for it to be considered for removal. + // If a build id was still default recently, there may be: + // 1. workers with that identifier processing tasks + // 2. workflows with that identifier that have yet to be indexed in visibility + // The scavenger should allow enough time to pass before cleaning these build ids. + removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn } heartbeatDetails struct { @@ -98,15 +104,17 @@ func NewActivities( namespaceRegistry namespace.Registry, matchingClient matchingservice.MatchingServiceClient, currentClusterName string, + removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn, ) *Activities { return &Activities{ - logger: logger, - taskManager: taskManager, - metadataManager: metadataManager, - visibilityManager: visibilityManager, - namespaceRegistry: namespaceRegistry, - matchingClient: matchingClient, - currentClusterName: currentClusterName, + logger: logger, + taskManager: taskManager, + metadataManager: metadataManager, + visibilityManager: visibilityManager, + namespaceRegistry: namespaceRegistry, + matchingClient: matchingClient, + currentClusterName: currentClusterName, + removableBuildIdDurationSinceDefault: removableBuildIdDurationSinceDefault, } } @@ -272,6 +280,10 @@ func (a *Activities) findBuildIdsToRemove( if buildIdIsSetDefault && (setIsQueueDefault || setActive > 1) { continue } + timeSinceWasDefault := time.Since(hybrid_logical_clock.UTC(*buildId.BecameDefaultTimestamp)) + if timeSinceWasDefault < a.removableBuildIdDurationSinceDefault() { + continue + } if err := rateLimiter.Wait(ctx); err != nil { return buildIdsToRemove, err diff --git a/service/worker/scanner/build_ids/scavenger_test.go b/service/worker/scanner/build_ids/scavenger_test.go index b564e1034d3..dfd25a24ff1 100644 --- a/service/worker/scanner/build_ids/scavenger_test.go +++ b/service/worker/scanner/build_ids/scavenger_test.go @@ -86,6 +86,9 @@ func 
Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) { a := &Activities{ logger: log.NewCLILogger(), visibilityManager: visiblityManager, + removableBuildIdDurationSinceDefault: func() time.Duration { + return time.Hour + }, } visiblityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Times(4).DoAndReturn( @@ -107,6 +110,8 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) { }) c0 := hlc.Zero(0) + c1 := hlc.Clock{WallClock: time.Now().UnixMilli(), Version: 0, ClusterId: 0} + userData := &persistencespb.TaskQueueUserData{ Clock: &c0, VersioningData: &persistencespb.VersioningData{ @@ -169,7 +174,20 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) { SetIds: []string{"v4"}, BuildIds: []*persistencespb.BuildId{ { - Id: "v4.0", + Id: "v4.0", + State: persistencespb.STATE_ACTIVE, + StateUpdateTimestamp: &c0, + // This one became default only moments ago and may still be in use, so it must not be deleted + BecameDefaultTimestamp: &c1, + }, + }, + BecameDefaultTimestamp: &c0, + }, + { + SetIds: []string{"v5"}, + BuildIds: []*persistencespb.BuildId{ + { + Id: "v5.0", State: persistencespb.STATE_ACTIVE, StateUpdateTimestamp: &c0, BecameDefaultTimestamp: &c0, @@ -221,7 +239,7 @@ func Test_ScavengeBuildIds_Heartbeats(t *testing.T) { taskManager: taskManager, namespaceRegistry: namespaceRegistry, matchingClient: matchingClient, - removableBuildIdMinAge: func() time.Duration { + removableBuildIdDurationSinceDefault: func() time.Duration { return time.Hour }, currentClusterName: "test-cluster", diff --git a/service/worker/scanner/scanner.go b/service/worker/scanner/scanner.go index f5e22c3faf6..b60b9a42f7c 100644 --- a/service/worker/scanner/scanner.go +++ b/service/worker/scanner/scanner.go @@ -89,6 +89,10 @@ type ( ExecutionScannerWorkerCount dynamicconfig.IntPropertyFn // ExecutionScannerHistoryEventIdValidator indicates if the execution scavenger to validate history event id.
ExecutionScannerHistoryEventIdValidator dynamicconfig.BoolPropertyFn + + // RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its + // containing set for it to be considered for removal. + RemovableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn } // scannerContext is the context object that gets @@ -204,6 +208,7 @@ func (s *Scanner) Start() error { s.context.namespaceRegistry, s.context.matchingClient, s.context.currentClusterName, + s.context.cfg.RemovableBuildIdDurationSinceDefault, ) work := s.context.sdkClientFactory.NewWorker(s.context.sdkClientFactory.GetSystemClient(), build_ids.BuildIdScavengerTaskQueueName, workerOpts) diff --git a/service/worker/service.go b/service/worker/service.go index a2f1ca41259..135fa5a6ac6 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -320,6 +320,10 @@ func NewConfig( dynamicconfig.ExecutionScannerHistoryEventIdValidator, true, ), + RemovableBuildIdDurationSinceDefault: dc.GetDurationProperty( + dynamicconfig.RemovableBuildIdDurationSinceDefault, + time.Hour, + ), }, EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true), BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS), diff --git a/tests/advanced_visibility_test.go b/tests/advanced_visibility_test.go index b6ed7c14554..007af7b3725 100644 --- a/tests/advanced_visibility_test.go +++ b/tests/advanced_visibility_test.go @@ -98,6 +98,8 @@ func (s *advancedVisibilitySuite) SetupSuite() { dynamicconfig.ReachabilityTaskQueueScanLimit: 2, dynamicconfig.ReachabilityQueryBuildIdLimit: 1, dynamicconfig.BuildIdScavengerEnabled: true, + // Allow the scavenger to remove any build id regardless of when it was last default for a set. 
+ dynamicconfig.RemovableBuildIdDurationSinceDefault: time.Microsecond, } switch TestFlags.PersistenceDriver { diff --git a/tests/xdc/user_data_replication_test.go b/tests/xdc/user_data_replication_test.go index a76133f08f8..31aca15baaf 100644 --- a/tests/xdc/user_data_replication_test.go +++ b/tests/xdc/user_data_replication_test.go @@ -81,6 +81,8 @@ func (s *userDataReplicationTestSuite) SetupSuite() { dynamicconfig.FrontendEnableWorkerVersioningDataAPIs: true, dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs: true, dynamicconfig.BuildIdScavengerEnabled: true, + // Ensure the scavenger can immediately delete build ids that are not in use. + dynamicconfig.RemovableBuildIdDurationSinceDefault: time.Microsecond, } s.setupSuite([]string{"task_queue_repl_active", "task_queue_repl_standby"}) }