From 49fbd0e9411ba6109d5c59b060970c0208370783 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Sat, 7 Dec 2024 11:54:19 -0500 Subject: [PATCH] kvserver,rac2: turn off raftReceiveQueue.maxLen enforcement in apply_to_all mode The existing maxLen enforcement is already dubious: - Length does not equal bytes, so offers limited protection from OOMs. - The limit is per replica and not an aggregate. - We run a cooperative system, and historically the sender has respected RaftConfig.RaftMaxInflightBytes, which is a byte limit. The only reason for additional protection on the receiver is when there are rapid repeated leader changes for a large number of ranges for which the receiver has replicas. Even in this case, the behavior is surprising since the receive queue overflows even though the sender has done nothing wrong -- and it is very unlikely that this overflow is actually protecting against an OOM. With RACv2 in apply_to_all mode, the senders have a 16MiB regular token pool that applies to a whole (store, tenant) pair. This is stricter than the per range defaultRaftMaxInflightBytes (32MiB), both in value, and because it is an aggregate limit. The aforementioned "only reason" is even more unnecessary. So we remove this in the case of apply_to_all. An alternative would be to have replicaSendStream respect the receiver limit in apply_to_all mode. The complexity there is that replicaSendStream grabs a bunch of tokens equal to the byte size of the send-queue, and expects to send all those messages. To respect a count limit, it will need to quickly return tokens it can't use (since they are a shared resource), which adds complexity to the already complex token management logic. Fixes #135851 Epic: none Release note: None --- .../rac2/wait_for_eval_config.go | 78 +++++++++++----- .../rac2/wait_for_eval_config_test.go | 28 ++++-- pkg/kv/kvserver/store.go | 12 +++ pkg/kv/kvserver/store_raft.go | 63 +++++++++++-- pkg/kv/kvserver/store_raft_test.go | 88 +++++++++++++++++++ 5 files changed, 235 insertions(+), 34 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go index 5378b078f15e..4d0aaf92c206 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go @@ -40,8 +40,15 @@ type WaitForEvalConfig struct { waitCategory WaitForEvalCategory waitCategoryDecreasedCh chan struct{} } + // watcherMu is ordered before mu. cbs are executed while holding watcherMu. + watcherMu struct { + syncutil.Mutex + cbs []WatcherCallback + } } +type WatcherCallback func(wc WaitForEvalCategory) + // NewWaitForEvalConfig constructs WaitForEvalConfig. func NewWaitForEvalConfig(st *cluster.Settings) *WaitForEvalConfig { w := &WaitForEvalConfig{st: st} @@ -59,32 +66,42 @@ func NewWaitForEvalConfig(st *cluster.Settings) *WaitForEvalConfig { // notifyChanged is called whenever any of the cluster settings that affect // WaitForEval change. It is also called for initialization. func (w *WaitForEvalConfig) notifyChanged() { - w.mu.Lock() - defer w.mu.Unlock() - // Call computeCategory while holding w.mu to serialize the computation in - // case of concurrent callbacks. This ensures the latest settings are used - // to set the current state, and we don't have a situation where a slow - // goroutine samples the settings, then after some arbitrary duration - // acquires the mutex and sets a stale state. 
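As an aside, the ordering concern described in the comment above (which the new closure below retains) can be made concrete with a minimal sketch; the package, type, and function names here are hypothetical and are not part of this change:

package racsketch

import "sync"

type category int

type config struct {
	mu  sync.Mutex
	cat category
}

// notifyChangedRacy computes the category outside the lock. A goroutine that
// stalls between compute() and Lock() can then overwrite a newer category
// with one derived from settings that were already stale when it sampled
// them.
func (c *config) notifyChangedRacy(compute func() category) {
	cat := compute()
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cat = cat
}

// notifyChangedSerialized mirrors the pattern used by notifyChanged: compute
// and publish inside the same critical section, so concurrent callbacks
// cannot interleave and the last write always reflects the latest settings.
func (c *config) notifyChangedSerialized(compute func() category) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cat = compute()
}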
- waitCategory := w.computeCategory() - if w.mu.waitCategoryDecreasedCh == nil { - // Initialization. - w.mu.waitCategoryDecreasedCh = make(chan struct{}) + func() { + w.mu.Lock() + defer w.mu.Unlock() + // Call computeCategory while holding w.mu to serialize the computation in + // case of concurrent callbacks. This ensures the latest settings are used + // to set the current state, and we don't have a situation where a slow + // goroutine samples the settings, then after some arbitrary duration + // acquires the mutex and sets a stale state. + waitCategory := w.computeCategory() + if w.mu.waitCategoryDecreasedCh == nil { + // Initialization. + w.mu.waitCategoryDecreasedCh = make(chan struct{}) + w.mu.waitCategory = waitCategory + return + } + // Not initialization. + if w.mu.waitCategory > waitCategory { + close(w.mu.waitCategoryDecreasedCh) + w.mu.waitCategoryDecreasedCh = make(chan struct{}) + } + // Else w.mu.waitCategory <= waitCategory. Since the set of requests that + // are subject to replication admission/flow control is growing (or staying + // the same), we don't need to tell the existing waiting requests to restart + // their wait, using the latest value of waitCategory, since they are + // unaffected by the change. + w.mu.waitCategory = waitCategory - return - } - // Not initialization. - if w.mu.waitCategory > waitCategory { - close(w.mu.waitCategoryDecreasedCh) - w.mu.waitCategoryDecreasedCh = make(chan struct{}) + }() + w.watcherMu.Lock() + defer w.watcherMu.Unlock() + w.mu.RLock() + wc := w.mu.waitCategory + w.mu.RUnlock() + for _, cb := range w.watcherMu.cbs { + cb(wc) } - // Else w.mu.waitCategory <= waitCategory. Since the set of requests that - // are subject to replication admission/flow control is growing (or staying - // the same), we don't need to tell the existing waiting requests to restart - // their wait, using the latest value of waitCategory, since they are - // unaffected by the change. - - w.mu.waitCategory = waitCategory } // Current returns the current category, and a channel that will be closed if @@ -110,3 +127,16 @@ func (w *WaitForEvalConfig) computeCategory() WaitForEvalCategory { } panic(errors.AssertionFailedf("unknown mode %v", mode)) } + +// RegisterWatcher registers a callback that provides the latest state of +// WaitForEvalCategory. The first call happens within this method, before +// returning. +func (w *WaitForEvalConfig) RegisterWatcher(cb WatcherCallback) { + w.watcherMu.Lock() + defer w.watcherMu.Unlock() + w.mu.RLock() + wc := w.mu.waitCategory + w.mu.RUnlock() + cb(wc) + w.watcherMu.cbs = append(w.watcherMu.cbs, cb) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go index b5d266fd8791..225dc7707894 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go @@ -26,47 +26,65 @@ func TestWaitForEvalConfig(t *testing.T) { kvflowcontrol.Enabled.Override(ctx, &st.SV, true) kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToAll) + var expectedWC WaitForEvalCategory + // All work waits for eval. 
w := NewWaitForEvalConfig(st) + expectedWC = AllWorkWaitsForEval wec, ch1 := w.Current() - require.Equal(t, AllWorkWaitsForEval, wec) + require.Equal(t, expectedWC, wec) require.NotNil(t, ch1) require.False(t, wec.Bypass(admissionpb.ElasticWorkClass)) require.False(t, wec.Bypass(admissionpb.RegularWorkClass)) + cbCount := 0 + cb := func(wc WaitForEvalCategory) { + cbCount++ + require.Equal(t, expectedWC, wc) + } + w.RegisterWatcher(cb) + require.Equal(t, 1, cbCount) // No work waits for eval. + expectedWC = NoWorkWaitsForEval kvflowcontrol.Enabled.Override(ctx, &st.SV, false) var ch2 <-chan struct{} wec, ch2 = w.Current() - require.Equal(t, NoWorkWaitsForEval, wec) + require.Equal(t, expectedWC, wec) require.NotNil(t, ch2) require.NotEqual(t, ch1, ch2) require.True(t, wec.Bypass(admissionpb.ElasticWorkClass)) require.True(t, wec.Bypass(admissionpb.RegularWorkClass)) + require.Equal(t, 2, cbCount) // All work waits for eval. + expectedWC = AllWorkWaitsForEval kvflowcontrol.Enabled.Override(ctx, &st.SV, true) var ch3 <-chan struct{} wec, ch3 = w.Current() - require.Equal(t, AllWorkWaitsForEval, wec) + require.Equal(t, expectedWC, wec) // Channel has not changed. require.Equal(t, ch2, ch3) + require.Equal(t, 3, cbCount) // Elastic work waits for eval. + expectedWC = OnlyElasticWorkWaitsForEval kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToElastic) var ch4 <-chan struct{} wec, ch4 = w.Current() - require.Equal(t, OnlyElasticWorkWaitsForEval, wec) + require.Equal(t, expectedWC, wec) require.NotNil(t, ch4) require.NotEqual(t, ch3, ch4) require.False(t, wec.Bypass(admissionpb.ElasticWorkClass)) require.True(t, wec.Bypass(admissionpb.RegularWorkClass)) + require.Equal(t, 4, cbCount) // All work waits for eval. + expectedWC = AllWorkWaitsForEval kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToAll) var ch5 <-chan struct{} wec, ch5 = w.Current() - require.Equal(t, AllWorkWaitsForEval, wec) + require.Equal(t, expectedWC, wec) // Channel has not changed. require.Equal(t, ch4, ch5) + require.Equal(t, 5, cbCount) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 8394e72c2105..3b4863ce79c0 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -372,6 +372,7 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig { ), KVFlowAdmittedPiggybacker: node_rac2.NewAdmittedPiggybacker(), KVFlowStreamTokenProvider: rac2.NewStreamTokenCounterProvider(st, clock), + KVFlowWaitForEvalConfig: rac2.NewWaitForEvalConfig(st), KVFlowEvalWaitMetrics: rac2.NewEvalWaitMetrics(), KVFlowRangeControllerMetrics: rac2.NewRangeControllerMetrics(), } @@ -1550,6 +1551,17 @@ func NewStore( CurCount: s.metrics.RaftRcvdQueuedBytes, Settings: cfg.Settings, }) + s.cfg.KVFlowWaitForEvalConfig.RegisterWatcher(func(wc rac2.WaitForEvalCategory) { + // When the system is configured with rac2.AllWorkWaitsForEval, RACv2 is + // running in a mode where all senders are using send token pools for all + // messages to pace sending to a receiving store. These send token pools + // are stricter than per range limits. Additionally, these are byte sized + // pools, which is a better way to protect the receiver than a count + // limit. Hence, we turn off maxLen enforcement in that case, since it is + // unnecessary extra protection, and to respect this on the sender side + // (in RACv2 code) would result in unnecessary code complexity. 
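To spell out the resulting policy, here is a small sketch; shouldEnforceMaxLen is a hypothetical helper, not something this change adds, and simply restates the expression passed to SetEnforceMaxLen in the callback below:

package racsketch

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"

// shouldEnforceMaxLen restates the enforcement decision per category:
//
//	NoWorkWaitsForEval          -> enforce maxLen (RACv2 is not pacing senders)
//	OnlyElasticWorkWaitsForEval -> enforce maxLen (regular traffic is not paced)
//	AllWorkWaitsForEval         -> do not enforce (byte-sized send pools pace all traffic)
func shouldEnforceMaxLen(wc rac2.WaitForEvalCategory) bool {
	return wc != rac2.AllWorkWaitsForEval
}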
+ s.raftRecvQueues.SetEnforceMaxLen(wc != rac2.AllWorkWaitsForEval) + }) s.cfg.RangeLogWriter = newWrappedRangeLogWriter( s.metrics.getCounterForRangeLogEventType, diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 79977d9d2846..dc1959517f9b 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -8,6 +8,7 @@ package kvserver import ( "context" "math" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -43,7 +44,8 @@ type raftReceiveQueue struct { mu struct { // not to be locked directly destroyed bool syncutil.Mutex - infos []raftRequestInfo + infos []raftRequestInfo + enforceMaxLen bool } maxLen int acc mon.BoundAccount @@ -106,7 +108,7 @@ func (q *raftReceiveQueue) Append( size = int64(req.Size()) q.mu.Lock() defer q.mu.Unlock() - if q.mu.destroyed || len(q.mu.infos) >= q.maxLen { + if q.mu.destroyed || (q.mu.enforceMaxLen && len(q.mu.infos) >= q.maxLen) { return false, size, false } if q.acc.Grow(context.Background(), size) != nil { @@ -122,9 +124,22 @@ func (q *raftReceiveQueue) Append( return len(q.mu.infos) == 1, size, true } +func (q *raftReceiveQueue) SetEnforceMaxLen(enforceMaxLen bool) { + q.mu.Lock() + defer q.mu.Unlock() + q.mu.enforceMaxLen = enforceMaxLen +} + +func (q *raftReceiveQueue) getEnforceMaxLenForTesting() bool { + q.mu.Lock() + defer q.mu.Unlock() + return q.mu.enforceMaxLen +} + type raftReceiveQueues struct { - mon *mon.BytesMonitor - m syncutil.Map[roachpb.RangeID, raftReceiveQueue] + mon *mon.BytesMonitor + m syncutil.Map[roachpb.RangeID, raftReceiveQueue] + enforceMaxLen atomic.Bool } func (qs *raftReceiveQueues) Load(rangeID roachpb.RangeID) (*raftReceiveQueue, bool) { @@ -139,7 +154,33 @@ func (qs *raftReceiveQueues) LoadOrCreate( } q := &raftReceiveQueue{maxLen: maxLen} q.acc.Init(context.Background(), qs.mon) - return qs.m.LoadOrStore(rangeID, q) + q, loaded = qs.m.LoadOrStore(rangeID, q) + if !loaded { + // The sampling of enforceMaxLen can be concurrent with a call to + // SetEnforceMaxLen. We can sample a stale value, then SetEnforceMaxLen + // can fully execute, and then set the stale value here. Since + // qs.SetEnforceMaxLen sets the latest value first, before iterating over + // the map, it suffices to check after setting the value here that it has + // not changed. There are two cases: + // + // - Has changed: it is possible that our call to q.SetEnforceMaxLen + // occurred after the corresponding call in qs.SetEnforceMaxLen, so we + // have to loop back and correct it. + // + // - Has not changed: there may be a concurrent call to + // qs.SetEnforceMaxLen with a different bool parameter, but it has not + // yet updated qs.enforceMaxLen. Which is fine -- that call will iterate + // over the map and do what is necessary. + for { + enforceBefore := qs.enforceMaxLen.Load() + q.SetEnforceMaxLen(enforceBefore) + enforceAfter := qs.enforceMaxLen.Load() + if enforceAfter == enforceBefore { + break + } + } + } + return q, loaded } // Delete drains the queue and marks it as deleted. Future Appends @@ -151,6 +192,18 @@ func (qs *raftReceiveQueues) Delete(rangeID roachpb.RangeID) { } } +// SetEnforceMaxLen specifies the latest state of whether maxLen needs to be +// enforced or not. Calls to this method are serialized by the caller. +func (qs *raftReceiveQueues) SetEnforceMaxLen(enforceMaxLen bool) { + // Store the latest value first. A concurrent creation of raftReceiveQueue + // can race with this method -- see how this is handled in LoadOrCreate. 
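The interplay between this method and LoadOrCreate is a publish-then-fan-out protocol; the following self-contained sketch illustrates the same idea with hypothetical flagRegistry/flagEntry types standing in for raftReceiveQueues and raftReceiveQueue (it is an illustration of the pattern, not code from this change):

package racsketch

import (
	"sync"
	"sync/atomic"
)

type flagEntry struct {
	mu   sync.Mutex
	flag bool
}

func (e *flagEntry) set(v bool) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.flag = v
}

type flagRegistry struct {
	latest  atomic.Bool
	mu      sync.Mutex
	entries map[int]*flagEntry
}

func newFlagRegistry() *flagRegistry {
	return &flagRegistry{entries: map[int]*flagEntry{}}
}

// Set publishes the latest value first and then fans out to existing
// entries, mirroring raftReceiveQueues.SetEnforceMaxLen below.
func (r *flagRegistry) Set(v bool) {
	r.latest.Store(v)
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, e := range r.entries {
		e.set(v)
	}
}

// Create mirrors the re-check loop in LoadOrCreate above: apply the sampled
// value, then re-read it; if a concurrent Set changed it in the meantime,
// loop and apply the newer value, so a freshly created entry can never be
// left holding a stale flag.
func (r *flagRegistry) Create(id int) *flagEntry {
	e := &flagEntry{}
	r.mu.Lock()
	r.entries[id] = e
	r.mu.Unlock()
	for {
		before := r.latest.Load()
		e.set(before)
		if r.latest.Load() == before {
			break
		}
	}
	return e
}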
+ qs.enforceMaxLen.Store(enforceMaxLen) + qs.m.Range(func(_ roachpb.RangeID, q *raftReceiveQueue) bool { + q.SetEnforceMaxLen(enforceMaxLen) + return true + }) +} + // HandleDelegatedSnapshot reads the incoming delegated snapshot message and // throttles sending snapshots before passing the request to the sender replica. func (s *Store) HandleDelegatedSnapshot( diff --git a/pkg/kv/kvserver/store_raft_test.go b/pkg/kv/kvserver/store_raft_test.go index e7eb6c21b072..8e147627f232 100644 --- a/pkg/kv/kvserver/store_raft_test.go +++ b/pkg/kv/kvserver/store_raft_test.go @@ -9,7 +9,9 @@ package kvserver import ( "context" "fmt" + "sync" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -19,12 +21,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" @@ -221,3 +226,86 @@ func TestRaftCrossLocalityMetrics(t *testing.T) { }) } } + +func TestRaftReceiveQueuesEnforceMaxLenConcurrency(t *testing.T) { + defer leaktest.AfterTest(t)() + + skip.UnderStress(t, "slow test") + skip.UnderDuress(t, "slow test") + st := cluster.MakeTestingClusterSettings() + g := metric.NewGauge(metric.Metadata{}) + m := mon.NewUnlimitedMonitor(context.Background(), mon.Options{ + Name: mon.MakeMonitorName("test"), + CurCount: g, + Settings: st, + }) + qs := raftReceiveQueues{mon: m} + // checkingMu is locked in write mode when checking that the values of + // enforceMaxLen across all the queues is the expected value. + var checkingMu syncutil.RWMutex + // doneCh is used to tell the goroutines to stop, and wg is used to wait + // until they are done. + doneCh := make(chan struct{}) + var wg sync.WaitGroup + // Spin up goroutines to create queues. + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + rng, _ := randutil.NewTestRand() + defer wg.Done() + for { + checkingMu.RLock() + // Most queues will be newly created due to lack of collision in the + // random numbers. Newly created queues have their enforceMaxLen set + // in LoadOrCreate. + qs.LoadOrCreate(roachpb.RangeID(rng.Int63()), 10) + checkingMu.RUnlock() + select { + case <-doneCh: + return + default: + } + } + }() + } + // Iterate and set different values of enforceMaxLen. + enforceMaxLen := false + for i := 0; i < 200; i++ { + // NB: SetEnforceMaxLen runs concurrently with LoadOrCreate calls. + qs.SetEnforceMaxLen(enforceMaxLen) + // Exclude all LoadOrCreate calls while checking is happening. 
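The RWMutex in this test acts purely as a barrier rather than protecting shared data; a minimal sketch of the idiom, using the standard library sync.RWMutex in place of the syncutil wrapper and hypothetical helper names:

package racsketch

import "sync"

var barrier sync.RWMutex

// produce runs under the read lock, so any number of producers (here, the
// goroutines calling LoadOrCreate) can run concurrently with each other.
func produce(createQueue func()) {
	barrier.RLock()
	defer barrier.RUnlock()
	createQueue()
}

// checkInvariant takes the write lock, which waits for every in-flight
// produce call to finish and blocks new ones until the check is done, so the
// invariant is asserted against a quiescent state.
func checkInvariant(check func()) {
	barrier.Lock()
	defer barrier.Unlock()
	check()
}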
+ checkingMu.Lock() + qs.m.Range(func(_ roachpb.RangeID, q *raftReceiveQueue) bool { + require.Equal(t, enforceMaxLen, q.getEnforceMaxLenForTesting()) + return true + }) + checkingMu.Unlock() + enforceMaxLen = !enforceMaxLen + time.Sleep(time.Millisecond) + } + close(doneCh) + wg.Wait() +} + +func TestRaftReceiveQueuesEnforceMaxLen(t *testing.T) { + defer leaktest.AfterTest(t)() + + st := cluster.MakeTestingClusterSettings() + g := metric.NewGauge(metric.Metadata{}) + m := mon.NewUnlimitedMonitor(context.Background(), mon.Options{ + Name: mon.MakeMonitorName("test"), + CurCount: g, + Settings: st, + }) + qs := raftReceiveQueues{mon: m} + qs.SetEnforceMaxLen(false) + q, _ := qs.LoadOrCreate(1, 2) + var s RaftMessageResponseStream + for i := 0; i < 10; i++ { + _, _, appended := q.Append(&kvserverpb.RaftMessageRequest{}, s) + require.True(t, appended) + } + qs.SetEnforceMaxLen(true) + _, _, appended := q.Append(&kvserverpb.RaftMessageRequest{}, s) + require.False(t, appended) +}
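For reference, the admission condition exercised by the final assertion above can be restated as a short sketch (a paraphrase of Append, not new behavior): once enforcement is re-enabled, the length check applies even to entries that accumulated while it was off, so a queue already holding 10 entries rejects the next append against a maxLen of 2.

package racsketch

// admits restates the gating in raftReceiveQueue.Append, omitting the
// mon.BoundAccount Grow check: an entry is rejected when the queue is
// destroyed, or when maxLen is enforced and the queue is already at or
// beyond maxLen.
func admits(destroyed, enforceMaxLen bool, curLen, maxLen int) bool {
	if destroyed {
		return false
	}
	return !enforceMaxLen || curLen < maxLen
}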