diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 95ade33f54..cfab2094f9 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -246,7 +246,12 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa var expandedReplication storegateway.ExpandedReplication if gatewayCfg.ExpandedReplication.Enabled { - expandedReplication = storegateway.NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold) + expandedReplication = storegateway.NewMaxTimeExpandedReplication( + gatewayCfg.ExpandedReplication.MaxTimeThreshold, + // Exclude blocks which have recently become eligible for expanded replication, in order to give + // enough time to store-gateways to discover and load them (3 times the sync interval) + mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval, + ) } else { expandedReplication = storegateway.NewNopExpandedReplication() } diff --git a/pkg/storegateway/expanded_replication.go b/pkg/storegateway/expanded_replication.go index 55dfade222..b10190089b 100644 --- a/pkg/storegateway/expanded_replication.go +++ b/pkg/storegateway/expanded_replication.go @@ -9,7 +9,7 @@ import ( ) var ( - errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be greater than 0") + errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be at least one hour") ) type ExpandedReplicationConfig struct { @@ -23,7 +23,7 @@ func (cfg *ExpandedReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, p } func (cfg *ExpandedReplicationConfig) Validate() error { - if cfg.Enabled && cfg.MaxTimeThreshold <= 0 { + if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour { return errInvalidExpandedReplicationMaxTimeThreshold } @@ -54,20 +54,27 @@ func (n NopExpandedReplication) Eligible(ReplicatedBlock) bool { return false } -func NewMaxTimeExpandedReplication(maxTime time.Duration) *MaxTimeExpandedReplication { - return &MaxTimeExpandedReplication{maxTime: maxTime, now: time.Now} +func NewMaxTimeExpandedReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeExpandedReplication { + return &MaxTimeExpandedReplication{ + maxTime: maxTime, + gracePeriod: gracePeriod, + now: time.Now, + } } // MaxTimeExpandedReplication is an ExpandedReplication implementation that determines // if a block is eligible for expanded replication based on how recent its MaxTime (most -// recent sample) is. +// recent sample) is. A grace period can optionally be used during queries to ensure that +// blocks are not expected to be replicated to store-gateways that may not have yet had a +// chance to sync them. type MaxTimeExpandedReplication struct { - maxTime time.Duration - now func() time.Time + maxTime time.Duration + gracePeriod time.Duration + now func() time.Time } func (e *MaxTimeExpandedReplication) Eligible(b ReplicatedBlock) bool { now := e.now() delta := now.Sub(b.GetMaxTime()) - return delta <= e.maxTime + return delta <= (e.maxTime - e.gracePeriod) } diff --git a/pkg/storegateway/expanded_replication_test.go b/pkg/storegateway/expanded_replication_test.go index 8ab52a9c5f..b957c1d1b1 100644 --- a/pkg/storegateway/expanded_replication_test.go +++ b/pkg/storegateway/expanded_replication_test.go @@ -16,7 +16,7 @@ func TestMaxTimeExpandedReplication_Eligible(t *testing.T) { // Round "now" to the nearest millisecond since we are using millisecond precision // for min/max times for the blocks. now := time.Now().Round(time.Millisecond) - replication := NewMaxTimeExpandedReplication(2 * time.Hour) + replication := NewMaxTimeExpandedReplication(2*time.Hour, 15*time.Minute) replication.now = func() time.Time { return now } t.Run("max time outside limit", func(t *testing.T) { @@ -29,13 +29,23 @@ func TestMaxTimeExpandedReplication_Eligible(t *testing.T) { require.False(t, replication.Eligible(&b)) }) - t.Run("max time on limit", func(t *testing.T) { + t.Run("max time within grace period", func(t *testing.T) { b := block.Meta{ BlockMeta: tsdb.BlockMeta{ MinTime: now.Add(-4 * time.Hour).UnixMilli(), MaxTime: now.Add(-2 * time.Hour).UnixMilli(), }, } + require.False(t, replication.Eligible(&b)) + }) + + t.Run("max time on grace period", func(t *testing.T) { + b := block.Meta{ + BlockMeta: tsdb.BlockMeta{ + MinTime: now.Add(-4 * time.Hour).UnixMilli(), + MaxTime: now.Add(-105 * time.Minute).UnixMilli(), + }, + } require.True(t, replication.Eligible(&b)) }) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 3570326947..39f338fe78 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -181,7 +181,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi var expandedReplication ExpandedReplication if gatewayCfg.ExpandedReplication.Enabled { - expandedReplication = NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold) + expandedReplication = NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold, 0) } else { expandedReplication = NewNopExpandedReplication() } diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index 8869bfb8cd..7cca98a1ea 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -7,6 +7,7 @@ package storegateway import ( "context" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/ring" diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 167f57fab5..fdc14904c7 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -203,7 +203,7 @@ func TestShuffleShardingStrategy(t *testing.T) { }, "multiple ACTIVE instances in the ring with RF = 1 and SS = 3 and ER = true": { replicationFactor: 1, - expandedReplication: NewMaxTimeExpandedReplication(25 * time.Hour), + expandedReplication: NewMaxTimeExpandedReplication(25*time.Hour, 45*time.Minute), limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, setupRing: func(r *ring.Desc) { r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})