Rename feature to "dynamic_replication"
Signed-off-by: Nick Pillitteri <[email protected]>
56quarters committed Jan 14, 2025
1 parent 48d3d73 commit 5aa26f6
Showing 11 changed files with 95 additions and 95 deletions.
6 changes: 3 additions & 3 deletions cmd/mimir/config-descriptor.json
@@ -11582,7 +11582,7 @@
},
{
"kind": "block",
"name": "expanded_replication",
"name": "dynamic_replication",
"required": false,
"desc": "",
"blockEntries": [
@@ -11593,7 +11593,7 @@
"desc": "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "store-gateway.expanded-replication.enabled",
"fieldFlag": "store-gateway.dynamic-replication.enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
@@ -11604,7 +11604,7 @@
"desc": "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.",
"fieldValue": null,
"fieldDefaultValue": 90000000000000,
"fieldFlag": "store-gateway.expanded-replication.max-time-threshold",
"fieldFlag": "store-gateway.dynamic-replication.max-time-threshold",
"fieldType": "duration",
"fieldCategory": "experimental"
}
8 changes: 4 additions & 4 deletions cmd/mimir/help-all.txt.tmpl
@@ -3181,12 +3181,12 @@ Usage of ./cmd/mimir/mimir:
How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.
-store-gateway.disabled-tenants comma-separated-list-of-strings
Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.
- -store-gateway.enabled-tenants comma-separated-list-of-strings
- Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
- -store-gateway.expanded-replication.enabled
+ -store-gateway.dynamic-replication.enabled
[experimental] Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.
- -store-gateway.expanded-replication.max-time-threshold duration
+ -store-gateway.dynamic-replication.max-time-threshold duration
[experimental] Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas. (default 25h0m0s)
+ -store-gateway.enabled-tenants comma-separated-list-of-strings
+ Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
-store-gateway.sharding-ring.auto-forget-enabled
When enabled, a store-gateway is automatically removed from the ring after failing to heartbeat the ring for a period longer than 10 times the configured -store-gateway.sharding-ring.heartbeat-timeout. (default true)
-store-gateway.sharding-ring.consul.acl-token string
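For illustration, a store-gateway using the renamed flags above would be started along these lines (a sketch only: the two dynamic-replication flags come from this help text, while the -target flag is assumed to be Mimir's usual way of selecting the component and is not part of this diff):

    mimir -target=store-gateway \
      -store-gateway.dynamic-replication.enabled=true \
      -store-gateway.dynamic-replication.max-time-threshold=25h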
@@ -4849,18 +4849,18 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]
- # Experimental expanded replication configuration.
- expanded_replication:
+ # Experimental dynamic replication configuration.
+ dynamic_replication:
# (experimental) Use a higher number of replicas for recent blocks. Useful to
# spread query load more evenly at the cost of slightly higher disk usage.
- # CLI flag: -store-gateway.expanded-replication.enabled
+ # CLI flag: -store-gateway.dynamic-replication.enabled
[enabled: <boolean> | default = false]
# (experimental) Threshold of the most recent sample in a block used to
# determine it is eligible for higher than default replication. If a block has
# samples within this amount of time, it is considered recent and will be
# owned by more replicas.
- # CLI flag: -store-gateway.expanded-replication.max-time-threshold
+ # CLI flag: -store-gateway.dynamic-replication.max-time-threshold
[max_time_threshold: <duration> | default = 25h]
# (advanced) Comma separated list of tenants that can be loaded by the
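Put together, a minimal YAML snippet enabling the renamed block might look like the following sketch (the enabled and max_time_threshold fields come from the documentation above; the enclosing store_gateway key is assumed from Mimir's usual configuration layout and is not part of this diff):

    store_gateway:
      dynamic_replication:
        enabled: true
        # Blocks whose most recent sample is newer than this are owned by more replicas.
        max_time_threshold: 25h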
10 changes: 5 additions & 5 deletions pkg/querier/blocks_store_queryable.go
@@ -244,17 +244,17 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
return nil, errors.Wrap(err, "failed to create store-gateway ring client")
}

- var expandedReplication storegateway.ExpandedReplication = storegateway.NewNopExpandedReplication()
- if gatewayCfg.ExpandedReplication.Enabled {
- expandedReplication = storegateway.NewMaxTimeExpandedReplication(
- gatewayCfg.ExpandedReplication.MaxTimeThreshold,
+ var dynamicReplication storegateway.DynamicReplication = storegateway.NewNopDynamicReplication()
+ if gatewayCfg.DynamicReplication.Enabled {
+ dynamicReplication = storegateway.NewMaxTimeDynamicReplication(
+ gatewayCfg.DynamicReplication.MaxTimeThreshold,
// Exclude blocks which have recently become eligible for expanded replication, in order to give
// enough time to store-gateways to discover and load them (3 times the sync interval)
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
)
}

- stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, expandedReplication, limits, querierCfg.StoreGatewayClient, logger, reg)
+ stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, dynamicReplication, limits, querierCfg.StoreGatewayClient, logger, reg)
if err != nil {
return nil, errors.Wrap(err, "failed to create store set")
}
30 changes: 15 additions & 15 deletions pkg/querier/blocks_store_replicated_set.go
@@ -36,11 +36,11 @@ const (
type blocksStoreReplicationSet struct {
services.Service

- storesRing *ring.Ring
- clientsPool *client.Pool
- balancingStrategy loadBalancingStrategy
- expandedReplication storegateway.ExpandedReplication
- limits BlocksStoreLimits
+ storesRing *ring.Ring
+ clientsPool *client.Pool
+ balancingStrategy loadBalancingStrategy
+ dynamicReplication storegateway.DynamicReplication
+ limits BlocksStoreLimits

// Subservices manager.
subservices *services.Manager
@@ -50,19 +50,19 @@ type blocksStoreReplicationSet struct {
func newBlocksStoreReplicationSet(
storesRing *ring.Ring,
balancingStrategy loadBalancingStrategy,
- expandedReplication storegateway.ExpandedReplication,
+ dynamicReplication storegateway.DynamicReplication,
limits BlocksStoreLimits,
clientConfig ClientConfig,
logger log.Logger,
reg prometheus.Registerer,
) (*blocksStoreReplicationSet, error) {
s := &blocksStoreReplicationSet{
- storesRing: storesRing,
- clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
- expandedReplication: expandedReplication,
- balancingStrategy: balancingStrategy,
- limits: limits,
- subservicesWatcher: services.NewFailureWatcher(),
+ storesRing: storesRing,
+ clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
+ dynamicReplication: dynamicReplication,
+ balancingStrategy: balancingStrategy,
+ limits: limits,
+ subservicesWatcher: services.NewFailureWatcher(),
}

var err error
@@ -106,13 +106,13 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blocks bucketin
instances := make(map[string]ring.InstanceDesc)

userRing := storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits)
- expandedReplicationOption := ring.WithReplicationFactor(userRing.InstancesCount())
+ replicationOption := ring.WithReplicationFactor(userRing.InstancesCount())

// Find the replication set of each block we need to query.
for _, block := range blocks {
var ringOpts []ring.Option
- if s.expandedReplication.EligibleForQuerying(block) {
- ringOpts = append(ringOpts, expandedReplicationOption)
+ if s.dynamicReplication.EligibleForQuerying(block) {
+ ringOpts = append(ringOpts, replicationOption)
}

// Note that we don't pass buffers since we retain instances from the returned replication set.
4 changes: 2 additions & 2 deletions pkg/querier/blocks_store_replicated_set_test.go
@@ -356,7 +356,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
}

reg := prometheus.NewPedanticRegistry()
- s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, storegateway.NewNopExpandedReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
+ s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
@@ -426,7 +426,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin

limits := &blocksStoreLimitsMock{storeGatewayTenantShardSize: 0}
reg := prometheus.NewPedanticRegistry()
- s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, storegateway.NewNopExpandedReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
+ s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
@@ -12,17 +12,17 @@ var (
errInvalidExpandedReplicationMaxTimeThreshold = errors.New("invalid expanded replication max time threshold, the value must be at least one hour")
)

- type ExpandedReplicationConfig struct {
+ type DynamicReplicationConfig struct {
Enabled bool `yaml:"enabled" category:"experimental"`
MaxTimeThreshold time.Duration `yaml:"max_time_threshold" category:"experimental"`
}

- func (cfg *ExpandedReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
- f.BoolVar(&cfg.Enabled, prefix+"expanded-replication.enabled", false, "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.")
- f.DurationVar(&cfg.MaxTimeThreshold, prefix+"expanded-replication.max-time-threshold", 25*time.Hour, "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.")
+ func (cfg *DynamicReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
+ f.BoolVar(&cfg.Enabled, prefix+"dynamic-replication.enabled", false, "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.")
+ f.DurationVar(&cfg.MaxTimeThreshold, prefix+"dynamic-replication.max-time-threshold", 25*time.Hour, "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.")
}

- func (cfg *ExpandedReplicationConfig) Validate() error {
+ func (cfg *DynamicReplicationConfig) Validate() error {
if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour {
return errInvalidExpandedReplicationMaxTimeThreshold
}
@@ -38,9 +38,9 @@ type ReplicatedBlock interface {
GetMaxTime() time.Time
}

- // ExpandedReplication determines if a TSDB block is eligible to be sync to and queried from more
+ // DynamicReplication determines if a TSDB block is eligible to be sync to and queried from more
// store-gateways than the configured replication factor based on metadata about the block.
- type ExpandedReplication interface {
+ type DynamicReplication interface {
// EligibleForSync returns true if the block can be synced to more than the configured (via
// replication factor) number of store-gateways, false otherwise.
EligibleForSync(b ReplicatedBlock) bool
@@ -50,48 +50,48 @@ type ExpandedReplication interface {
EligibleForQuerying(b ReplicatedBlock) bool
}

- func NewNopExpandedReplication() *NopExpandedReplication {
- return &NopExpandedReplication{}
+ func NewNopDynamicReplication() *NopDynamicReplication {
+ return &NopDynamicReplication{}
}

- // NopExpandedReplication is an ExpandedReplication implementation that always returns false.
- type NopExpandedReplication struct{}
+ // NopDynamicReplication is an DynamicReplication implementation that always returns false.
+ type NopDynamicReplication struct{}

- func (n NopExpandedReplication) EligibleForSync(ReplicatedBlock) bool {
+ func (n NopDynamicReplication) EligibleForSync(ReplicatedBlock) bool {
return false
}

- func (n NopExpandedReplication) EligibleForQuerying(ReplicatedBlock) bool {
+ func (n NopDynamicReplication) EligibleForQuerying(ReplicatedBlock) bool {
return false
}

- func NewMaxTimeExpandedReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeExpandedReplication {
- return &MaxTimeExpandedReplication{
+ func NewMaxTimeDynamicReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeDynamicReplication {
+ return &MaxTimeDynamicReplication{
maxTime: maxTime,
gracePeriod: gracePeriod,
now: time.Now,
}
}

- // MaxTimeExpandedReplication is an ExpandedReplication implementation that determines
+ // MaxTimeDynamicReplication is an DynamicReplication implementation that determines
// if a block is eligible for expanded replication based on how recent its MaxTime (most
// recent sample) is. An upload grace period can optionally be used to ensure that blocks
// are synced to store-gateways before they are expected to be available by queriers.
- type MaxTimeExpandedReplication struct {
+ type MaxTimeDynamicReplication struct {
maxTime time.Duration
gracePeriod time.Duration
now func() time.Time
}

- func (e *MaxTimeExpandedReplication) EligibleForSync(b ReplicatedBlock) bool {
+ func (e *MaxTimeDynamicReplication) EligibleForSync(b ReplicatedBlock) bool {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
// We start syncing blocks `gracePeriod` before they become eligible for querying to
// ensure that they've been loaded before queriers expect them to be available.
return maxTimeDelta <= (e.maxTime + e.gracePeriod)
}

- func (e *MaxTimeExpandedReplication) EligibleForQuerying(b ReplicatedBlock) bool {
+ func (e *MaxTimeDynamicReplication) EligibleForQuerying(b ReplicatedBlock) bool {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
return maxTimeDelta <= e.maxTime
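To make the renamed max-time rule concrete, here is a small, self-contained Go sketch of the same eligibility checks (the helper names and block ages below are illustrative, not the package's API; the 25h threshold and 45m grace period mirror the values used in the updated test that follows):

    package main

    import (
        "fmt"
        "time"
    )

    // eligibleForQuerying mirrors MaxTimeDynamicReplication.EligibleForQuerying:
    // a block qualifies while its most recent sample is within maxTime of now.
    func eligibleForQuerying(blockMaxTime time.Time, maxTime time.Duration, now time.Time) bool {
        return now.Sub(blockMaxTime) <= maxTime
    }

    // eligibleForSync mirrors MaxTimeDynamicReplication.EligibleForSync: syncing
    // starts gracePeriod earlier so store-gateways can load a block before
    // queriers expect it to be available.
    func eligibleForSync(blockMaxTime time.Time, maxTime, gracePeriod time.Duration, now time.Time) bool {
        return now.Sub(blockMaxTime) <= maxTime+gracePeriod
    }

    func main() {
        now := time.Now()
        maxTime := 25 * time.Hour       // matches the documented default threshold
        gracePeriod := 45 * time.Minute // e.g. 3x an assumed 15m bucket-store sync interval

        recent := now.Add(-2 * time.Hour)                    // newest sample two hours old
        boundary := now.Add(-25*time.Hour - 30*time.Minute)  // just past the query threshold

        fmt.Println(eligibleForQuerying(recent, maxTime, now))            // true
        fmt.Println(eligibleForQuerying(boundary, maxTime, now))          // false
        fmt.Println(eligibleForSync(boundary, maxTime, gracePeriod, now)) // true: still within the grace period
    }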
@@ -15,7 +15,7 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
// Round "now" to the nearest millisecond since we are using millisecond precision
// for min/max times for the blocks.
now := time.Now().Round(time.Millisecond)
- replication := NewMaxTimeExpandedReplication(25*time.Hour, 45*time.Minute)
+ replication := NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute)
replication.now = func() time.Time { return now }

type testCase struct {
18 changes: 9 additions & 9 deletions pkg/storegateway/gateway.go
@@ -53,8 +53,8 @@ var (

// Config holds the store gateway config.
type Config struct {
- ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."`
- ExpandedReplication ExpandedReplicationConfig `yaml:"expanded_replication" doc:"description=Experimental expanded replication configuration." category:"experimental"`
+ ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."`
+ DynamicReplication DynamicReplicationConfig `yaml:"dynamic_replication" doc:"description=Experimental dynamic replication configuration." category:"experimental"`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`
@@ -63,7 +63,7 @@ type Config struct {
// RegisterFlags registers the Config flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.ShardingRing.RegisterFlags(f, logger)
- cfg.ExpandedReplication.RegisterFlagsWithPrefix(f, "store-gateway.")
+ cfg.DynamicReplication.RegisterFlagsWithPrefix(f, "store-gateway.")

f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants that cannot be loaded by the store-gateway. If specified, and the store-gateway would normally load a given tenant for (via -store-gateway.enabled-tenants or sharding), it will be ignored instead.")
@@ -75,7 +75,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
return errInvalidTenantShardSize
}

- if err := cfg.ExpandedReplication.Validate(); err != nil {
+ if err := cfg.DynamicReplication.Validate(); err != nil {
return err
}

@@ -179,17 +179,17 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi
return nil, errors.Wrap(err, "create ring client")
}

- var expandedReplication ExpandedReplication = NewNopExpandedReplication()
- if gatewayCfg.ExpandedReplication.Enabled {
- expandedReplication = NewMaxTimeExpandedReplication(
- gatewayCfg.ExpandedReplication.MaxTimeThreshold,
+ var dynamicReplication DynamicReplication = NewNopDynamicReplication()
+ if gatewayCfg.DynamicReplication.Enabled {
+ dynamicReplication = NewMaxTimeDynamicReplication(
+ gatewayCfg.DynamicReplication.MaxTimeThreshold,
// Exclude blocks which have recently become eligible for expanded replication, in order to give
// enough time to store-gateways to discover and load them (3 times the sync interval)
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
)
}

- shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, expandedReplication, limits, logger)
+ shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, dynamicReplication, limits, logger)

allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants)
if len(gatewayCfg.EnabledTenants) > 0 {
(The remaining changed files are not shown in this view.)