Skip to content

Commit

Permalink
Add grace period to expanded replication time range
Browse files Browse the repository at this point in the history
Much like how block uploads have a grace period, add a grace period
when querying with expanded replication to make sure store-gateways
have enough time to load blocks before they are queried.

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters committed Jan 10, 2025
1 parent a3b09a6 commit 9a2b10e
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 13 deletions.
7 changes: 6 additions & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,12 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa

var expandedReplication storegateway.ExpandedReplication
if gatewayCfg.ExpandedReplication.Enabled {
expandedReplication = storegateway.NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold)
expandedReplication = storegateway.NewMaxTimeExpandedReplication(
gatewayCfg.ExpandedReplication.MaxTimeThreshold,
// Exclude blocks which have recently become eligible for expanded replication, in order to give
// enough time to store-gateways to discover and load them (3 times the sync interval)
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
)
} else {
expandedReplication = storegateway.NewNopExpandedReplication()
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/storegateway/expanded_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

var (
errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be greater than 0")
errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be at least one hour")
)

type ExpandedReplicationConfig struct {
Expand All @@ -23,7 +23,7 @@ func (cfg *ExpandedReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, p
}

func (cfg *ExpandedReplicationConfig) Validate() error {
if cfg.Enabled && cfg.MaxTimeThreshold <= 0 {
if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour {
return errInvalidExpandedReplicationMaxTimeThreshold
}

Expand Down Expand Up @@ -54,20 +54,27 @@ func (n NopExpandedReplication) Eligible(ReplicatedBlock) bool {
return false
}

func NewMaxTimeExpandedReplication(maxTime time.Duration) *MaxTimeExpandedReplication {
return &MaxTimeExpandedReplication{maxTime: maxTime, now: time.Now}
func NewMaxTimeExpandedReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeExpandedReplication {
return &MaxTimeExpandedReplication{
maxTime: maxTime,
gracePeriod: gracePeriod,
now: time.Now,
}
}

// MaxTimeExpandedReplication is an ExpandedReplication implementation that determines
// if a block is eligible for expanded replication based on how recent its MaxTime (most
// recent sample) is.
// recent sample) is. A grace period can optionally be used during queries to ensure that
// blocks are not expected to be replicated to store-gateways that may not have yet had a
// chance to sync them.
type MaxTimeExpandedReplication struct {
maxTime time.Duration
now func() time.Time
maxTime time.Duration
gracePeriod time.Duration
now func() time.Time
}

func (e *MaxTimeExpandedReplication) Eligible(b ReplicatedBlock) bool {
now := e.now()
delta := now.Sub(b.GetMaxTime())
return delta <= e.maxTime
return delta <= (e.maxTime - e.gracePeriod)
}
14 changes: 12 additions & 2 deletions pkg/storegateway/expanded_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestMaxTimeExpandedReplication_Eligible(t *testing.T) {
// Round "now" to the nearest millisecond since we are using millisecond precision
// for min/max times for the blocks.
now := time.Now().Round(time.Millisecond)
replication := NewMaxTimeExpandedReplication(2 * time.Hour)
replication := NewMaxTimeExpandedReplication(2*time.Hour, 15*time.Minute)
replication.now = func() time.Time { return now }

t.Run("max time outside limit", func(t *testing.T) {
Expand All @@ -29,13 +29,23 @@ func TestMaxTimeExpandedReplication_Eligible(t *testing.T) {
require.False(t, replication.Eligible(&b))
})

t.Run("max time on limit", func(t *testing.T) {
t.Run("max time within grace period", func(t *testing.T) {
b := block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: now.Add(-4 * time.Hour).UnixMilli(),
MaxTime: now.Add(-2 * time.Hour).UnixMilli(),
},
}
require.False(t, replication.Eligible(&b))
})

t.Run("max time on grace period", func(t *testing.T) {
b := block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: now.Add(-4 * time.Hour).UnixMilli(),
MaxTime: now.Add(-105 * time.Minute).UnixMilli(),
},
}
require.True(t, replication.Eligible(&b))
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi

var expandedReplication ExpandedReplication
if gatewayCfg.ExpandedReplication.Enabled {
expandedReplication = NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold)
expandedReplication = NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold, 0)
} else {
expandedReplication = NewNopExpandedReplication()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package storegateway

import (
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/sharding_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
},
"multiple ACTIVE instances in the ring with RF = 1 and SS = 3 and ER = true": {
replicationFactor: 1,
expandedReplication: NewMaxTimeExpandedReplication(25 * time.Hour),
expandedReplication: NewMaxTimeExpandedReplication(25*time.Hour, 45*time.Minute),
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3},
setupRing: func(r *ring.Desc) {
r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
Expand Down

0 comments on commit 9a2b10e

Please sign in to comment.