Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent scavenger from removing build ids that were recently default for a set #4527

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ const (
ReachabilityQueryBuildIdLimit = "limit.reachabilityQueryBuildIds"
// TaskQueuesPerBuildIdLimit limits the number of task queue names that can be mapped to a single build id.
TaskQueuesPerBuildIdLimit = "limit.taskQueuesPerBuildId"
// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its containing
// set for it to be considered for removal, used by the build id scavenger.
RemovableBuildIdDurationSinceDefault = "worker.removableBuildIdDurationSinceDefault"

// keys for frontend

Expand Down
42 changes: 27 additions & 15 deletions service/worker/scanner/build_ids/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock/hybrid_logical_clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -72,14 +73,19 @@ type (
}

Activities struct {
logger log.Logger
taskManager persistence.TaskManager
metadataManager persistence.MetadataManager
visibilityManager manager.VisibilityManager
namespaceRegistry namespace.Registry
matchingClient matchingservice.MatchingServiceClient
currentClusterName string
removableBuildIdMinAge dynamicconfig.DurationPropertyFn
logger log.Logger
taskManager persistence.TaskManager
metadataManager persistence.MetadataManager
visibilityManager manager.VisibilityManager
namespaceRegistry namespace.Registry
matchingClient matchingservice.MatchingServiceClient
currentClusterName string
// Minimum duration since a build id was last default in its containing set for it to be considered for removal.
// If a build id was still default recently, there may be:
// 1. workers with that identifier processing tasks
// 2. workflows with that identifier that have yet to be indexed in visibility
// The scavenger should allow enough time to pass before cleaning these build ids.
removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn
}

heartbeatDetails struct {
Expand All @@ -98,15 +104,17 @@ func NewActivities(
namespaceRegistry namespace.Registry,
matchingClient matchingservice.MatchingServiceClient,
currentClusterName string,
removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn,
) *Activities {
return &Activities{
logger: logger,
taskManager: taskManager,
metadataManager: metadataManager,
visibilityManager: visibilityManager,
namespaceRegistry: namespaceRegistry,
matchingClient: matchingClient,
currentClusterName: currentClusterName,
logger: logger,
taskManager: taskManager,
metadataManager: metadataManager,
visibilityManager: visibilityManager,
namespaceRegistry: namespaceRegistry,
matchingClient: matchingClient,
currentClusterName: currentClusterName,
removableBuildIdDurationSinceDefault: removableBuildIdDurationSinceDefault,
}
}

Expand Down Expand Up @@ -272,6 +280,10 @@ func (a *Activities) findBuildIdsToRemove(
if buildIdIsSetDefault && (setIsQueueDefault || setActive > 1) {
continue
}
timeSinceWasDefault := time.Since(hybrid_logical_clock.UTC(*buildId.BecameDefaultTimestamp))
if timeSinceWasDefault < a.removableBuildIdDurationSinceDefault() {
continue
}

if err := rateLimiter.Wait(ctx); err != nil {
return buildIdsToRemove, err
Expand Down
22 changes: 20 additions & 2 deletions service/worker/scanner/build_ids/scavenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) {
a := &Activities{
logger: log.NewCLILogger(),
visibilityManager: visiblityManager,
removableBuildIdDurationSinceDefault: func() time.Duration {
return time.Hour
},
}

visiblityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Times(4).DoAndReturn(
Expand All @@ -107,6 +110,8 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) {
})

c0 := hlc.Zero(0)
c1 := hlc.Clock{WallClock: time.Now().UnixMilli(), Version: 0, ClusterId: 0}

userData := &persistencespb.TaskQueueUserData{
Clock: &c0,
VersioningData: &persistencespb.VersioningData{
Expand Down Expand Up @@ -169,7 +174,20 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) {
SetIds: []string{"v4"},
BuildIds: []*persistencespb.BuildId{
{
Id: "v4.0",
Id: "v4.0",
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &c0,
// This one may have been used recently, it should not be deleted
BecameDefaultTimestamp: &c1,
},
},
BecameDefaultTimestamp: &c0,
},
{
SetIds: []string{"v5"},
BuildIds: []*persistencespb.BuildId{
{
Id: "v5.0",
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &c0,
BecameDefaultTimestamp: &c0,
Expand Down Expand Up @@ -221,7 +239,7 @@ func Test_ScavengeBuildIds_Heartbeats(t *testing.T) {
taskManager: taskManager,
namespaceRegistry: namespaceRegistry,
matchingClient: matchingClient,
removableBuildIdMinAge: func() time.Duration {
removableBuildIdDurationSinceDefault: func() time.Duration {
return time.Hour
},
currentClusterName: "test-cluster",
Expand Down
5 changes: 5 additions & 0 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type (
ExecutionScannerWorkerCount dynamicconfig.IntPropertyFn
// ExecutionScannerHistoryEventIdValidator indicates if the execution scavenger to validate history event id.
ExecutionScannerHistoryEventIdValidator dynamicconfig.BoolPropertyFn

// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its
// containing set for it to be considered for removal.
RemovableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn
}

// scannerContext is the context object that gets
Expand Down Expand Up @@ -204,6 +208,7 @@ func (s *Scanner) Start() error {
s.context.namespaceRegistry,
s.context.matchingClient,
s.context.currentClusterName,
s.context.cfg.RemovableBuildIdDurationSinceDefault,
)

work := s.context.sdkClientFactory.NewWorker(s.context.sdkClientFactory.GetSystemClient(), build_ids.BuildIdScavengerTaskQueueName, workerOpts)
Expand Down
4 changes: 4 additions & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func NewConfig(
dynamicconfig.ExecutionScannerHistoryEventIdValidator,
true,
),
RemovableBuildIdDurationSinceDefault: dc.GetDurationProperty(
dynamicconfig.RemovableBuildIdDurationSinceDefault,
time.Hour,
),
},
EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true),
BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS),
Expand Down
2 changes: 2 additions & 0 deletions tests/advanced_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (s *advancedVisibilitySuite) SetupSuite() {
dynamicconfig.ReachabilityTaskQueueScanLimit: 2,
dynamicconfig.ReachabilityQueryBuildIdLimit: 1,
dynamicconfig.BuildIdScavengerEnabled: true,
// Allow the scavenger to remove any build id regardless of when it was last default for a set.
dynamicconfig.RemovableBuildIdDurationSinceDefault: time.Microsecond,
}

switch TestFlags.PersistenceDriver {
Expand Down
2 changes: 2 additions & 0 deletions tests/xdc/user_data_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (s *userDataReplicationTestSuite) SetupSuite() {
dynamicconfig.FrontendEnableWorkerVersioningDataAPIs: true,
dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs: true,
dynamicconfig.BuildIdScavengerEnabled: true,
// Ensure the scavenger can immediately delete build ids that are not in use.
dynamicconfig.RemovableBuildIdDurationSinceDefault: time.Microsecond,
}
s.setupSuite([]string{"task_queue_repl_active", "task_queue_repl_standby"})
}
Expand Down