Skip to content

Commit

Permalink
Code review changes
Browse files Browse the repository at this point in the history
Apply grace period to sync instead of querying

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters committed Jan 14, 2025
1 parent d96f5b6 commit 305ff5e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
14 changes: 6 additions & 8 deletions pkg/storegateway/expanded_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ func NewMaxTimeExpandedReplication(maxTime time.Duration, gracePeriod time.Durat

// MaxTimeExpandedReplication is an ExpandedReplication implementation that determines
// if a block is eligible for expanded replication based on how recent its MaxTime (most
// recent sample) is. An upload grace period can optionally be used during queries to ensure
// that blocks are not expected to be replicated to store-gateways that may not have yet had
// a chance to sync them.
// recent sample) is. An upload grace period can optionally be used to ensure that blocks
// are synced to store-gateways before they are expected to be available by queriers.
type MaxTimeExpandedReplication struct {
maxTime time.Duration
gracePeriod time.Duration
Expand All @@ -87,14 +86,13 @@ type MaxTimeExpandedReplication struct {
func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
return maxTimeDelta <= e.maxTime
// We start syncing blocks `gracePeriod` before they become eligible for querying to
// ensure that they've been loaded before queriers expect them to be available.
return maxTimeDelta <= (e.maxTime + e.gracePeriod)
}

func (e *MaxTimeExpandedReplication) EligibleForQuerying(b ReplicatedBlock) bool {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
// To be eligible for querying a block must have a max time within `maxTime-gracePeriod` since
// we need to allow store-gateways to sync blocks that have recently become eligible for expanded
// replication.
return maxTimeDelta <= (e.maxTime - e.gracePeriod)
return maxTimeDelta <= e.maxTime
}
14 changes: 11 additions & 3 deletions pkg/storegateway/expanded_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,23 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
MaxTime: now.Add(-25 * time.Hour).UnixMilli(),
},
expectedSync: true,
expectedQuery: false,
expectedQuery: true,
},
"max time on boundary including grace period": {
block: bucketindex.Block{
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-(24*time.Hour + 15*time.Minute)).UnixMilli(),
MaxTime: now.Add(-(25*time.Hour + 45*time.Minute)).UnixMilli(),
},
expectedSync: true,
expectedQuery: true,
expectedQuery: false,
},
"max time inside grace period": {
block: bucketindex.Block{
MinTime: now.Add(-49 * time.Hour).UnixMilli(),
MaxTime: now.Add(-(25*time.Hour + 15*time.Minute)).UnixMilli(),
},
expectedSync: true,
expectedQuery: false,
},
"max time too old": {
block: bucketindex.Block{
Expand Down
7 changes: 6 additions & 1 deletion pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi

var expandedReplication ExpandedReplication = NewNopExpandedReplication()
if gatewayCfg.ExpandedReplication.Enabled {
expandedReplication = NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold, 0)
expandedReplication = NewMaxTimeExpandedReplication(
gatewayCfg.ExpandedReplication.MaxTimeThreshold,
// Exclude blocks which have recently become eligible for expanded replication, in order to give
// enough time to store-gateways to discover and load them (3 times the sync interval)
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
)
}

shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, expandedReplication, limits, logger)
Expand Down

0 comments on commit 305ff5e

Please sign in to comment.