From 11761b271fe12f533d14ddc8d3571eb851fcfa1f Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 14:39:06 -0700 Subject: [PATCH 01/27] Ring: Add a buffering interval to observed WatchKey updates. --- ring/ring.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/ring/ring.go b/ring/ring.go index c8db7da50..cdb83668b 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -18,6 +18,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/atomic" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/internal/slices" @@ -143,6 +144,7 @@ var ( type Config struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"` + UpdateInterval time.Duration `yaml:"update_interval" category:"advanced"` ReplicationFactor int `yaml:"replication_factor"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` ExcludedZones flagext.StringSliceCSV `yaml:"excluded_zones" category:"advanced"` @@ -162,6 +164,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f) f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).") + f.DurationVar(&cfg.UpdateInterval, prefix+"ring.update-interval", 250*time.Millisecond, "How often to recompute ring state when a change is detected from the KVStore.") f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.") f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.") f.Var(&cfg.ExcludedZones, prefix+"distributor.excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring.") @@ -324,18 +327,38 @@ func (r *Ring) loop(ctx context.Context) error { r.updateRingMetrics() r.mtx.Unlock() + // Debounce WatchKey updates, as they can be frequent enough to cause lock contention. + var newVal atomic.Pointer[Desc] + go r.processKVUpdates(ctx, &newVal) + + // (WatchKey blocks until our ctx is done.) r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet") return true } - r.updateRingState(value.(*Desc)) + newVal.Store(value.(*Desc)) return true }) + return nil } +func (r *Ring) processKVUpdates(ctx context.Context, newVal *atomic.Pointer[Desc]) { + t := time.NewTicker(r.cfg.UpdateInterval) + defer t.Stop() + + select { + case <-t.C: + if value := newVal.Swap(nil); value != nil { + r.updateRingState(value) + } + case <-ctx.Done(): + return + } +} + func (r *Ring) updateRingState(ringDesc *Desc) { r.mtx.RLock() prevRing := r.ringDesc From be934fe251aef8e8d4c8af26b10baed434861338 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 14:41:20 -0700 Subject: [PATCH 02/27] Don't just update once. --- ring/ring.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index cdb83668b..6fca53aae 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -349,13 +349,15 @@ func (r *Ring) processKVUpdates(ctx context.Context, newVal *atomic.Pointer[Desc t := time.NewTicker(r.cfg.UpdateInterval) defer t.Stop() - select { - case <-t.C: - if value := newVal.Swap(nil); value != nil { - r.updateRingState(value) + for { + select { + case <-t.C: + if value := newVal.Swap(nil); value != nil { + r.updateRingState(value) + } + case <-ctx.Done(): + return } - case <-ctx.Done(): - return } } From c1c8d209818840e71a968e15e42b894aae90ab48 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 14:49:49 -0700 Subject: [PATCH 03/27] Tuck periodic updater into loop method. --- ring/ring.go | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index 6fca53aae..524068e41 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -327,11 +327,27 @@ func (r *Ring) loop(ctx context.Context) error { r.updateRingMetrics() r.mtx.Unlock() - // Debounce WatchKey updates, as they can be frequent enough to cause lock contention. var newVal atomic.Pointer[Desc] - go r.processKVUpdates(ctx, &newVal) - // (WatchKey blocks until our ctx is done.) + // Debounce WatchKey updates, as they can be frequent enough to cause lock + // contention. The most recent update is the one we'll use when we + // periodically update the ring. + + go func() { + t := time.NewTicker(r.cfg.UpdateInterval) + defer t.Stop() + for { + select { + case <-t.C: + if value := newVal.Swap(nil); value != nil { + r.updateRingState(value) + } + case <-ctx.Done(): + return + } + } + }() + r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet") @@ -345,22 +361,6 @@ func (r *Ring) loop(ctx context.Context) error { return nil } -func (r *Ring) processKVUpdates(ctx context.Context, newVal *atomic.Pointer[Desc]) { - t := time.NewTicker(r.cfg.UpdateInterval) - defer t.Stop() - - for { - select { - case <-t.C: - if value := newVal.Swap(nil); value != nil { - r.updateRingState(value) - } - case <-ctx.Done(): - return - } - } -} - func (r *Ring) updateRingState(ringDesc *Desc) { r.mtx.RLock() prevRing := r.ringDesc From 76dd98d30b7c0f2940ce0dbc9dd0f711486d5747 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 14:51:03 -0700 Subject: [PATCH 04/27] Rm newline. --- ring/ring.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ring/ring.go b/ring/ring.go index 524068e41..615b81c3b 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -357,7 +357,6 @@ func (r *Ring) loop(ctx context.Context) error { newVal.Store(value.(*Desc)) return true }) - return nil } From f8aaa8ad0488b2fe6c45c843569f8e3a05e5c1f9 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 15:20:28 -0700 Subject: [PATCH 05/27] Fix existing tests. --- ring/ring_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ring/ring_test.go b/ring/ring_test.go index 909c5d019..3b1b5607f 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -139,6 +139,7 @@ func benchmarkUpdateRingState(b *testing.B, numInstances, numTokens int, updateT cfg := Config{ KVStore: kv.Config{}, HeartbeatTimeout: 0, // get healthy stats + UpdateInterval: 250 * time.Millisecond, ReplicationFactor: 3, ZoneAwarenessEnabled: true, } @@ -3608,6 +3609,7 @@ func TestRingUpdates(t *testing.T) { cfg := Config{ KVStore: kv.Config{Mock: inmem}, HeartbeatTimeout: 1 * time.Minute, + UpdateInterval: 20 * time.Millisecond, ReplicationFactor: 3, ExcludedZones: flagext.StringSliceCSV(testData.excludedZones), } @@ -3716,6 +3718,7 @@ func TestRing_ShuffleShard_Caching(t *testing.T) { cfg := Config{ KVStore: kv.Config{Mock: inmem}, HeartbeatTimeout: 1 * time.Minute, + UpdateInterval: 20 * time.Millisecond, ReplicationFactor: 3, ZoneAwarenessEnabled: true, } From 6ae22515742814eb6b00cba3ae6d9a1827eb667f Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 15:25:09 -0700 Subject: [PATCH 06/27] Changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0ad1280a..5617d65de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ * [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `()` suffix to the error. #514 * [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520 * [CHANGE] Updated the minimum required Go version to 1.21. #540 +* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 * [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539 * [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585 * [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564 @@ -230,7 +231,7 @@ * [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571 * [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583 * [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584 -* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 +* [ENHANCEMENT] ring: updates from the backing KVStore now only invalidate ring caches once per `-ring.update-interval`. * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 From 7fd45847a57143dfc9f7deb3c0f1584d854448b7 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 17:13:46 -0700 Subject: [PATCH 07/27] Add some tests around update debouncing. --- ring/ring.go | 28 +++++++++++++++++++--------- ring/ring_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index 615b81c3b..20bf3c259 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -218,13 +218,13 @@ type Ring struct { // Number of registered instances per zone. instancesCountPerZone map[string]int - // Nubmber of registered instances with tokens per zone. + // Number of registered instances with tokens per zone. instancesWithTokensCountPerZone map[string]int // Number of registered instances are writable and have tokens. writableInstancesWithTokensCount int - // Nubmber of registered instances with tokens per zone that are writable. + // Number of registered instances with tokens per zone that are writable. writableInstancesWithTokensCountPerZone map[string]int // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes. @@ -232,6 +232,9 @@ type Ring struct { shuffledSubringCache map[subringCacheKey]*Ring shuffledSubringWithLookbackCache map[subringCacheKey]cachedSubringWithLookback[*Ring] + // The last observed update from the KV store. + watchKeyUpdate atomic.Pointer[Desc] + numMembersGaugeVec *prometheus.GaugeVec totalTokensGauge prometheus.Gauge oldestTimestampGaugeVec *prometheus.GaugeVec @@ -327,8 +330,6 @@ func (r *Ring) loop(ctx context.Context) error { r.updateRingMetrics() r.mtx.Unlock() - var newVal atomic.Pointer[Desc] - // Debounce WatchKey updates, as they can be frequent enough to cause lock // contention. The most recent update is the one we'll use when we // periodically update the ring. @@ -339,9 +340,7 @@ func (r *Ring) loop(ctx context.Context) error { for { select { case <-t.C: - if value := newVal.Swap(nil); value != nil { - r.updateRingState(value) - } + r.observeKeyUpdate() case <-ctx.Done(): return } @@ -353,13 +352,24 @@ func (r *Ring) loop(ctx context.Context) error { level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet") return true } - - newVal.Store(value.(*Desc)) + r.storeKeyUpdate(value.(*Desc)) return true }) return nil } +// storeKeyUpdate stores a new watch key update for later storage in the ring. +func (r *Ring) storeKeyUpdate(value *Desc) { + r.watchKeyUpdate.Store(value) +} + +// observeKeyUpdate is called periodically to update the ring state if a new watch key update is available. +func (r *Ring) observeKeyUpdate() { + if value := r.watchKeyUpdate.Swap(nil); value != nil { + r.updateRingState(value) + } +} + func (r *Ring) updateRingState(ringDesc *Desc) { r.mtx.RLock() prevRing := r.ringDesc diff --git a/ring/ring_test.go b/ring/ring_test.go index 3b1b5607f..1e2931cd0 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -3571,6 +3571,46 @@ func compareReplicationSets(first, second ReplicationSet) (added, removed []stri return } +// Verify that WatchKey updates make it to the ring, and the latest update is observed. +func TestRingWatchKeyUpdates(t *testing.T) { + cfg := Config{ReplicationFactor: 3} + ring, err := NewWithStoreClientAndStrategy(cfg, testRingName, testRingKey, nil, NewDefaultReplicationStrategy(), prometheus.NewRegistry(), log.NewNopLogger()) + require.NoError(t, err) + + desc1 := NewDesc() + desc1.Ingesters = map[string]InstanceDesc{ + "instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{1}, time.Now()), + } + desc2 := NewDesc() + desc2.Ingesters = map[string]InstanceDesc{ + "instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{1}, time.Now()), + "instance-2": generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{2}, time.Now()), + } + + // processUpdate is a stand-in for the ring's timer goroutine that normally calls observeKeyUpdate. + processUpdate := func() *Desc { + ring.observeKeyUpdate() + ring.mtx.RLock() + defer ring.mtx.RUnlock() + return ring.ringDesc + } + + assert.Empty(t, processUpdate().Ingesters, "no desc initially") + assert.Empty(t, processUpdate().Ingesters, "can update multiple times without any key update stored") + + ring.storeKeyUpdate(desc1) + assert.Same(t, desc1, processUpdate()) + assert.Same(t, desc1, processUpdate(), "no change if no new update") + + ring.storeKeyUpdate(desc2) + assert.Same(t, desc2, processUpdate()) + assert.Same(t, desc2, processUpdate(), "no change if no new update") + + ring.storeKeyUpdate(desc1) + ring.storeKeyUpdate(desc2) + assert.Same(t, desc2, processUpdate(), "should observe last update") +} + // This test verifies that ring is getting updates, even after extending check in the loop method. func TestRingUpdates(t *testing.T) { const ( From acc31693d730d081a156203a4e353f34dfbbb334 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 3 Oct 2024 17:23:56 -0700 Subject: [PATCH 08/27] Adjust changelog. --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5617d65de..67cb88ca5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ * [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539 * [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585 * [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564 +* [CHANGE] ring: KVStrore updates now processed at most once per `-ring.update-interval`. #592 * [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276 * [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279 * [FEATURE] Add `log.BufferedLogger` type. #338 @@ -231,7 +232,6 @@ * [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571 * [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583 * [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584 -* [ENHANCEMENT] ring: updates from the backing KVStore now only invalidate ring caches once per `-ring.update-interval`. * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 From 4aca6fc3749aa7f442525fe4b74cf1701ab2c3c2 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 4 Oct 2024 06:57:55 -0700 Subject: [PATCH 09/27] Update CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Peter Štibraný --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67cb88ca5..0b00333bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,7 +84,7 @@ * [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539 * [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585 * [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564 -* [CHANGE] ring: KVStrore updates now processed at most once per `-ring.update-interval`. #592 +* [CHANGE] ring: ring updates now processed at most once per `-ring.update-interval`. #592 * [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276 * [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279 * [FEATURE] Add `log.BufferedLogger` type. #338 From f583a420f3e1e358b97aef11959570a02f949107 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 4 Oct 2024 08:43:49 -0700 Subject: [PATCH 10/27] Fix tests. --- servicediscovery/ring_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/servicediscovery/ring_test.go b/servicediscovery/ring_test.go index dfa8f90a6..12ebccc90 100644 --- a/servicediscovery/ring_test.go +++ b/servicediscovery/ring_test.go @@ -28,7 +28,11 @@ func TestRingServiceDiscovery_WithoutMaxUsedInstances(t *testing.T) { t.Cleanup(func() { _ = closer.Close() }) // Create a ring client. - ringCfg := ring.Config{HeartbeatTimeout: time.Minute, ReplicationFactor: 1} + ringCfg := ring.Config{ + HeartbeatTimeout: time.Minute, + ReplicationFactor: 1, + UpdateInterval: 20 * time.Millisecond, + } ringClient, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", ringKey, inmem, ring.NewDefaultReplicationStrategy(), nil, log.NewNopLogger()) require.NoError(t, err) @@ -130,7 +134,11 @@ func TestRingServiceDiscovery_WithMaxUsedInstances(t *testing.T) { t.Cleanup(func() { _ = closer.Close() }) // Create a ring client. - ringCfg := ring.Config{HeartbeatTimeout: time.Minute, ReplicationFactor: 1} + ringCfg := ring.Config{ + HeartbeatTimeout: time.Minute, + ReplicationFactor: 1, + UpdateInterval: 20 * time.Millisecond, + } ringClient, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", ringKey, inmem, ring.NewDefaultReplicationStrategy(), nil, log.NewNopLogger()) require.NoError(t, err) From f37aaf85a6f23266d95bc80910e80b91d0ae1cb1 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 4 Oct 2024 08:44:03 -0700 Subject: [PATCH 11/27] Validate cfg.UpdateInterval. --- ring/ring.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ring/ring.go b/ring/ring.go index 20bf3c259..cecb39aaa 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -275,6 +275,9 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client if cfg.ReplicationFactor <= 0 { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } + if cfg.UpdateInterval <= 0 { + return nil, fmt.Errorf("UpdateInterval must be greater than zero: %d", cfg.UpdateInterval) + } r := &Ring{ key: key, From e1c121ac565494d1c22de67fe35c460d93578b2f Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 4 Oct 2024 09:00:25 -0700 Subject: [PATCH 12/27] Don't mutate the config. --- ring/ring.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index cecb39aaa..34d4e61eb 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -33,6 +33,8 @@ const ( // GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on // a typical replication factor 3, plus extra room for a JOINING + LEAVING instance. GetBufferSize = 5 + + defaultUpdateInterval = 250 * time.Millisecond ) // ReadRing represents the read interface to the ring. @@ -164,7 +166,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f) f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).") - f.DurationVar(&cfg.UpdateInterval, prefix+"ring.update-interval", 250*time.Millisecond, "How often to recompute ring state when a change is detected from the KVStore.") + f.DurationVar(&cfg.UpdateInterval, prefix+"ring.update-interval", defaultUpdateInterval, "How often to recompute ring state when a change is detected from the KVStore.") f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.") f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.") f.Var(&cfg.ExcludedZones, prefix+"distributor.excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring.") @@ -275,9 +277,6 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client if cfg.ReplicationFactor <= 0 { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } - if cfg.UpdateInterval <= 0 { - return nil, fmt.Errorf("UpdateInterval must be greater than zero: %d", cfg.UpdateInterval) - } r := &Ring{ key: key, @@ -338,7 +337,11 @@ func (r *Ring) loop(ctx context.Context) error { // periodically update the ring. go func() { - t := time.NewTicker(r.cfg.UpdateInterval) + interval := r.cfg.UpdateInterval + if interval <= 0 { + interval = defaultUpdateInterval + } + t := time.NewTicker(interval) defer t.Stop() for { select { From 56702ddd110fbaa7092c75c6fe7bb71ecafb6bf7 Mon Sep 17 00:00:00 2001 From: David Grant Date: Tue, 8 Oct 2024 12:30:57 -0700 Subject: [PATCH 13/27] Move delay bits to updateObserver, default to noDelay. --- ring/ring.go | 46 ++++++--------------- ring/ring_test.go | 40 ------------------ ring/update_observer.go | 79 ++++++++++++++++++++++++++++++++++++ ring/update_observer_test.go | 63 ++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 73 deletions(-) create mode 100644 ring/update_observer.go create mode 100644 ring/update_observer_test.go diff --git a/ring/ring.go b/ring/ring.go index 34d4e61eb..37639c724 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -332,50 +332,30 @@ func (r *Ring) loop(ctx context.Context) error { r.updateRingMetrics() r.mtx.Unlock() - // Debounce WatchKey updates, as they can be frequent enough to cause lock - // contention. The most recent update is the one we'll use when we - // periodically update the ring. - - go func() { - interval := r.cfg.UpdateInterval - if interval <= 0 { - interval = defaultUpdateInterval - } - t := time.NewTicker(interval) - defer t.Stop() - for { - select { - case <-t.C: - r.observeKeyUpdate() - case <-ctx.Done(): - return - } - } - }() + var observer updateObserver[Desc] + + if r.cfg.UpdateInterval > 0 { + // Debounce WatchKey updates, as they can be frequent enough to cause lock + // contention. The most recent update is the one we'll use when we + // periodically update the ring. + d := newDelayedObserver(r.cfg.UpdateInterval, r.updateRingState) + d.run(ctx) + observer = d + } else { + observer = newNoDelayObserver(r.updateRingState) + } r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet") return true } - r.storeKeyUpdate(value.(*Desc)) + observer.observeUpdate(value.(*Desc)) return true }) return nil } -// storeKeyUpdate stores a new watch key update for later storage in the ring. -func (r *Ring) storeKeyUpdate(value *Desc) { - r.watchKeyUpdate.Store(value) -} - -// observeKeyUpdate is called periodically to update the ring state if a new watch key update is available. -func (r *Ring) observeKeyUpdate() { - if value := r.watchKeyUpdate.Swap(nil); value != nil { - r.updateRingState(value) - } -} - func (r *Ring) updateRingState(ringDesc *Desc) { r.mtx.RLock() prevRing := r.ringDesc diff --git a/ring/ring_test.go b/ring/ring_test.go index 1e2931cd0..3b1b5607f 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -3571,46 +3571,6 @@ func compareReplicationSets(first, second ReplicationSet) (added, removed []stri return } -// Verify that WatchKey updates make it to the ring, and the latest update is observed. -func TestRingWatchKeyUpdates(t *testing.T) { - cfg := Config{ReplicationFactor: 3} - ring, err := NewWithStoreClientAndStrategy(cfg, testRingName, testRingKey, nil, NewDefaultReplicationStrategy(), prometheus.NewRegistry(), log.NewNopLogger()) - require.NoError(t, err) - - desc1 := NewDesc() - desc1.Ingesters = map[string]InstanceDesc{ - "instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{1}, time.Now()), - } - desc2 := NewDesc() - desc2.Ingesters = map[string]InstanceDesc{ - "instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{1}, time.Now()), - "instance-2": generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{2}, time.Now()), - } - - // processUpdate is a stand-in for the ring's timer goroutine that normally calls observeKeyUpdate. - processUpdate := func() *Desc { - ring.observeKeyUpdate() - ring.mtx.RLock() - defer ring.mtx.RUnlock() - return ring.ringDesc - } - - assert.Empty(t, processUpdate().Ingesters, "no desc initially") - assert.Empty(t, processUpdate().Ingesters, "can update multiple times without any key update stored") - - ring.storeKeyUpdate(desc1) - assert.Same(t, desc1, processUpdate()) - assert.Same(t, desc1, processUpdate(), "no change if no new update") - - ring.storeKeyUpdate(desc2) - assert.Same(t, desc2, processUpdate()) - assert.Same(t, desc2, processUpdate(), "no change if no new update") - - ring.storeKeyUpdate(desc1) - ring.storeKeyUpdate(desc2) - assert.Same(t, desc2, processUpdate(), "should observe last update") -} - // This test verifies that ring is getting updates, even after extending check in the loop method. func TestRingUpdates(t *testing.T) { const ( diff --git a/ring/update_observer.go b/ring/update_observer.go new file mode 100644 index 000000000..3559fe22d --- /dev/null +++ b/ring/update_observer.go @@ -0,0 +1,79 @@ +package ring + +import ( + "context" + "time" + + "go.uber.org/atomic" +) + +type updateObserver[T any] interface { + observeUpdate(*T) +} + +// delayedObserver is an observer that waits for a certain interval before sending +// the update to the receiver. This is useful when the updates are frequent and +// we only care to observe the latest one. +type delayedObserver[T any] struct { + value atomic.Pointer[T] + receiver func(*T) + interval time.Duration +} + +func newDelayedObserver[T any](interval time.Duration, receiver func(*T)) *delayedObserver[T] { + if interval <= 0 { + panic("newDelayedObserver: interval must be greater than 0") + } + return &delayedObserver[T]{ + receiver: receiver, + interval: interval, + } +} + +// observeUpdate stores an updated value for a later flush. +func (w *delayedObserver[T]) observeUpdate(u *T) { + w.value.Store(u) +} + +func (w *delayedObserver[T]) run(ctx context.Context) { + go func() { + t := time.NewTicker(w.interval) + defer t.Stop() + for { + select { + case <-t.C: + w.flush() + case <-ctx.Done(): + return + } + } + }() +} + +// flush sends the update to the receiver if there is one. +func (w *delayedObserver[T]) flush() { + if v := w.value.Swap(nil); v != nil { + w.receiver(v) + } +} + +var _ updateObserver[int] = &delayedObserver[int]{} + +// noDelayObserver is an observer that synchronously sends the update to the +// receiver. +type noDelayObserver[T any] struct { + receiver func(*T) +} + +func newNoDelayObserver[T any](receiver func(*T)) *noDelayObserver[T] { + return &noDelayObserver[T]{ + receiver: receiver, + } +} + +// observeUpdate sends the update to the receiver immediately. +func (w *noDelayObserver[T]) observeUpdate(u *T) { + w.receiver(u) +} + +var _ updateObserver[int] = &noDelayObserver[int]{} diff --git a/ring/update_observer_test.go b/ring/update_observer_test.go new file mode 100644 index 000000000..2432fed9a --- /dev/null +++ b/ring/update_observer_test.go @@ -0,0 +1,63 @@ +package ring + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// Verify that updates trigger the receiver, and the latest update is observed. +func TestDelayedUpdates(t *testing.T) { + one := 1 + two := 2 + + var val *int = nil + storeVal := func(d *int) { + val = d + } + + o := newDelayedObserver(1*time.Millisecond, storeVal) + + assert.Nil(t, val, "no desc initially") + o.flush() + assert.Nil(t, val, "flush without update is a no-op") + o.flush() + assert.Nil(t, val, "multiple flushes without update is a no-op") + + o.observeUpdate(&one) + assert.Nil(t, val, "no flush immediately") + o.flush() + assert.Same(t, &one, val, "flush after update") + o.flush() + assert.Same(t, &one, val, "no change if no new update") + + o.observeUpdate(&two) + o.flush() + assert.Same(t, &two, val, "flush after update") + o.flush() + assert.Same(t, &two, val, "no change if no new update") + + o.observeUpdate(&one) + o.observeUpdate(&two) + o.flush() + assert.Same(t, &two, val, "should observe last update") +} + +func TestNoDelay(t *testing.T) { + one := 1 + two := 2 + + var val *int = nil + storeVal := func(d *int) { + val = d + } + o := newNoDelayObserver(storeVal) + assert.Nil(t, val) + + o.observeUpdate(&one) + assert.Same(t, &one, val) + + o.observeUpdate(&two) + assert.Same(t, &two, val) +} From 3f0abaf0bd844e40eaf9b325bc7d562bdc98b426 Mon Sep 17 00:00:00 2001 From: David Grant Date: Tue, 8 Oct 2024 12:46:23 -0700 Subject: [PATCH 14/27] Don't need a noDelayObserver. --- ring/ring.go | 12 +++--------- ring/update_observer.go | 19 ------------------- ring/update_observer_test.go | 20 +------------------- 3 files changed, 4 insertions(+), 47 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index 37639c724..80c0d3f4d 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -18,7 +18,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/internal/slices" @@ -234,9 +233,6 @@ type Ring struct { shuffledSubringCache map[subringCacheKey]*Ring shuffledSubringWithLookbackCache map[subringCacheKey]cachedSubringWithLookback[*Ring] - // The last observed update from the KV store. - watchKeyUpdate atomic.Pointer[Desc] - numMembersGaugeVec *prometheus.GaugeVec totalTokensGauge prometheus.Gauge oldestTimestampGaugeVec *prometheus.GaugeVec @@ -332,7 +328,7 @@ func (r *Ring) loop(ctx context.Context) error { r.updateRingMetrics() r.mtx.Unlock() - var observer updateObserver[Desc] + updateFunc := r.updateRingState if r.cfg.UpdateInterval > 0 { // Debounce WatchKey updates, as they can be frequent enough to cause lock @@ -340,9 +336,7 @@ func (r *Ring) loop(ctx context.Context) error { // periodically update the ring. d := newDelayedObserver(r.cfg.UpdateInterval, r.updateRingState) d.run(ctx) - observer = d - } else { - observer = newNoDelayObserver(r.updateRingState) + updateFunc = d.observeUpdate } r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { @@ -350,7 +344,7 @@ func (r *Ring) loop(ctx context.Context) error { level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet") return true } - observer.observeUpdate(value.(*Desc)) + updateFunc(value.(*Desc)) return true }) return nil diff --git a/ring/update_observer.go b/ring/update_observer.go index 3559fe22d..dfda5c4d3 100644 --- a/ring/update_observer.go +++ b/ring/update_observer.go @@ -58,22 +58,3 @@ func (w *delayedObserver[T]) flush() { } var _ updateObserver[int] = &delayedObserver[int]{} - -// noDelayObserver is an observer that synchronously sends the update to the -// receiver. -type noDelayObserver[T any] struct { - receiver func(*T) -} - -func newNoDelayObserver[T any](receiver func(*T)) *noDelayObserver[T] { - return &noDelayObserver[T]{ - receiver: receiver, - } -} - -// observeUpdate sends the update to the receiver immediately. -func (w *noDelayObserver[T]) observeUpdate(u *T) { - w.receiver(u) -} - -var _ updateObserver[int] = &noDelayObserver[int]{} diff --git a/ring/update_observer_test.go b/ring/update_observer_test.go index 2432fed9a..82889c175 100644 --- a/ring/update_observer_test.go +++ b/ring/update_observer_test.go @@ -12,7 +12,7 @@ func TestDelayedUpdates(t *testing.T) { one := 1 two := 2 - var val *int = nil + var val *int storeVal := func(d *int) { val = d } @@ -43,21 +43,3 @@ func TestDelayedUpdates(t *testing.T) { o.flush() assert.Same(t, &two, val, "should observe last update") } - -func TestNoDelay(t *testing.T) { - one := 1 - two := 2 - - var val *int = nil - storeVal := func(d *int) { - val = d - } - o := newNoDelayObserver(storeVal) - assert.Nil(t, val) - - o.observeUpdate(&one) - assert.Same(t, &one, val) - - o.observeUpdate(&two) - assert.Same(t, &two, val) -} From 2840d1092c628e0ee89a226f997a38b0eb678462 Mon Sep 17 00:00:00 2001 From: David Grant Date: Tue, 8 Oct 2024 13:11:57 -0700 Subject: [PATCH 15/27] Don't need an interface. --- ring/update_observer.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/ring/update_observer.go b/ring/update_observer.go index dfda5c4d3..d8b8979fa 100644 --- a/ring/update_observer.go +++ b/ring/update_observer.go @@ -7,13 +7,9 @@ import ( "go.uber.org/atomic" ) -type updateObserver[T any] interface { - observeUpdate(*T) -} - -// delayedObserver is an observer that waits for a certain interval before sending -// the update to the receiver. This is useful when the updates are frequent and -// we only care to observe the latest one. +// delayedObserver waits for a certain interval before sending an update to the +// receiver. This is useful when the updates are frequent and we only care to +// observe the latest one. type delayedObserver[T any] struct { value atomic.Pointer[T] receiver func(*T) @@ -56,5 +52,3 @@ func (w *delayedObserver[T]) flush() { w.receiver(v) } } - -var _ updateObserver[int] = &delayedObserver[int]{} From 7f73429e5a2a2216a757d93c1a9e0108c9516054 Mon Sep 17 00:00:00 2001 From: David Grant Date: Tue, 8 Oct 2024 13:14:14 -0700 Subject: [PATCH 16/27] Rename observer files. --- ring/{update_observer.go => observer.go} | 0 ring/{update_observer_test.go => observer_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename ring/{update_observer.go => observer.go} (100%) rename ring/{update_observer_test.go => observer_test.go} (100%) diff --git a/ring/update_observer.go b/ring/observer.go similarity index 100% rename from ring/update_observer.go rename to ring/observer.go diff --git a/ring/update_observer_test.go b/ring/observer_test.go similarity index 100% rename from ring/update_observer_test.go rename to ring/observer_test.go From 7ddb44b8160f2a5ceae9b64ae1047b9cb54daa7f Mon Sep 17 00:00:00 2001 From: David Grant Date: Tue, 8 Oct 2024 14:16:39 -0700 Subject: [PATCH 17/27] observeUpdate -> put; add a basic test for the ticker loop. --- ring/observer.go | 4 ++-- ring/observer_test.go | 41 +++++++++++++++++++++++++++++++++++++---- ring/ring.go | 2 +- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/ring/observer.go b/ring/observer.go index d8b8979fa..1d5277ab7 100644 --- a/ring/observer.go +++ b/ring/observer.go @@ -26,8 +26,8 @@ func newDelayedObserver[T any](interval time.Duration, receiver func(*T)) *delay } } -// observeUpdate stores an updated value for a later flush. -func (w *delayedObserver[T]) observeUpdate(u *T) { +// put stores an updated value for a later flush. +func (w *delayedObserver[T]) put(u *T) { w.value.Store(u) } diff --git a/ring/observer_test.go b/ring/observer_test.go index 82889c175..e2d2f41c9 100644 --- a/ring/observer_test.go +++ b/ring/observer_test.go @@ -1,6 +1,8 @@ package ring import ( + "context" + "sync" "testing" "time" @@ -25,21 +27,52 @@ func TestDelayedUpdates(t *testing.T) { o.flush() assert.Nil(t, val, "multiple flushes without update is a no-op") - o.observeUpdate(&one) + o.put(&one) assert.Nil(t, val, "no flush immediately") o.flush() assert.Same(t, &one, val, "flush after update") o.flush() assert.Same(t, &one, val, "no change if no new update") - o.observeUpdate(&two) + o.put(&two) o.flush() assert.Same(t, &two, val, "flush after update") o.flush() assert.Same(t, &two, val, "no change if no new update") - o.observeUpdate(&one) - o.observeUpdate(&two) + o.put(&one) + o.put(&two) o.flush() assert.Same(t, &two, val, "should observe last update") } + +func TestTickUpdates(t *testing.T) { + // This just exercises the ticker path. + one := 1 + two := 2 + + var mu sync.Mutex + var val *int + putVal := func(d *int) { + mu.Lock() + defer mu.Unlock() + val = d + } + getVal := func() *int { + mu.Lock() + defer mu.Unlock() + return val + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + o := newDelayedObserver(3*time.Millisecond, putVal) + o.run(ctx) + + o.put(&one) + assert.Eventually(t, func() bool { return getVal() == &one }, 100*time.Millisecond, 1*time.Millisecond) + + o.put(&two) + assert.Eventually(t, func() bool { return getVal() == &two }, 100*time.Millisecond, 1*time.Millisecond) +} diff --git a/ring/ring.go b/ring/ring.go index 80c0d3f4d..57f141883 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -336,7 +336,7 @@ func (r *Ring) loop(ctx context.Context) error { // periodically update the ring. d := newDelayedObserver(r.cfg.UpdateInterval, r.updateRingState) d.run(ctx) - updateFunc = d.observeUpdate + updateFunc = d.put } r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { From 12a5324fe7e91fb95c36035dd0f2d87555debee1 Mon Sep 17 00:00:00 2001 From: David Grant Date: Tue, 8 Oct 2024 15:24:39 -0700 Subject: [PATCH 18/27] Default no delay. --- CHANGELOG.md | 2 +- ring/ring.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7534c4b4..ac366bab6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,7 +85,6 @@ * [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585 * [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564 * [CHANGE] ring: Add `InstanceRingReader` interface to `ring` package. #597 -* [CHANGE] ring: ring updates now processed at most once per `-ring.update-interval`. #592 * [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276 * [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279 * [FEATURE] Add `log.BufferedLogger` type. #338 @@ -234,6 +233,7 @@ * [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583 * [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584 * [ENHANCEMENT] Cache: Add `.Add()` and `.Set()` methods to cache clients. #591 +* [ENHANCEMENT] ring: ring updates can now be processed once per interval specified by `-ring.update-interval`. #592 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 diff --git a/ring/ring.go b/ring/ring.go index 57f141883..63830b75c 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -32,8 +32,6 @@ const ( // GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on // a typical replication factor 3, plus extra room for a JOINING + LEAVING instance. GetBufferSize = 5 - - defaultUpdateInterval = 250 * time.Millisecond ) // ReadRing represents the read interface to the ring. @@ -165,7 +163,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f) f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).") - f.DurationVar(&cfg.UpdateInterval, prefix+"ring.update-interval", defaultUpdateInterval, "How often to recompute ring state when a change is detected from the KVStore.") + f.DurationVar(&cfg.UpdateInterval, prefix+"ring.update-interval", 0, "How often to recompute ring state when a change is detected from the KVStore. 0 = no delay.") f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.") f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.") f.Var(&cfg.ExcludedZones, prefix+"distributor.excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring.") From b9ca62b6d555a150045c2ee44da5439534ad135f Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 10 Oct 2024 21:01:52 -0700 Subject: [PATCH 19/27] Add debounce to memberlist notification code. --- kv/memberlist/memberlist_client.go | 82 +++++++++++++-- kv/memberlist/memberlist_client_test.go | 129 +++++++++++++++++++++++- 2 files changed, 199 insertions(+), 12 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 1d96363fe..4790ba833 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -137,6 +137,7 @@ type KVConfig struct { GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` EnableCompression bool `yaml:"compression_enabled" category:"advanced"` + NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -195,6 +196,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") + f.DurationVar(&cfg.NotifyInterval, prefix+"memberlist.notify-interval", 0, "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay.") f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") @@ -251,6 +253,10 @@ type KV struct { watchers map[string][]chan string prefixWatchers map[string][]chan string + // Delayed notifications for watchers + notifMu sync.Mutex + keyNotifications map[string]struct{} + // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. messagesMu sync.Mutex @@ -359,17 +365,18 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - store: make(map[string]ValueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - workersChannels: make(map[string]chan valueUpdate), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + keyNotifications: make(map[string]struct{}), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -486,6 +493,13 @@ func (m *KV) running(ctx context.Context) error { return errFailedToJoinCluster } + if m.cfg.NotifyInterval > 0 { + // Start delayed key notifications. + notifTicker := time.NewTicker(m.cfg.NotifyInterval) + defer notifTicker.Stop() + go m.sendKeyNotifications(ctx, notifTicker.C) + } + var tickerChan <-chan time.Time if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 { t := time.NewTicker(m.cfg.RejoinInterval) @@ -905,7 +919,53 @@ func removeWatcherChannel(k string, w chan string, watchers map[string][]chan st } } +// notifyWatchers sends notification to all watchers of given key. If delay is +// enabled, it accumulates them for later sending. func (m *KV) notifyWatchers(key string) { + if m.cfg.NotifyInterval <= 0 { + m.notifyWatchersSync(key) + return + } + + m.notifMu.Lock() + defer m.notifMu.Unlock() + m.keyNotifications[key] = struct{}{} +} + +// sendKeyNotifications sends accumulated notifications to all watchers of +// respective keys when the given channel ticks. +func (m *KV) sendKeyNotifications(ctx context.Context, tickChan <-chan time.Time) { + if m.cfg.NotifyInterval <= 0 { + panic("sendNotifications called with NotifyInterval <= 0") + } + + newNotifs := func() map[string]struct{} { + m.notifMu.Lock() + defer m.notifMu.Unlock() + + if len(m.keyNotifications) == 0 { + return nil + } + newMap := make(map[string]struct{}) + notifs := m.keyNotifications + m.keyNotifications = newMap + return notifs + } + + for { + select { + case <-tickChan: + for key := range newNotifs() { + m.notifyWatchersSync(key) + } + case <-ctx.Done(): + return + } + } +} + +// notifyWatcherSync immediately sends notification to all watchers of given key. +func (m *KV) notifyWatchersSync(key string) { m.watchersMu.Lock() defer m.watchersMu.Unlock() diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 4df4f0572..74b078058 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -255,7 +255,6 @@ func getLocalhostAddrs() []string { func TestBasicGetAndCas(t *testing.T) { c := dataCodec{} - name := "Ing 1" var cfg KVConfig flagext.DefaultValues(&cfg) cfg.TCPTransport = TCPTransportConfig{ @@ -278,6 +277,7 @@ func TestBasicGetAndCas(t *testing.T) { } // Create member in PENDING state, with some tokens + name := "Ing 1" err = cas(kv, key, updateFn(name)) require.NoError(t, err) @@ -1783,3 +1783,130 @@ func marshalState(t *testing.T, kvps ...*KeyValuePair) []byte { return buf.Bytes() } + +func TestNotificationDelay(t *testing.T) { + codec := dataCodec{} + + cfg := KVConfig{} + cfg.Codecs = append(cfg.Codecs, codec) + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: getLocalhostAddrs(), + } + // We're going to trigger sends manually, so effectively disable the automatic send interval. + const hundredYears = 100 * 365 * 24 * time.Hour + cfg.NotifyInterval = hundredYears + + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), kv)) + }) + + cli, err := NewClient(kv, codec) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tick := make(chan time.Time) + go kv.sendKeyNotifications(ctx, tick) + + dbMu := sync.Mutex{} + db := make(map[string]*data) + keyCalls := make(map[string]int) + + setData := func(key string, d *data) { + dbMu.Lock() + defer dbMu.Unlock() + db[key] = d + keyCalls[key]++ + } + getData := func(key string) *data { + dbMu.Lock() + defer dbMu.Unlock() + return db[key] + } + callsForKey := func(key string) int { + dbMu.Lock() + defer dbMu.Unlock() + return keyCalls[key] + } + verifyVal := func(k, v string) bool { + d := getData(k) + if d == nil { + return false + } + _, ok := d.Members[v] + return ok + } + + watchData := func(key string) { + go cli.WatchKey(ctx, key, func(in any) bool { + setData(key, in.(*data)) + return true + }) + } + + watchData("foo_123") + watchData("foo_124") + + assert.Equal(t, 0, callsForKey("foo_123")) + assert.Equal(t, 0, callsForKey("foo_124")) + + err = cas(cli, "foo_123", updateFn("val1")) + require.NoError(t, err) + + tick <- time.Now() + + require.Eventually(t, func() bool { + return verifyVal("foo_123", "val1") + }, 1*time.Second, 5*time.Millisecond) + + assert.Equal(t, 1, callsForKey("foo_123")) + assert.Equal(t, 0, callsForKey("foo_124")) + + // Test coalescing of updates. + err = cas(cli, "foo_123", updateFn("val2")) + require.NoError(t, err) + err = cas(cli, "foo_123", updateFn("val3")) + require.NoError(t, err) + err = cas(cli, "foo_123", updateFn("val4")) + require.NoError(t, err) + + assert.Equal(t, 1, callsForKey("foo_123"), "no flush -> no callback") + assert.Equal(t, 0, callsForKey("foo_124"), "no flush -> no callback") + + tick <- time.Now() + + require.Eventually(t, func() bool { + return verifyVal("foo_123", "val4") + }, 1*time.Second, 5*time.Millisecond, "multiple updates should be coalesced into the last one") + + assert.Equal(t, 2, callsForKey("foo_123")) + assert.Equal(t, 0, callsForKey("foo_124")) + + err = cas(cli, "foo_123", updateFn("val100")) + require.NoError(t, err) + err = cas(cli, "foo_124", updateFn("val101")) + require.NoError(t, err) + + tick <- time.Now() + + require.Eventually(t, func() bool { + return verifyVal("foo_123", "val100") && verifyVal("foo_124", "val101") + }, 1*time.Second, 5*time.Millisecond) + + assert.Equal(t, 3, callsForKey("foo_123")) + assert.Equal(t, 1, callsForKey("foo_124")) + + require.NotPanics(t, func() { + tick <- time.Now() + tick <- time.Now() + tick <- time.Now() + }, "shouldn't panic or anything like that when ticked without updates") + + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 3, callsForKey("foo_123")) + assert.Equal(t, 1, callsForKey("foo_124")) +} From a75be3a29867641cf03c832273ac048823dba41e Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 10 Oct 2024 21:05:11 -0700 Subject: [PATCH 20/27] Rm debounce from ring. --- ring/observer.go | 54 ------------------------------ ring/observer_test.go | 78 ------------------------------------------- ring/ring.go | 16 ++------- ring/ring_test.go | 3 -- 4 files changed, 2 insertions(+), 149 deletions(-) delete mode 100644 ring/observer.go delete mode 100644 ring/observer_test.go diff --git a/ring/observer.go b/ring/observer.go deleted file mode 100644 index 1d5277ab7..000000000 --- a/ring/observer.go +++ /dev/null @@ -1,54 +0,0 @@ -package ring - -import ( - "context" - "time" - - "go.uber.org/atomic" -) - -// delayedObserver waits for a certain interval before sending an update to the -// receiver. This is useful when the updates are frequent and we only care to -// observe the latest one. -type delayedObserver[T any] struct { - value atomic.Pointer[T] - receiver func(*T) - interval time.Duration -} - -func newDelayedObserver[T any](interval time.Duration, receiver func(*T)) *delayedObserver[T] { - if interval <= 0 { - panic("newDelayedObserver: interval must be greater than 0") - } - return &delayedObserver[T]{ - receiver: receiver, - interval: interval, - } -} - -// put stores an updated value for a later flush. -func (w *delayedObserver[T]) put(u *T) { - w.value.Store(u) -} - -func (w *delayedObserver[T]) run(ctx context.Context) { - go func() { - t := time.NewTicker(w.interval) - defer t.Stop() - for { - select { - case <-t.C: - w.flush() - case <-ctx.Done(): - return - } - } - }() -} - -// flush sends the update to the receiver if there is one. -func (w *delayedObserver[T]) flush() { - if v := w.value.Swap(nil); v != nil { - w.receiver(v) - } -} diff --git a/ring/observer_test.go b/ring/observer_test.go deleted file mode 100644 index e2d2f41c9..000000000 --- a/ring/observer_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package ring - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// Verify that updates trigger the receiver, and the latest update is observed. -func TestDelayedUpdates(t *testing.T) { - one := 1 - two := 2 - - var val *int - storeVal := func(d *int) { - val = d - } - - o := newDelayedObserver(1*time.Millisecond, storeVal) - - assert.Nil(t, val, "no desc initially") - o.flush() - assert.Nil(t, val, "flush without update is a no-op") - o.flush() - assert.Nil(t, val, "multiple flushes without update is a no-op") - - o.put(&one) - assert.Nil(t, val, "no flush immediately") - o.flush() - assert.Same(t, &one, val, "flush after update") - o.flush() - assert.Same(t, &one, val, "no change if no new update") - - o.put(&two) - o.flush() - assert.Same(t, &two, val, "flush after update") - o.flush() - assert.Same(t, &two, val, "no change if no new update") - - o.put(&one) - o.put(&two) - o.flush() - assert.Same(t, &two, val, "should observe last update") -} - -func TestTickUpdates(t *testing.T) { - // This just exercises the ticker path. - one := 1 - two := 2 - - var mu sync.Mutex - var val *int - putVal := func(d *int) { - mu.Lock() - defer mu.Unlock() - val = d - } - getVal := func() *int { - mu.Lock() - defer mu.Unlock() - return val - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - o := newDelayedObserver(3*time.Millisecond, putVal) - o.run(ctx) - - o.put(&one) - assert.Eventually(t, func() bool { return getVal() == &one }, 100*time.Millisecond, 1*time.Millisecond) - - o.put(&two) - assert.Eventually(t, func() bool { return getVal() == &two }, 100*time.Millisecond, 1*time.Millisecond) -} diff --git a/ring/ring.go b/ring/ring.go index 63830b75c..d47eb8fe2 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -143,7 +143,6 @@ var ( type Config struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"` - UpdateInterval time.Duration `yaml:"update_interval" category:"advanced"` ReplicationFactor int `yaml:"replication_factor"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` ExcludedZones flagext.StringSliceCSV `yaml:"excluded_zones" category:"advanced"` @@ -163,7 +162,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f) f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).") - f.DurationVar(&cfg.UpdateInterval, prefix+"ring.update-interval", 0, "How often to recompute ring state when a change is detected from the KVStore. 0 = no delay.") f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.") f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.") f.Var(&cfg.ExcludedZones, prefix+"distributor.excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring.") @@ -326,23 +324,13 @@ func (r *Ring) loop(ctx context.Context) error { r.updateRingMetrics() r.mtx.Unlock() - updateFunc := r.updateRingState - - if r.cfg.UpdateInterval > 0 { - // Debounce WatchKey updates, as they can be frequent enough to cause lock - // contention. The most recent update is the one we'll use when we - // periodically update the ring. - d := newDelayedObserver(r.cfg.UpdateInterval, r.updateRingState) - d.run(ctx) - updateFunc = d.put - } - r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet") return true } - updateFunc(value.(*Desc)) + + r.updateRingState(value.(*Desc)) return true }) return nil diff --git a/ring/ring_test.go b/ring/ring_test.go index 3b1b5607f..909c5d019 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -139,7 +139,6 @@ func benchmarkUpdateRingState(b *testing.B, numInstances, numTokens int, updateT cfg := Config{ KVStore: kv.Config{}, HeartbeatTimeout: 0, // get healthy stats - UpdateInterval: 250 * time.Millisecond, ReplicationFactor: 3, ZoneAwarenessEnabled: true, } @@ -3609,7 +3608,6 @@ func TestRingUpdates(t *testing.T) { cfg := Config{ KVStore: kv.Config{Mock: inmem}, HeartbeatTimeout: 1 * time.Minute, - UpdateInterval: 20 * time.Millisecond, ReplicationFactor: 3, ExcludedZones: flagext.StringSliceCSV(testData.excludedZones), } @@ -3718,7 +3716,6 @@ func TestRing_ShuffleShard_Caching(t *testing.T) { cfg := Config{ KVStore: kv.Config{Mock: inmem}, HeartbeatTimeout: 1 * time.Minute, - UpdateInterval: 20 * time.Millisecond, ReplicationFactor: 3, ZoneAwarenessEnabled: true, } From 4e145cfc6fb739b404340d24fad2cefafea4fb21 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 10 Oct 2024 21:06:31 -0700 Subject: [PATCH 21/27] undo servicediscovery changes --- servicediscovery/ring_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/servicediscovery/ring_test.go b/servicediscovery/ring_test.go index 12ebccc90..dfa8f90a6 100644 --- a/servicediscovery/ring_test.go +++ b/servicediscovery/ring_test.go @@ -28,11 +28,7 @@ func TestRingServiceDiscovery_WithoutMaxUsedInstances(t *testing.T) { t.Cleanup(func() { _ = closer.Close() }) // Create a ring client. - ringCfg := ring.Config{ - HeartbeatTimeout: time.Minute, - ReplicationFactor: 1, - UpdateInterval: 20 * time.Millisecond, - } + ringCfg := ring.Config{HeartbeatTimeout: time.Minute, ReplicationFactor: 1} ringClient, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", ringKey, inmem, ring.NewDefaultReplicationStrategy(), nil, log.NewNopLogger()) require.NoError(t, err) @@ -134,11 +130,7 @@ func TestRingServiceDiscovery_WithMaxUsedInstances(t *testing.T) { t.Cleanup(func() { _ = closer.Close() }) // Create a ring client. - ringCfg := ring.Config{ - HeartbeatTimeout: time.Minute, - ReplicationFactor: 1, - UpdateInterval: 20 * time.Millisecond, - } + ringCfg := ring.Config{HeartbeatTimeout: time.Minute, ReplicationFactor: 1} ringClient, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", ringKey, inmem, ring.NewDefaultReplicationStrategy(), nil, log.NewNopLogger()) require.NoError(t, err) From 082a1d032b3d21e89ee5e4d517542a30539887a4 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 10 Oct 2024 21:13:01 -0700 Subject: [PATCH 22/27] revise changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df2a7657d..f9cb11448 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -233,9 +233,9 @@ * [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583 * [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584 * [ENHANCEMENT] Cache: Add `.Add()` and `.Set()` methods to cache clients. #591 -* [ENHANCEMENT] ring: ring updates can now be processed once per interval specified by `-ring.update-interval`. #592 * [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. #601 * [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525 +* [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 From 8e2a4a08e480deb51915ec551a79d8b655d19781 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 10 Oct 2024 21:58:27 -0700 Subject: [PATCH 23/27] Wait up to 3*casInterval like other tests. --- kv/memberlist/memberlist_client_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 74b078058..2c999e21d 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1786,7 +1786,6 @@ func marshalState(t *testing.T, kvps ...*KeyValuePair) []byte { func TestNotificationDelay(t *testing.T) { codec := dataCodec{} - cfg := KVConfig{} cfg.Codecs = append(cfg.Codecs, codec) cfg.TCPTransport = TCPTransportConfig{ @@ -1795,7 +1794,6 @@ func TestNotificationDelay(t *testing.T) { // We're going to trigger sends manually, so effectively disable the automatic send interval. const hundredYears = 100 * 365 * 24 * time.Hour cfg.NotifyInterval = hundredYears - kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) @@ -1806,12 +1804,18 @@ func TestNotificationDelay(t *testing.T) { cli, err := NewClient(kv, codec) require.NoError(t, err) + casInterval := 1 * time.Second + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Arrange to do our own ticking. tick := make(chan time.Time) go kv.sendKeyNotifications(ctx, tick) + // Mirror any WatchKey updates to our own map and verify that they + // eventually arrive. + dbMu := sync.Mutex{} db := make(map[string]*data) keyCalls := make(map[string]int) @@ -1861,7 +1865,7 @@ func TestNotificationDelay(t *testing.T) { require.Eventually(t, func() bool { return verifyVal("foo_123", "val1") - }, 1*time.Second, 5*time.Millisecond) + }, 3*casInterval, 25*time.Millisecond) assert.Equal(t, 1, callsForKey("foo_123")) assert.Equal(t, 0, callsForKey("foo_124")) @@ -1881,7 +1885,7 @@ func TestNotificationDelay(t *testing.T) { require.Eventually(t, func() bool { return verifyVal("foo_123", "val4") - }, 1*time.Second, 5*time.Millisecond, "multiple updates should be coalesced into the last one") + }, 3*casInterval, 25*time.Millisecond, "multiple updates should be coalesced into the last one") assert.Equal(t, 2, callsForKey("foo_123")) assert.Equal(t, 0, callsForKey("foo_124")) @@ -1895,7 +1899,7 @@ func TestNotificationDelay(t *testing.T) { require.Eventually(t, func() bool { return verifyVal("foo_123", "val100") && verifyVal("foo_124", "val101") - }, 1*time.Second, 5*time.Millisecond) + }, 3*casInterval, 25*time.Millisecond) assert.Equal(t, 3, callsForKey("foo_123")) assert.Equal(t, 1, callsForKey("foo_124")) From b5df789fc078add871b7d877002320f2d53db361 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 11:47:07 -0700 Subject: [PATCH 24/27] Remove CAS/WatchKey layers from notification delay tests. --- kv/memberlist/memberlist_client.go | 29 ++-- kv/memberlist/memberlist_client_test.go | 179 ++++++++++-------------- 2 files changed, 94 insertions(+), 114 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 4790ba833..b45965ab8 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -497,7 +497,7 @@ func (m *KV) running(ctx context.Context) error { // Start delayed key notifications. notifTicker := time.NewTicker(m.cfg.NotifyInterval) defer notifTicker.Stop() - go m.sendKeyNotifications(ctx, notifTicker.C) + go m.monitorKeyNotifications(ctx, notifTicker.C) } var tickerChan <-chan time.Time @@ -932,14 +932,26 @@ func (m *KV) notifyWatchers(key string) { m.keyNotifications[key] = struct{}{} } -// sendKeyNotifications sends accumulated notifications to all watchers of +// monitorKeyNotifications sends accumulated notifications to all watchers of // respective keys when the given channel ticks. -func (m *KV) sendKeyNotifications(ctx context.Context, tickChan <-chan time.Time) { +func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.Time) { if m.cfg.NotifyInterval <= 0 { panic("sendNotifications called with NotifyInterval <= 0") } + for { + select { + case <-tickChan: + m.sendKeyNotifications() + case <-ctx.Done(): + return + } + } +} + +func (m *KV) sendKeyNotifications() { newNotifs := func() map[string]struct{} { + // Grab and clear accumulated notifications. m.notifMu.Lock() defer m.notifMu.Unlock() @@ -952,15 +964,8 @@ func (m *KV) sendKeyNotifications(ctx context.Context, tickChan <-chan time.Time return notifs } - for { - select { - case <-tickChan: - for key := range newNotifs() { - m.notifyWatchersSync(key) - } - case <-ctx.Done(): - return - } + for key := range newNotifs() { + m.notifyWatchersSync(key) } } diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 2c999e21d..17a0ebc64 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1801,116 +1801,91 @@ func TestNotificationDelay(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), kv)) }) - cli, err := NewClient(kv, codec) - require.NoError(t, err) - - casInterval := 1 * time.Second - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Arrange to do our own ticking. - tick := make(chan time.Time) - go kv.sendKeyNotifications(ctx, tick) + watchChan := make(chan string, 16) - // Mirror any WatchKey updates to our own map and verify that they - // eventually arrive. + // Add ourselves as a watcher. + kv.watchersMu.Lock() + kv.watchers["foo_123"] = append(kv.watchers["foo_123"], watchChan) + kv.watchers["foo_124"] = append(kv.watchers["foo_124"], watchChan) + kv.watchersMu.Unlock() - dbMu := sync.Mutex{} - db := make(map[string]*data) - keyCalls := make(map[string]int) + defer func() { + kv.watchersMu.Lock() + removeWatcherChannel("foo_123", watchChan, kv.watchers) + removeWatcherChannel("foo_124", watchChan, kv.watchers) + kv.watchersMu.Unlock() + }() - setData := func(key string, d *data) { - dbMu.Lock() - defer dbMu.Unlock() - db[key] = d - keyCalls[key]++ - } - getData := func(key string) *data { - dbMu.Lock() - defer dbMu.Unlock() - return db[key] - } - callsForKey := func(key string) int { - dbMu.Lock() - defer dbMu.Unlock() - return keyCalls[key] - } - verifyVal := func(k, v string) bool { - d := getData(k) - if d == nil { - return false + verifyNotifs := func(expected map[string]int, comment string) { + observed := make(map[string]int, len(expected)) + for kk := range expected { + observed[kk] = 0 + } + loop: + for { + select { + case k := <-watchChan: + observed[k]++ + default: + break loop + } } - _, ok := d.Members[v] - return ok + require.Equal(t, expected, observed, comment) } - watchData := func(key string) { - go cli.WatchKey(ctx, key, func(in any) bool { - setData(key, in.(*data)) - return true - }) + drainChan := func() { + for { + select { + case <-watchChan: + default: + return + } + } } - watchData("foo_123") - watchData("foo_124") - - assert.Equal(t, 0, callsForKey("foo_123")) - assert.Equal(t, 0, callsForKey("foo_124")) - - err = cas(cli, "foo_123", updateFn("val1")) - require.NoError(t, err) - - tick <- time.Now() - - require.Eventually(t, func() bool { - return verifyVal("foo_123", "val1") - }, 3*casInterval, 25*time.Millisecond) - - assert.Equal(t, 1, callsForKey("foo_123")) - assert.Equal(t, 0, callsForKey("foo_124")) + kv.notifyWatchers("foo_123") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 1}, "1 change 1 notification") // Test coalescing of updates. - err = cas(cli, "foo_123", updateFn("val2")) - require.NoError(t, err) - err = cas(cli, "foo_123", updateFn("val3")) - require.NoError(t, err) - err = cas(cli, "foo_123", updateFn("val4")) - require.NoError(t, err) - - assert.Equal(t, 1, callsForKey("foo_123"), "no flush -> no callback") - assert.Equal(t, 0, callsForKey("foo_124"), "no flush -> no callback") - - tick <- time.Now() - - require.Eventually(t, func() bool { - return verifyVal("foo_123", "val4") - }, 3*casInterval, 25*time.Millisecond, "multiple updates should be coalesced into the last one") - - assert.Equal(t, 2, callsForKey("foo_123")) - assert.Equal(t, 0, callsForKey("foo_124")) - - err = cas(cli, "foo_123", updateFn("val100")) - require.NoError(t, err) - err = cas(cli, "foo_124", updateFn("val101")) - require.NoError(t, err) - - tick <- time.Now() - - require.Eventually(t, func() bool { - return verifyVal("foo_123", "val100") && verifyVal("foo_124", "val101") - }, 3*casInterval, 25*time.Millisecond) - - assert.Equal(t, 3, callsForKey("foo_123")) - assert.Equal(t, 1, callsForKey("foo_124")) - - require.NotPanics(t, func() { - tick <- time.Now() - tick <- time.Now() - tick <- time.Now() - }, "shouldn't panic or anything like that when ticked without updates") - - time.Sleep(100 * time.Millisecond) - assert.Equal(t, 3, callsForKey("foo_123")) - assert.Equal(t, 1, callsForKey("foo_124")) + drainChan() + verifyNotifs(map[string]int{"foo_123": 0}, "chan drained") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 1}, "flush should coalesce updates") + + // multiple buffered updates + drainChan() + verifyNotifs(map[string]int{"foo_123": 0}, "chan drained") + kv.notifyWatchers("foo_123") + kv.sendKeyNotifications() + kv.notifyWatchers("foo_123") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 2}, "two buffered updates") + + // multiple keys + drainChan() + kv.notifyWatchers("foo_123") + kv.notifyWatchers("foo_124") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 1, "foo_124": 1}, "2 changes 2 notifications") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 0, "foo_124": 0}, "no new notifications") + + // and finally, sendKeyNotifications can be called repeatedly without new updates. + kv.sendKeyNotifications() + kv.sendKeyNotifications() + kv.sendKeyNotifications() + kv.sendKeyNotifications() } From 91d4db9f67345813748bd2888f57d56c728d2c19 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 12:00:39 -0700 Subject: [PATCH 25/27] Exercise monitor method. Remove all unnecessary stuff from test. --- kv/memberlist/memberlist_client_test.go | 36 ++++++++++++++++--------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 17a0ebc64..a2e97263e 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1785,22 +1785,12 @@ func marshalState(t *testing.T, kvps ...*KeyValuePair) []byte { } func TestNotificationDelay(t *testing.T) { - codec := dataCodec{} cfg := KVConfig{} - cfg.Codecs = append(cfg.Codecs, codec) - cfg.TCPTransport = TCPTransportConfig{ - BindAddrs: getLocalhostAddrs(), - } // We're going to trigger sends manually, so effectively disable the automatic send interval. const hundredYears = 100 * 365 * 24 * time.Hour cfg.NotifyInterval = hundredYears kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), kv)) - }) - watchChan := make(chan string, 16) // Add ourselves as a watcher. @@ -1816,7 +1806,7 @@ func TestNotificationDelay(t *testing.T) { kv.watchersMu.Unlock() }() - verifyNotifs := func(expected map[string]int, comment string) { + verifyNotifs := func(expected map[string]int, comment string) bool { observed := make(map[string]int, len(expected)) for kk := range expected { observed[kk] = 0 @@ -1830,7 +1820,7 @@ func TestNotificationDelay(t *testing.T) { break loop } } - require.Equal(t, expected, observed, comment) + return assert.Equal(t, expected, observed, comment) } drainChan := func() { @@ -1883,9 +1873,29 @@ func TestNotificationDelay(t *testing.T) { kv.sendKeyNotifications() verifyNotifs(map[string]int{"foo_123": 0, "foo_124": 0}, "no new notifications") - // and finally, sendKeyNotifications can be called repeatedly without new updates. + // sendKeyNotifications can be called repeatedly without new updates. kv.sendKeyNotifications() kv.sendKeyNotifications() kv.sendKeyNotifications() kv.sendKeyNotifications() + + // Finally, exercise the monitor method. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tick := make(chan time.Time) + go kv.monitorKeyNotifications(ctx, tick) + kv.notifyWatchers("foo_123") + tick <- time.Now() + + require.Eventually(t, func() bool { + select { + case k := <-watchChan: + if k != "foo_123" { + panic(fmt.Sprintf("unexpected key: %s", k)) + } + return true + default: // nothing yet. + return false + } + }, 20*time.Second, 100*time.Millisecond) } From ad78368ef3834d0a54e0f8b4bb3f8d6750d49796 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 12:03:19 -0700 Subject: [PATCH 26/27] Doc comment. --- kv/memberlist/memberlist_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index b45965ab8..452798e04 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -949,6 +949,7 @@ func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.T } } +// sendKeyNotifications sends accumulated notifications to watchers of respective keys. func (m *KV) sendKeyNotifications() { newNotifs := func() map[string]struct{} { // Grab and clear accumulated notifications. From 3cbd4627832e8d0678efb8902b6b862385d0de72 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 12:21:40 -0700 Subject: [PATCH 27/27] didn't mean to commit that bit. --- kv/memberlist/memberlist_client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index a2e97263e..0dc5ff083 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1806,7 +1806,7 @@ func TestNotificationDelay(t *testing.T) { kv.watchersMu.Unlock() }() - verifyNotifs := func(expected map[string]int, comment string) bool { + verifyNotifs := func(expected map[string]int, comment string) { observed := make(map[string]int, len(expected)) for kk := range expected { observed[kk] = 0 @@ -1820,7 +1820,7 @@ func TestNotificationDelay(t *testing.T) { break loop } } - return assert.Equal(t, expected, observed, comment) + require.Equal(t, expected, observed, comment) } drainChan := func() {