Skip to content

Commit

Permalink
Add grace period to expanded replication time range
Browse files Browse the repository at this point in the history
Much like how block uploads have a grace period, add a grace period
when querying with expanded replication to make sure store-gateways
have enough time to load blocks before they are queried.

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters committed Jan 10, 2025
1 parent 9207b48 commit 655630e
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 60 deletions.
7 changes: 6 additions & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,12 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa

var expandedReplication storegateway.ExpandedReplication
if gatewayCfg.ExpandedReplication.Enabled {
expandedReplication = storegateway.NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold)
expandedReplication = storegateway.NewMaxTimeExpandedReplication(
gatewayCfg.ExpandedReplication.MaxTimeThreshold,
// Exclude blocks which have recently become eligible for expanded replication, in order to give
// enough time to store-gateways to discover and load them (3 times the sync interval)
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
)
} else {
expandedReplication = storegateway.NewNopExpandedReplication()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_replicated_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blocks bucketin
// Find the replication set of each block we need to query.
for _, block := range blocks {
var ringOpts []ring.Option
if s.expandedReplication.Eligible(block) {
if s.expandedReplication.EligibleForQuerying(block) {
ringOpts = append(ringOpts, expandedReplicationOption)
}

Expand Down
66 changes: 50 additions & 16 deletions pkg/storegateway/expanded_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

var (
errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be greater than 0")
errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be at least one hour")
)

type ExpandedReplicationConfig struct {
Expand All @@ -23,24 +23,38 @@ func (cfg *ExpandedReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, p
}

func (cfg *ExpandedReplicationConfig) Validate() error {
if cfg.Enabled && cfg.MaxTimeThreshold <= 0 {
if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour {
return errInvalidExpandedReplicationMaxTimeThreshold
}

return nil
}

// ReplicatedBlock is metadata about a TSDB block that may be eligible for expanded
// replication (replication by more store-gateways than the configured replication factor).
// ReplicatedBlock is a TSDB block that may be eligible to be synced to more store-gateways
// than the configured replication factor based on metadata about the block.
type ReplicatedBlock interface {
GetMinTime() time.Time
GetMaxTime() time.Time
}

// ExpandedReplication implementations determine if a block should be replicated to more
// store-gateways than the configured replication factor based on its metadata.
// QueryableReplicatedBlock is a TSDB block that may be eligible to be queried from more
// store-gateways than the configured replication factor based on metadata about the block.
type QueryableReplicatedBlock interface {
ReplicatedBlock

GetUploadedAt() time.Time
}

// ExpandedReplication determines if a TSDB block is eligible to be sync to and queried from more
// store-gateways than the configured replication factor based on metadata about the block.
type ExpandedReplication interface {
Eligible(b ReplicatedBlock) bool
// EligibleForSync returns true if the block can be synced to more than the configured (via
// replication factor) number of store-gateways, false otherwise.
EligibleForSync(b ReplicatedBlock) bool

// EligibleForQuerying returns true if the block can be safely queried from more than the
// configured (via replication factor) number of store-gateways, false otherwise.
EligibleForQuerying(b QueryableReplicatedBlock) bool
}

func NewNopExpandedReplication() *NopExpandedReplication {
Expand All @@ -50,24 +64,44 @@ func NewNopExpandedReplication() *NopExpandedReplication {
// NopExpandedReplication is an ExpandedReplication implementation that always returns false.
type NopExpandedReplication struct{}

func (n NopExpandedReplication) Eligible(ReplicatedBlock) bool {
func (n NopExpandedReplication) EligibleForSync(ReplicatedBlock) bool {
return false
}

func (n NopExpandedReplication) EligibleForQuerying(QueryableReplicatedBlock) bool {
return false
}

func NewMaxTimeExpandedReplication(maxTime time.Duration) *MaxTimeExpandedReplication {
return &MaxTimeExpandedReplication{maxTime: maxTime, now: time.Now}
func NewMaxTimeExpandedReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeExpandedReplication {
return &MaxTimeExpandedReplication{
maxTime: maxTime,
gracePeriod: gracePeriod,
now: time.Now,
}
}

// MaxTimeExpandedReplication is an ExpandedReplication implementation that determines
// if a block is eligible for expanded replication based on how recent its MaxTime (most
// recent sample) is.
// recent sample) is. An upload grace period can optionally be used during queries to ensure
// that blocks are not expected to be replicated to store-gateways that may not have yet had
// a chance to sync them.
type MaxTimeExpandedReplication struct {
maxTime time.Duration
now func() time.Time
maxTime time.Duration
gracePeriod time.Duration
now func() time.Time
}

func (e *MaxTimeExpandedReplication) isMaxTimeInWindow(b ReplicatedBlock, now time.Time) bool {
maxTimeDelta := now.Sub(b.GetMaxTime())
return maxTimeDelta <= e.maxTime
}

func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
return e.isMaxTimeInWindow(b, e.now())
}

func (e *MaxTimeExpandedReplication) Eligible(b ReplicatedBlock) bool {
func (e *MaxTimeExpandedReplication) EligibleForQuerying(b QueryableReplicatedBlock) bool {
now := e.now()
delta := now.Sub(b.GetMaxTime())
return delta <= e.maxTime
uploadedDelta := now.Sub(b.GetUploadedAt())
return uploadedDelta > e.gracePeriod && e.isMaxTimeInWindow(b, now)
}
101 changes: 62 additions & 39 deletions pkg/storegateway/expanded_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,79 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
)

func TestMaxTimeExpandedReplication_Eligible(t *testing.T) {
func TestMaxTimeExpandedReplication(t *testing.T) {
// Round "now" to the nearest millisecond since we are using millisecond precision
// for min/max times for the blocks.
now := time.Now().Round(time.Millisecond)
replication := NewMaxTimeExpandedReplication(2 * time.Hour)
replication := NewMaxTimeExpandedReplication(25*time.Hour, 45*time.Minute)
replication.now = func() time.Time { return now }

t.Run("max time outside limit", func(t *testing.T) {
b := block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: now.Add(-5 * time.Hour).UnixMilli(),
MaxTime: now.Add(-4 * time.Hour).UnixMilli(),
type testCase struct {
block bucketindex.Block
expectedSync bool
expectedQuery bool
}

testCases := map[string]testCase{
"max time eligible": {
block: bucketindex.Block{
MinTime: now.Add(-24 * time.Hour).UnixMilli(),
MaxTime: now.Add(-12 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-6 * time.Hour).Unix(),
},
expectedSync: true,
expectedQuery: true,
},
"max time eligible recent upload": {
block: bucketindex.Block{
MinTime: now.Add(-24 * time.Hour).UnixMilli(),
MaxTime: now.Add(-12 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-15 * time.Minute).Unix(),
},
}
require.False(t, replication.Eligible(&b))
})

t.Run("max time on limit", func(t *testing.T) {
b := block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: now.Add(-4 * time.Hour).UnixMilli(),
MaxTime: now.Add(-2 * time.Hour).UnixMilli(),
expectedSync: true,
expectedQuery: false,
},
"max time on boundary": {
block: bucketindex.Block{
MinTime: now.Add(-25 * time.Hour).UnixMilli(),
MaxTime: now.Add(-13 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-6 * time.Hour).Unix(),
},
}
require.True(t, replication.Eligible(&b))
})

t.Run("max time inside min time outside limit", func(t *testing.T) {
b := block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: now.Add(-3 * time.Hour).UnixMilli(),
MaxTime: now.Add(-time.Hour).UnixMilli(),
expectedSync: true,
expectedQuery: true,
},
"max time on boundary recent upload": {
block: bucketindex.Block{
MinTime: now.Add(-25 * time.Hour).UnixMilli(),
MaxTime: now.Add(-13 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-30 * time.Minute).Unix(),
},
}
require.True(t, replication.Eligible(&b))
})

t.Run("max and min time inside limit", func(t *testing.T) {
b := block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: now.Add(-1 * time.Hour).UnixMilli(),
MaxTime: now.UnixMilli(),
expectedSync: true,
expectedQuery: false,
},
"max time too old": {
block: bucketindex.Block{
MinTime: now.Add(-72 * time.Hour).UnixMilli(),
MaxTime: now.Add(-48 * time.Hour).UnixMilli(),
UploadedAt: now.Add(-24 * time.Hour).Unix(),
},
}
require.True(t, replication.Eligible(&b))
})
expectedSync: false,
expectedQuery: false,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
canSync := replication.EligibleForSync(&tc.block)
canQuery := replication.EligibleForQuerying(&tc.block)

require.Equal(t, tc.expectedSync, canSync, "expected to be able to sync block %+v using %+v", tc.block, replication)
require.Equal(t, tc.expectedQuery, canQuery, "expected to be able to query block %+v using %+v", tc.block, replication)
})
}
}
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi

var expandedReplication ExpandedReplication
if gatewayCfg.ExpandedReplication.Enabled {
expandedReplication = NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold)
expandedReplication = NewMaxTimeExpandedReplication(gatewayCfg.ExpandedReplication.MaxTimeThreshold, 0)
} else {
expandedReplication = NewNopExpandedReplication()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package storegateway

import (
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
Expand Down Expand Up @@ -118,7 +119,7 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string,

for blockID := range metas {
ringOpts := []ring.Option{bufOption}
if s.expandedReplication.Eligible(metas[blockID]) {
if s.expandedReplication.EligibleForSync(metas[blockID]) {
ringOpts = append(ringOpts, expandedReplicationOption)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/sharding_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
},
"multiple ACTIVE instances in the ring with RF = 1 and SS = 3 and ER = true": {
replicationFactor: 1,
expandedReplication: NewMaxTimeExpandedReplication(25 * time.Hour),
expandedReplication: NewMaxTimeExpandedReplication(25*time.Hour, 45*time.Minute),
limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3},
setupRing: func(r *ring.Desc) {
r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt, false, time.Time{})
Expand Down

0 comments on commit 655630e

Please sign in to comment.