Allow setting ring heartbeat timeout to zero to disable timeout check. (#4342)

* Allow setting ring heartbeat timeout to zero to disable timeout check.

This change allows the various ring heartbeat timeouts to be configured
with zero, as a means of disabling the timeout. This is expected to be
used with a separate enhancement to allow disabling heartbeats. When the
heartbeat timeout is disabled, instances will always appear as healthy
in the ring.

Signed-off-by: Steve Simpson <[email protected]>

* Review comments.

Signed-off-by: Steve Simpson <[email protected]>
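
To make the behaviour concrete, here is a minimal, self-contained Go sketch of the check this commit introduces. It mirrors the `IsHeartbeatHealthy` helper added in `pkg/ring/model.go` below; the standalone function and `main` scaffolding are illustrative only, not part of the commit.

```go
package main

import (
	"fmt"
	"time"
)

// isHeartbeatHealthy mirrors the logic added in pkg/ring/model.go:
// a zero timeout disables the check entirely, so the heartbeat timestamp
// is ignored and the instance always counts as healthy.
func isHeartbeatHealthy(lastHeartbeat time.Time, timeout time.Duration, now time.Time) bool {
	if timeout == 0 {
		return true
	}
	return now.Sub(lastHeartbeat) <= timeout
}

func main() {
	now := time.Now()
	stale := now.Add(-5 * time.Minute) // heartbeat older than the default 1m timeout

	fmt.Println(isHeartbeatHealthy(stale, time.Minute, now)) // false: past the timeout
	fmt.Println(isHeartbeatHealthy(stale, 0, now))           // true: timeout disabled
}
```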
stevesg authored Jul 9, 2021
1 parent f8b08a3 commit ed62246
Showing 14 changed files with 70 additions and 19 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,13 @@
* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [ENHANCEMENT] Ring: allow experimental configuration of disabling heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
* `-distributor.ring.heartbeat-timeout`
* `-ring.heartbeat-timeout`
* `-ruler.ring.heartbeat-timeout`
* `-alertmanager.sharding-ring.heartbeat-timeout`
* `-compactor.ring.heartbeat-timeout`
* `-store-gateway.sharding-ring.heartbeat-timeout`
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of clusters per user correctly. #4336

## 1.10.0-rc.0 / 2021-06-28
2 changes: 1 addition & 1 deletion docs/blocks-storage/compactor.md
@@ -214,7 +214,7 @@ compactor:
[heartbeat_period: <duration> | default = 5s]

# The heartbeat timeout after which compactors are considered unhealthy
# within the ring.
# within the ring. 0 = never (timeout disabled).
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

4 changes: 2 additions & 2 deletions docs/blocks-storage/store-gateway.md
@@ -237,8 +237,8 @@ store_gateway:
[heartbeat_period: <duration> | default = 15s]

# The heartbeat timeout after which store gateways are considered unhealthy
# within the ring. This option needs be set both on the store-gateway and
# querier when running in microservices mode.
# within the ring. 0 = never (timeout disabled). This option needs be set
# both on the store-gateway and querier when running in microservices mode.
# CLI flag: -store-gateway.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

13 changes: 7 additions & 6 deletions docs/configuration/config-file-reference.md
@@ -568,7 +568,7 @@ ring:
[heartbeat_period: <duration> | default = 5s]
# The heartbeat timeout after which distributors are considered unhealthy
# within the ring.
# within the ring. 0 = never (timeout disabled).
# CLI flag: -distributor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
@@ -662,6 +662,7 @@ lifecycler:
[mirror_timeout: <duration> | default = 2s]
# The heartbeat timeout after which ingesters are skipped for reads/writes.
# 0 = never (timeout disabled).
# CLI flag: -ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
@@ -1585,7 +1586,7 @@ ring:
[heartbeat_period: <duration> | default = 5s]

# The heartbeat timeout after which rulers are considered unhealthy within the
# ring.
# ring. 0 = never (timeout disabled).
# CLI flag: -ruler.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

@@ -1906,7 +1907,7 @@ sharding_ring:
[heartbeat_period: <duration> | default = 15s]
# The heartbeat timeout after which alertmanagers are considered unhealthy
# within the ring.
# within the ring. 0 = never (timeout disabled).
# CLI flag: -alertmanager.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
@@ -5178,7 +5179,7 @@ sharding_ring:
[heartbeat_period: <duration> | default = 5s]
# The heartbeat timeout after which compactors are considered unhealthy within
# the ring.
# the ring. 0 = never (timeout disabled).
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
@@ -5256,8 +5257,8 @@ sharding_ring:
[heartbeat_period: <duration> | default = 15s]
# The heartbeat timeout after which store gateways are considered unhealthy
# within the ring. This option needs be set both on the store-gateway and
# querier when running in microservices mode.
# within the ring. 0 = never (timeout disabled). This option needs be set both
# on the store-gateway and querier when running in microservices mode.
# CLI flag: -store-gateway.sharding-ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
7 changes: 7 additions & 0 deletions docs/configuration/v1-guarantees.md
@@ -81,3 +81,10 @@ Currently experimental features are:
- user config size (`-alertmanager.max-config-size-bytes`)
- templates count in user config (`-alertmanager.max-templates-count`)
- max template size (`-alertmanager.max-template-size-bytes`)
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
- `-ring.heartbeat-timeout=0`
- `-ruler.ring.heartbeat-timeout=0`
- `-alertmanager.sharding-ring.heartbeat-timeout=0`
- `-compactor.ring.heartbeat-timeout=0`
- `-store-gateway.sharding-ring.heartbeat-timeout=0`
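
As a rough illustration of how the experimental `=0` values listed above reach the ring configuration, here is a hypothetical sketch using only the standard library `flag` package; the `ringConfig` struct and the single flag registered here are stand-ins for the real `DurationVar` registrations shown in the diffs below.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// ringConfig stands in for the HeartbeatTimeout field of the real ring config.
type ringConfig struct {
	HeartbeatTimeout time.Duration
}

func main() {
	var cfg ringConfig
	fs := flag.NewFlagSet("example", flag.ExitOnError)

	// Mirrors the registration in pkg/ring/ring.go, including the updated help text.
	fs.DurationVar(&cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute,
		"The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).")

	// An operator disables the timeout check by passing zero on the command line.
	_ = fs.Parse([]string{"-ring.heartbeat-timeout=0"})

	fmt.Println(cfg.HeartbeatTimeout == 0) // true → heartbeat timestamps are ignored
}
```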
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertmanager_ring.go
@@ -77,7 +77,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(rfprefix, "alertmanagers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring. 0 = never (timeout disabled).")
f.IntVar(&cfg.ReplicationFactor, rfprefix+"replication-factor", 3, "The replication factor to use when sharding the alertmanager.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")

2 changes: 1 addition & 1 deletion pkg/compactor/compactor_ring.go
@@ -51,7 +51,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("compactor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Wait stability flags.
f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_ring.go
@@ -43,7 +43,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ring.", "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
13 changes: 11 additions & 2 deletions pkg/ring/model.go
@@ -101,7 +101,7 @@ func (d *Desc) FindIngestersByState(state InstanceState) []InstanceDesc {
func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
numTokens := 0
for id, ingester := range d.Ingesters {
if now.Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout {
if !ingester.IsHeartbeatHealthy(heartbeatTimeout, now) {
return fmt.Errorf("instance %s past heartbeat timeout", id)
} else if ingester.State != ACTIVE {
return fmt.Errorf("instance %s in state %v", id, ingester.State)
@@ -136,7 +136,16 @@ func (i *InstanceDesc) GetRegisteredAt() time.Time {
func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
healthy := op.IsInstanceInStateHealthy(i.State)

return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
return healthy && i.IsHeartbeatHealthy(heartbeatTimeout, now)
}

// IsHeartbeatHealthy returns whether the heartbeat timestamp for the ingester is within the
// specified timeout period. A timeout of zero disables the timeout; the heartbeat is ignored.
func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, now time.Time) bool {
if heartbeatTimeout == 0 {
return true
}
return now.Sub(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout
}

// Merge merges other ring into this one. Returns sub-ring that represents the change,
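For reference, a hypothetical example test showing the two cases side by side. It would live alongside `pkg/ring/model_test.go` and relies only on the `InstanceDesc` type and the `IsHeartbeatHealthy` method added above.

```go
package ring

import (
	"fmt"
	"time"
)

func ExampleInstanceDesc_IsHeartbeatHealthy() {
	now := time.Now()
	ing := InstanceDesc{
		Addr:      "127.0.0.1",
		State:     ACTIVE,
		Timestamp: now.Add(-2 * time.Minute).Unix(), // last heartbeat two minutes ago
	}

	fmt.Println(ing.IsHeartbeatHealthy(time.Minute, now)) // past the 1m timeout
	fmt.Println(ing.IsHeartbeatHealthy(0, now))           // zero disables the check

	// Output:
	// false
	// true
}
```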
8 changes: 8 additions & 0 deletions pkg/ring/model_test.go
@@ -136,10 +136,18 @@ func TestDesc_Ready(t *testing.T) {
t.Fatal("expected ready, got", err)
}

if err := r.Ready(now, 0); err != nil {
t.Fatal("expected ready, got", err)
}

if err := r.Ready(now.Add(5*time.Minute), 10*time.Second); err == nil {
t.Fatal("expected !ready (no heartbeat from active ingester), but got no error")
}

if err := r.Ready(now.Add(5*time.Minute), 0); err != nil {
t.Fatal("expected ready (no heartbeat but timeout disabled), got", err)
}

r = &Desc{
Ingesters: map[string]InstanceDesc{
"ing1": {
2 changes: 1 addition & 1 deletion pkg/ring/ring.go
@@ -147,7 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f)

f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).")
f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
}
23 changes: 21 additions & 2 deletions pkg/ring/ring_test.go
@@ -390,11 +390,11 @@ func TestRing_GetAllHealthy(t *testing.T) {
}

func TestRing_GetReplicationSetForOperation(t *testing.T) {
const heartbeatTimeout = time.Minute
now := time.Now()

tests := map[string]struct {
ringInstances map[string]InstanceDesc
ringHeartbeatTimeout time.Duration
ringReplicationFactor int
expectedErrForRead error
expectedSetForRead []string
@@ -405,6 +405,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
}{
"should return error on empty ring": {
ringInstances: nil,
ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 1,
expectedErrForRead: ErrEmptyRing,
expectedErrForWrite: ErrEmptyRing,
@@ -418,6 +419,21 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-40 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
},
ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 1,
expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
},
"should succeed on instances with old timestamps but heartbeat timeout disabled": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
},
ringHeartbeatTimeout: 0,
ringReplicationFactor: 1,
expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
@@ -431,6 +447,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
},
ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 1,
expectedErrForRead: ErrTooManyUnhealthyInstances,
expectedErrForWrite: ErrTooManyUnhealthyInstances,
@@ -444,6 +461,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
},
ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 3,
expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"},
expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"},
@@ -457,6 +475,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
},
ringHeartbeatTimeout: time.Minute,
ringReplicationFactor: 3,
expectedErrForRead: ErrTooManyUnhealthyInstances,
expectedErrForWrite: ErrTooManyUnhealthyInstances,
@@ -474,7 +493,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {

ring := Ring{
cfg: Config{
HeartbeatTimeout: heartbeatTimeout,
HeartbeatTimeout: testData.ringHeartbeatTimeout,
ReplicationFactor: testData.ringReplicationFactor,
},
ringDesc: ringDesc,
2 changes: 1 addition & 1 deletion pkg/ruler/ruler_ring.go
@@ -57,7 +57,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("ruler.ring.", "rulers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_ring.go
@@ -95,7 +95,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(ringFlagsPrefix, "collectors/", f)
f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring."+sharedOptionWithQuerier)
f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring. 0 = never (timeout disabled)."+sharedOptionWithQuerier)
f.IntVar(&cfg.ReplicationFactor, ringFlagsPrefix+"replication-factor", 3, "The replication factor to use when sharding blocks."+sharedOptionWithQuerier)
f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, ringFlagsPrefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")
