Skip to content

Commit

Permalink
Prevent scavenger from removing build ids that were recently default for a set (#4527)
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored Jun 22, 2023
1 parent 5b4e2e3 commit 8699916
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 17 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ const (
ReachabilityQueryBuildIdLimit = "limit.reachabilityQueryBuildIds"
// TaskQueuesPerBuildIdLimit limits the number of task queue names that can be mapped to a single build id.
TaskQueuesPerBuildIdLimit = "limit.taskQueuesPerBuildId"
// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its containing
// set for it to be considered for removal, used by the build id scavenger.
RemovableBuildIdDurationSinceDefault = "worker.removableBuildIdDurationSinceDefault"

// keys for frontend

Expand Down
42 changes: 27 additions & 15 deletions service/worker/scanner/build_ids/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock/hybrid_logical_clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -72,14 +73,19 @@ type (
}

Activities struct {
logger log.Logger
taskManager persistence.TaskManager
metadataManager persistence.MetadataManager
visibilityManager manager.VisibilityManager
namespaceRegistry namespace.Registry
matchingClient matchingservice.MatchingServiceClient
currentClusterName string
removableBuildIdMinAge dynamicconfig.DurationPropertyFn
logger log.Logger
taskManager persistence.TaskManager
metadataManager persistence.MetadataManager
visibilityManager manager.VisibilityManager
namespaceRegistry namespace.Registry
matchingClient matchingservice.MatchingServiceClient
currentClusterName string
// Minimum duration since a build id was last default in its containing set for it to be considered for removal.
// If a build id was still default recently, there may be:
// 1. workers with that identifier processing tasks
// 2. workflows with that identifier that have yet to be indexed in visibility
// The scavenger should allow enough time to pass before cleaning these build ids.
removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn
}

heartbeatDetails struct {
Expand All @@ -98,15 +104,17 @@ func NewActivities(
namespaceRegistry namespace.Registry,
matchingClient matchingservice.MatchingServiceClient,
currentClusterName string,
removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn,
) *Activities {
return &Activities{
logger: logger,
taskManager: taskManager,
metadataManager: metadataManager,
visibilityManager: visibilityManager,
namespaceRegistry: namespaceRegistry,
matchingClient: matchingClient,
currentClusterName: currentClusterName,
logger: logger,
taskManager: taskManager,
metadataManager: metadataManager,
visibilityManager: visibilityManager,
namespaceRegistry: namespaceRegistry,
matchingClient: matchingClient,
currentClusterName: currentClusterName,
removableBuildIdDurationSinceDefault: removableBuildIdDurationSinceDefault,
}
}

Expand Down Expand Up @@ -272,6 +280,10 @@ func (a *Activities) findBuildIdsToRemove(
if buildIdIsSetDefault && (setIsQueueDefault || setActive > 1) {
continue
}
timeSinceWasDefault := time.Since(hybrid_logical_clock.UTC(*buildId.BecameDefaultTimestamp))
if timeSinceWasDefault < a.removableBuildIdDurationSinceDefault() {
continue
}

if err := rateLimiter.Wait(ctx); err != nil {
return buildIdsToRemove, err
Expand Down
22 changes: 20 additions & 2 deletions service/worker/scanner/build_ids/scavenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) {
a := &Activities{
logger: log.NewCLILogger(),
visibilityManager: visiblityManager,
removableBuildIdDurationSinceDefault: func() time.Duration {
return time.Hour
},
}

visiblityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Times(4).DoAndReturn(
Expand All @@ -107,6 +110,8 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) {
})

c0 := hlc.Zero(0)
c1 := hlc.Clock{WallClock: time.Now().UnixMilli(), Version: 0, ClusterId: 0}

userData := &persistencespb.TaskQueueUserData{
Clock: &c0,
VersioningData: &persistencespb.VersioningData{
Expand Down Expand Up @@ -169,7 +174,20 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) {
SetIds: []string{"v4"},
BuildIds: []*persistencespb.BuildId{
{
Id: "v4.0",
Id: "v4.0",
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &c0,
// This build id became default just now, so it may still be in use and should not be deleted.
BecameDefaultTimestamp: &c1,
},
},
BecameDefaultTimestamp: &c0,
},
{
SetIds: []string{"v5"},
BuildIds: []*persistencespb.BuildId{
{
Id: "v5.0",
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &c0,
BecameDefaultTimestamp: &c0,
Expand Down Expand Up @@ -221,7 +239,7 @@ func Test_ScavengeBuildIds_Heartbeats(t *testing.T) {
taskManager: taskManager,
namespaceRegistry: namespaceRegistry,
matchingClient: matchingClient,
removableBuildIdMinAge: func() time.Duration {
removableBuildIdDurationSinceDefault: func() time.Duration {
return time.Hour
},
currentClusterName: "test-cluster",
Expand Down
5 changes: 5 additions & 0 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type (
ExecutionScannerWorkerCount dynamicconfig.IntPropertyFn
// ExecutionScannerHistoryEventIdValidator indicates whether the execution scavenger should validate history event ids.
ExecutionScannerHistoryEventIdValidator dynamicconfig.BoolPropertyFn

// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its
// containing set for it to be considered for removal.
RemovableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn
}

// scannerContext is the context object that gets
Expand Down Expand Up @@ -204,6 +208,7 @@ func (s *Scanner) Start() error {
s.context.namespaceRegistry,
s.context.matchingClient,
s.context.currentClusterName,
s.context.cfg.RemovableBuildIdDurationSinceDefault,
)

work := s.context.sdkClientFactory.NewWorker(s.context.sdkClientFactory.GetSystemClient(), build_ids.BuildIdScavengerTaskQueueName, workerOpts)
Expand Down
4 changes: 4 additions & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func NewConfig(
dynamicconfig.ExecutionScannerHistoryEventIdValidator,
true,
),
RemovableBuildIdDurationSinceDefault: dc.GetDurationProperty(
dynamicconfig.RemovableBuildIdDurationSinceDefault,
time.Hour,
),
},
EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true),
BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS),
Expand Down
2 changes: 2 additions & 0 deletions tests/advanced_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (s *advancedVisibilitySuite) SetupSuite() {
dynamicconfig.ReachabilityTaskQueueScanLimit: 2,
dynamicconfig.ReachabilityQueryBuildIdLimit: 1,
dynamicconfig.BuildIdScavengerEnabled: true,
// Allow the scavenger to remove any build id regardless of when it was last default for a set.
dynamicconfig.RemovableBuildIdDurationSinceDefault: time.Microsecond,
}

switch TestFlags.PersistenceDriver {
Expand Down
2 changes: 2 additions & 0 deletions tests/xdc/user_data_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (s *userDataReplicationTestSuite) SetupSuite() {
dynamicconfig.FrontendEnableWorkerVersioningDataAPIs: true,
dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs: true,
dynamicconfig.BuildIdScavengerEnabled: true,
// Ensure the scavenger can immediately delete build ids that are not in use.
dynamicconfig.RemovableBuildIdDurationSinceDefault: time.Microsecond,
}
s.setupSuite([]string{"task_queue_repl_active", "task_queue_repl_standby"})
}
Expand Down

0 comments on commit 8699916

Please sign in to comment.