From ba79bcf614b215d3a4f45cff42aaa863816c003a Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Wed, 7 Jul 2021 11:43:47 +0200 Subject: [PATCH 1/2] Allow disabling of ring heartbeats by setting relevant options to zero. Signed-off-by: Steve Simpson --- CHANGELOG.md | 7 ++++++ docs/blocks-storage/compactor.md | 2 +- docs/blocks-storage/store-gateway.md | 2 +- docs/configuration/config-file-reference.md | 12 ++++----- pkg/alertmanager/alertmanager_ring.go | 2 +- pkg/compactor/compactor_ring.go | 2 +- pkg/distributor/distributor_ring.go | 2 +- pkg/ring/basic_lifecycler.go | 19 +++++++------- pkg/ring/lifecycler.go | 15 +++++------ pkg/ruler/ruler_ring.go | 2 +- pkg/storegateway/gateway_ring.go | 2 +- pkg/util/time.go | 11 ++++++++ pkg/util/time_test.go | 28 +++++++++++++++++++++ 13 files changed, 77 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 804b58761c..295151ca9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,13 @@ * `-alertmanager.sharding-ring.heartbeat-timeout` * `-compactor.ring.heartbeat-timeout` * `-store-gateway.sharding-ring.heartbeat-timeout` +* [ENHANCEMENT] Ring: allow heartbeats to be explicitly disabled by setting the interval to zero. This is considered experimental. This applies to the following configuration options: #4344 + * `-distributor.ring.heartbeat-period` + * `-ingester.heartbeat-period` + * `-ruler.ring.heartbeat-period` + * `-alertmanager.sharding-ring.heartbeat-period` + * `-compactor.ring.heartbeat-period` + * `-store-gateway.sharding-ring.heartbeat-period` * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 5a55cabb7c..ca30832d93 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -209,7 +209,7 @@ compactor: # CLI flag: -compactor.ring.multi.mirror-timeout [mirror_timeout: | default = 2s] - # Period at which to heartbeat to the ring. + # Period at which to heartbeat to the ring. 0 = disabled. # CLI flag: -compactor.ring.heartbeat-period [heartbeat_period: | default = 5s] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 3cd768fea2..a00297a445 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -232,7 +232,7 @@ store_gateway: # CLI flag: -store-gateway.sharding-ring.multi.mirror-timeout [mirror_timeout: | default = 2s] - # Period at which to heartbeat to the ring. + # Period at which to heartbeat to the ring. 0 = disabled. # CLI flag: -store-gateway.sharding-ring.heartbeat-period [heartbeat_period: | default = 15s] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c034a7cdb5..b8892152ea 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -563,7 +563,7 @@ ring: # CLI flag: -distributor.ring.multi.mirror-timeout [mirror_timeout: | default = 2s] - # Period at which to heartbeat to the ring. + # Period at which to heartbeat to the ring. 0 = disabled. # CLI flag: -distributor.ring.heartbeat-period [heartbeat_period: | default = 5s] @@ -679,7 +679,7 @@ lifecycler: # CLI flag: -ingester.num-tokens [num_tokens: | default = 128] - # Period at which to heartbeat to consul. + # Period at which to heartbeat to consul. 0 = disabled. # CLI flag: -ingester.heartbeat-period [heartbeat_period: | default = 5s] @@ -1581,7 +1581,7 @@ ring: # CLI flag: -ruler.ring.multi.mirror-timeout [mirror_timeout: | default = 2s] - # Period at which to heartbeat to the ring. + # Period at which to heartbeat to the ring. 0 = disabled. # CLI flag: -ruler.ring.heartbeat-period [heartbeat_period: | default = 5s] @@ -1907,7 +1907,7 @@ sharding_ring: # CLI flag: -alertmanager.sharding-ring.multi.mirror-timeout [mirror_timeout: | default = 2s] - # Period at which to heartbeat to the ring. + # Period at which to heartbeat to the ring. 0 = disabled. # CLI flag: -alertmanager.sharding-ring.heartbeat-period [heartbeat_period: | default = 15s] @@ -5179,7 +5179,7 @@ sharding_ring: # CLI flag: -compactor.ring.multi.mirror-timeout [mirror_timeout: | default = 2s] - # Period at which to heartbeat to the ring. + # Period at which to heartbeat to the ring. 0 = disabled. # CLI flag: -compactor.ring.heartbeat-period [heartbeat_period: | default = 5s] @@ -5257,7 +5257,7 @@ sharding_ring: # CLI flag: -store-gateway.sharding-ring.multi.mirror-timeout [mirror_timeout: | default = 2s] - # Period at which to heartbeat to the ring. + # Period at which to heartbeat to the ring. 0 = disabled. # CLI flag: -store-gateway.sharding-ring.heartbeat-period [heartbeat_period: | default = 15s] diff --git a/pkg/alertmanager/alertmanager_ring.go b/pkg/alertmanager/alertmanager_ring.go index f0b707a090..0c60d0f69c 100644 --- a/pkg/alertmanager/alertmanager_ring.go +++ b/pkg/alertmanager/alertmanager_ring.go @@ -76,7 +76,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { // Ring flags cfg.KVStore.RegisterFlagsWithPrefix(rfprefix, "alertmanagers/", f) - f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring. 0 = never (timeout disabled).") f.IntVar(&cfg.ReplicationFactor, rfprefix+"replication-factor", 3, "The replication factor to use when sharding the alertmanager.") f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.") diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go index d384220a32..285a1fa6bc 100644 --- a/pkg/compactor/compactor_ring.go +++ b/pkg/compactor/compactor_ring.go @@ -50,7 +50,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { // Ring flags cfg.KVStore.RegisterFlagsWithPrefix("compactor.ring.", "collectors/", f) - f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).") // Wait stability flags. diff --git a/pkg/distributor/distributor_ring.go b/pkg/distributor/distributor_ring.go index ce70464e34..3655aa9985 100644 --- a/pkg/distributor/distributor_ring.go +++ b/pkg/distributor/distributor_ring.go @@ -42,7 +42,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { // Ring flags cfg.KVStore.RegisterFlagsWithPrefix("distributor.ring.", "collectors/", f) - f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled).") // Instance flags diff --git a/pkg/ring/basic_lifecycler.go b/pkg/ring/basic_lifecycler.go index 6c19d72c28..64f5f6002c 100644 --- a/pkg/ring/basic_lifecycler.go +++ b/pkg/ring/basic_lifecycler.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -182,12 +183,12 @@ func (l *BasicLifecycler) starting(ctx context.Context) error { } func (l *BasicLifecycler) running(ctx context.Context) error { - heartbeatTicker := time.NewTicker(l.cfg.HeartbeatPeriod) - defer heartbeatTicker.Stop() + heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(l.cfg.HeartbeatPeriod) + defer heartbeatTickerStop() for { select { - case <-heartbeatTicker.C: + case <-heartbeatTickerChan: l.heartbeat(ctx) case f := <-l.actorChan: @@ -214,13 +215,13 @@ func (l *BasicLifecycler) stopping(runningError error) error { }() // Heartbeat while the stopping delegate function is running. - heartbeatTicker := time.NewTicker(l.cfg.HeartbeatPeriod) - defer heartbeatTicker.Stop() + heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(l.cfg.HeartbeatPeriod) + defer heartbeatTickerStop() heartbeatLoop: for { select { - case <-heartbeatTicker.C: + case <-heartbeatTickerChan: l.heartbeat(context.Background()) case <-done: break heartbeatLoop @@ -292,8 +293,8 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error { } func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Duration) error { - heartbeatTicker := time.NewTicker(l.cfg.HeartbeatPeriod) - defer heartbeatTicker.Stop() + heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(l.cfg.HeartbeatPeriod) + defer heartbeatTickerStop() // The first observation will occur after the specified period. level.Info(l.logger).Log("msg", "waiting stable tokens", "ring", l.ringName) @@ -312,7 +313,7 @@ func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Dura level.Info(l.logger).Log("msg", "tokens verification succeeded", "ring", l.ringName) return nil - case <-heartbeatTicker.C: + case <-heartbeatTickerChan: l.heartbeat(ctx) case <-ctx.Done(): diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 73584ea0a2..37897b70ba 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -17,6 +17,7 @@ import ( "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" @@ -83,7 +84,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag } f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.") - f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") + f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul. 0 = disabled.") f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.") f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.") f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.") @@ -392,8 +393,8 @@ func (i *Lifecycler) loop(ctx context.Context) error { autoJoinAfter := time.After(i.cfg.JoinAfter) var observeChan <-chan time.Time = nil - heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod) - defer heartbeatTicker.Stop() + heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(i.cfg.HeartbeatPeriod) + defer heartbeatTickerStop() for { select { @@ -442,7 +443,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { observeChan = time.After(i.cfg.ObservePeriod) } - case <-heartbeatTicker.C: + case <-heartbeatTickerChan: consulHeartbeats.WithLabelValues(i.RingName).Inc() if err := i.updateConsul(context.Background()); err != nil { level.Error(log.Logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) @@ -469,8 +470,8 @@ func (i *Lifecycler) stopping(runningError error) error { return nil } - heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod) - defer heartbeatTicker.Stop() + heartbeatTickerStop, heartbeatTickerChan := util.NewDisableableTicker(i.cfg.HeartbeatPeriod) + defer heartbeatTickerStop() // Mark ourselved as Leaving so no more samples are send to us. err := i.changeState(context.Background(), LEAVING) @@ -489,7 +490,7 @@ func (i *Lifecycler) stopping(runningError error) error { heartbeatLoop: for { select { - case <-heartbeatTicker.C: + case <-heartbeatTickerChan: consulHeartbeats.WithLabelValues(i.RingName).Inc() if err := i.updateConsul(context.Background()); err != nil { level.Error(log.Logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index a3902ed1c4..e8577e8289 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -56,7 +56,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { // Ring flags cfg.KVStore.RegisterFlagsWithPrefix("ruler.ring.", "rulers/", f) - f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).") // Instance flags diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index f32bf37ccd..c868c2b789 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -94,7 +94,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { // Ring flags cfg.KVStore.RegisterFlagsWithPrefix(ringFlagsPrefix, "collectors/", f) - f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring. 0 = never (timeout disabled)."+sharedOptionWithQuerier) f.IntVar(&cfg.ReplicationFactor, ringFlagsPrefix+"replication-factor", 3, "The replication factor to use when sharding blocks."+sharedOptionWithQuerier) f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") diff --git a/pkg/util/time.go b/pkg/util/time.go index 58f951a8b3..8816b1d7d2 100644 --- a/pkg/util/time.go +++ b/pkg/util/time.go @@ -73,3 +73,14 @@ func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time. return input + time.Duration(jitter) } + +// NewDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing +// zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel. +func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) { + if interval == 0 { + return func() {}, nil + } + + tick := time.NewTicker(interval) + return func() { tick.Stop() }, tick.C +} diff --git a/pkg/util/time_test.go b/pkg/util/time_test.go index 700fe55fab..ab1da4a85b 100644 --- a/pkg/util/time_test.go +++ b/pkg/util/time_test.go @@ -103,3 +103,31 @@ func TestParseTime(t *testing.T) { assert.Equal(t, TimeToMillis(test.result), ts) } } + +func TestNewDisableableTicker_Enabled(t *testing.T) { + stop, ch := NewDisableableTicker(10 * time.Millisecond) + defer stop() + + time.Sleep(100 * time.Millisecond) + + select { + case <-ch: + break + default: + t.Error("ticker should have ticked when enabled") + } +} + +func TestNewDisableableTicker_Disabled(t *testing.T) { + stop, ch := NewDisableableTicker(0) + defer stop() + + time.Sleep(100 * time.Millisecond) + + select { + case <-ch: + t.Error("ticker should not have ticked when disabled") + default: + break + } +} From 010ff2f818bca7f1593798592457531deef5567a Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Thu, 8 Jul 2021 13:19:03 +0200 Subject: [PATCH 2/2] Review comments. Signed-off-by: Steve Simpson --- docs/configuration/v1-guarantees.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 36e39221ff..3af5dd4cd1 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -87,4 +87,11 @@ Currently experimental features are: - `-ruler.ring.heartbeat-timeout=0` - `-alertmanager.sharding-ring.heartbeat-timeout=0` - `-compactor.ring.heartbeat-timeout=0` - - `-store-gateway.sharding-ring.heartbeat-timeout=0` \ No newline at end of file + - `-store-gateway.sharding-ring.heartbeat-timeout=0` +- Disabling ring heartbeats + - `-distributor.ring.heartbeat-period=0` + - `-ingester.heartbeat-period=0` + - `-ruler.ring.heartbeat-period=0` + - `-alertmanager.sharding-ring.heartbeat-period=0` + - `-compactor.ring.heartbeat-period=0` + - `-store-gateway.sharding-ring.heartbeat-period=0`