Skip to content

Commit

Permalink
Don't check upload time in expanded replication
Browse files Browse the repository at this point in the history
Upload time is checked (and blocks within a grace period are ignored)
by the query consistency check.

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters committed Jan 13, 2025
1 parent 2a520a9 commit d96f5b6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 50 deletions.
30 changes: 10 additions & 20 deletions pkg/storegateway/expanded_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,14 @@ func (cfg *ExpandedReplicationConfig) Validate() error {
return nil
}

// ReplicatedBlock is a TSDB block that may be eligible to be synced to more store-gateways
// than the configured replication factor based on metadata about the block.
// ReplicatedBlock is a TSDB block that may be eligible to be synced to and queried from
// more store-gateways than the configured replication factor based on metadata about the
// block.
type ReplicatedBlock interface {
GetMinTime() time.Time
GetMaxTime() time.Time
}

// QueryableReplicatedBlock is a TSDB block that may be eligible to be queried from more
// store-gateways than the configured replication factor based on metadata about the block.
type QueryableReplicatedBlock interface {
ReplicatedBlock

GetUploadedAt() time.Time
}

// ExpandedReplication determines if a TSDB block is eligible to be sync to and queried from more
// store-gateways than the configured replication factor based on metadata about the block.
type ExpandedReplication interface {
Expand All @@ -54,7 +47,7 @@ type ExpandedReplication interface {

// EligibleForQuerying returns true if the block can be safely queried from more than the
// configured (via replication factor) number of store-gateways, false otherwise.
EligibleForQuerying(b QueryableReplicatedBlock) bool
EligibleForQuerying(b ReplicatedBlock) bool
}

func NewNopExpandedReplication() *NopExpandedReplication {
Expand All @@ -68,7 +61,7 @@ func (n NopExpandedReplication) EligibleForSync(ReplicatedBlock) bool {
return false
}

func (n NopExpandedReplication) EligibleForQuerying(QueryableReplicatedBlock) bool {
func (n NopExpandedReplication) EligibleForQuerying(ReplicatedBlock) bool {
return false
}

Expand Down Expand Up @@ -97,14 +90,11 @@ func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
return maxTimeDelta <= e.maxTime
}

func (e *MaxTimeExpandedReplication) EligibleForQuerying(b QueryableReplicatedBlock) bool {
func (e *MaxTimeExpandedReplication) EligibleForQuerying(b ReplicatedBlock) bool {
now := e.now()
uploadedDelta := now.Sub(b.GetUploadedAt())
maxTimeDelta := now.Sub(b.GetMaxTime())
// To be eligible for querying a block must:
// * Have been uploaded more than `gracePeriod` ago since we need to allow store-gateways
// to sync recently uploaded blocks.
// * Have a max time within `maxTime-gracePeriod` since we need to allow store-gateways to
// sync blocks that have recently become eligible for expanded replication.
return uploadedDelta > e.gracePeriod && maxTimeDelta <= (e.maxTime-e.gracePeriod)
// To be eligible for querying a block must have a max time within `maxTime-gracePeriod` since
// we need to allow store-gateways to sync blocks that have recently become eligible for expanded
// replication.
return maxTimeDelta <= (e.maxTime - e.gracePeriod)
}
38 changes: 8 additions & 30 deletions pkg/storegateway/expanded_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,54 +27,32 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
testCases := map[string]testCase{
"max time eligible": {
block: bucketindex.Block{
MinTime: now.Add(-24 * time.Hour).UnixMilli(),
MaxTime: now.Add(-12 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-6 * time.Hour).Unix(),
MinTime: now.Add(-24 * time.Hour).UnixMilli(),
MaxTime: now.Add(-12 * time.Hour).UnixMilli(),
},
expectedSync: true,
expectedQuery: true,
},
"max time eligible recent upload": {
block: bucketindex.Block{
MinTime: now.Add(-24 * time.Hour).UnixMilli(),
MaxTime: now.Add(-12 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-15 * time.Minute).Unix(),
},
expectedSync: true,
expectedQuery: false,
},
"max time on boundary": {
block: bucketindex.Block{
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-25 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-6 * time.Hour).Unix(),
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-25 * time.Hour).UnixMilli(),
},
expectedSync: true,
expectedQuery: false,
},
"max time on boundary including grace period": {
block: bucketindex.Block{
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(),
UploadedAt: now.Add(-6 * time.Hour).Unix(),
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(),
},
expectedSync: true,
expectedQuery: true,
},
"max time on boundary including grace period recent upload": {
block: bucketindex.Block{
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(),
UploadedAt: now.Add(-30 * time.Minute).Unix(),
},
expectedSync: true,
expectedQuery: false,
},
"max time too old": {
block: bucketindex.Block{
MinTime: now.Add(-72 * time.Hour).UnixMilli(),
MaxTime: now.Add(-48 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-24 * time.Hour).Unix(),
MinTime: now.Add(-72 * time.Hour).UnixMilli(),
MaxTime: now.Add(-48 * time.Hour).UnixMilli(),
},
expectedSync: false,
expectedQuery: false,
Expand Down

0 comments on commit d96f5b6

Please sign in to comment.