kvserver,rac2: turn off raftReceiveQueue.maxLen enforcement in apply_to_all mode

The existing maxLen enforcement is already dubious:
- Length does not equal bytes, so it offers only limited protection against OOMs.
- The limit is per replica and not an aggregate.
- We run a cooperative system, and historically the sender has respected
  RaftConfig.RaftMaxInflightBytes, which is a byte limit. The only reason
  for additional protection on the receiver is when there are rapid repeated
  leader changes for a large number of ranges for which the receiver has
  replicas. Even in this case, the behavior is surprising since the receive
  queue overflows even though the sender has done nothing wrong -- and it
  is very unlikely that this overflow is actually protecting against an
  OOM.

With RACv2 in apply_to_all mode, the senders have a 16MiB regular token
pool that applies to a whole (store, tenant) pair. This is stricter than
the per-range defaultRaftMaxInflightBytes (32MiB), both in value and
because it is an aggregate limit, so the aforementioned "only reason" for
receiver-side protection carries even less weight. We therefore remove the
maxLen enforcement in apply_to_all mode.
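As an illustration of the difference in scale (numbers only for scale):
under the per-range limit alone, a store holding replicas of, say, 1,000
ranges could in principle have ~1,000 x 32MiB = ~31GiB of raft traffic in
flight toward it, whereas the aggregate pool caps regular traffic for a
(store, tenant) pair at 16MiB.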

An alternative would be to have replicaSendStream respect the receiver
limit in apply_to_all mode. The complexity there is that replicaSendStream
grabs a batch of tokens equal to the byte size of the send-queue, and
expects to send all those messages. To respect a count limit, it would
need to quickly return tokens it can't use (since they are a shared
resource), which adds complexity to the already complex token management
logic.

Fixes #135851

Epic: none

Release note: None
sumeerbhola committed Dec 11, 2024
1 parent 9f28bc0 commit 49fbd0e
Showing 5 changed files with 235 additions and 34 deletions.
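As a rough orientation before the diffs: the receiver-side mechanism described in the commit message can be restated as a small, self-contained Go sketch. The types and names below are hypothetical toys, not the actual CockroachDB code; the real implementation is in store_raft.go and store.go further down.

package main

import (
	"fmt"
	"sync"
)

// recvQueue is a toy stand-in for a per-replica raft receive queue: it has a
// count limit (maxLen) whose enforcement can be switched off when a stricter,
// byte-sized, aggregate limit already applies on the sender side.
type recvQueue struct {
	mu            sync.Mutex
	infos         []string
	maxLen        int
	enforceMaxLen bool
}

// Append mirrors the shape of raftReceiveQueue.Append: the length check is
// applied only while enforcement is on.
func (q *recvQueue) Append(msg string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.enforceMaxLen && len(q.infos) >= q.maxLen {
		return false // overflow: message dropped
	}
	q.infos = append(q.infos, msg)
	return true
}

// SetEnforceMaxLen is what a flow-control-mode watcher would call: enforcement
// is wanted only when not all work is paced by sender-side token pools.
func (q *recvQueue) SetEnforceMaxLen(enforce bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.enforceMaxLen = enforce
}

func main() {
	q := &recvQueue{maxLen: 1, enforceMaxLen: true}
	fmt.Println(q.Append("a")) // true
	fmt.Println(q.Append("b")) // false: count limit enforced
	// apply_to_all: aggregate sender-side byte pools make the count limit redundant.
	q.SetEnforceMaxLen(false)
	fmt.Println(q.Append("c")) // true: limit no longer enforced
}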
78 changes: 54 additions & 24 deletions pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go
@@ -40,8 +40,15 @@ type WaitForEvalConfig struct {
waitCategory WaitForEvalCategory
waitCategoryDecreasedCh chan struct{}
}
// watcherMu is ordered before mu. cbs are executed while holding watcherMu.
watcherMu struct {
syncutil.Mutex
cbs []WatcherCallback
}
}

type WatcherCallback func(wc WaitForEvalCategory)

// NewWaitForEvalConfig constructs WaitForEvalConfig.
func NewWaitForEvalConfig(st *cluster.Settings) *WaitForEvalConfig {
w := &WaitForEvalConfig{st: st}
@@ -59,32 +66,42 @@ func NewWaitForEvalConfig(st *cluster.Settings) *WaitForEvalConfig {
// notifyChanged is called whenever any of the cluster settings that affect
// WaitForEval change. It is also called for initialization.
func (w *WaitForEvalConfig) notifyChanged() {
-    w.mu.Lock()
-    defer w.mu.Unlock()
-    // Call computeCategory while holding w.mu to serialize the computation in
-    // case of concurrent callbacks. This ensures the latest settings are used
-    // to set the current state, and we don't have a situation where a slow
-    // goroutine samples the settings, then after some arbitrary duration
-    // acquires the mutex and sets a stale state.
-    waitCategory := w.computeCategory()
-    if w.mu.waitCategoryDecreasedCh == nil {
-        // Initialization.
-        w.mu.waitCategoryDecreasedCh = make(chan struct{})
-        w.mu.waitCategory = waitCategory
-        return
-    }
-    // Not initialization.
-    if w.mu.waitCategory > waitCategory {
-        close(w.mu.waitCategoryDecreasedCh)
-        w.mu.waitCategoryDecreasedCh = make(chan struct{})
-    }
-    // Else w.mu.waitCategory <= waitCategory. Since the set of requests that
-    // are subject to replication admission/flow control is growing (or staying
-    // the same), we don't need to tell the existing waiting requests to restart
-    // their wait, using the latest value of waitCategory, since they are
-    // unaffected by the change.
-
-    w.mu.waitCategory = waitCategory
+    func() {
+        w.mu.Lock()
+        defer w.mu.Unlock()
+        // Call computeCategory while holding w.mu to serialize the computation in
+        // case of concurrent callbacks. This ensures the latest settings are used
+        // to set the current state, and we don't have a situation where a slow
+        // goroutine samples the settings, then after some arbitrary duration
+        // acquires the mutex and sets a stale state.
+        waitCategory := w.computeCategory()
+        if w.mu.waitCategoryDecreasedCh == nil {
+            // Initialization.
+            w.mu.waitCategoryDecreasedCh = make(chan struct{})
+            w.mu.waitCategory = waitCategory
+            return
+        }
+        // Not initialization.
+        if w.mu.waitCategory > waitCategory {
+            close(w.mu.waitCategoryDecreasedCh)
+            w.mu.waitCategoryDecreasedCh = make(chan struct{})
+        }
+        // Else w.mu.waitCategory <= waitCategory. Since the set of requests that
+        // are subject to replication admission/flow control is growing (or staying
+        // the same), we don't need to tell the existing waiting requests to restart
+        // their wait, using the latest value of waitCategory, since they are
+        // unaffected by the change.
+
+        w.mu.waitCategory = waitCategory
+    }()
+    w.watcherMu.Lock()
+    defer w.watcherMu.Unlock()
+    w.mu.RLock()
+    wc := w.mu.waitCategory
+    w.mu.RUnlock()
+    for _, cb := range w.watcherMu.cbs {
+        cb(wc)
+    }
}

// Current returns the current category, and a channel that will be closed if
@@ -110,3 +127,16 @@ func (w *WaitForEvalConfig) computeCategory() WaitForEvalCategory {
}
panic(errors.AssertionFailedf("unknown mode %v", mode))
}

// RegisterWatcher registers a callback that provides the latest state of
// WaitForEvalCategory. The first call happens within this method, before
// returning.
func (w *WaitForEvalConfig) RegisterWatcher(cb WatcherCallback) {
w.watcherMu.Lock()
defer w.watcherMu.Unlock()
w.mu.RLock()
wc := w.mu.waitCategory
w.mu.RUnlock()
cb(wc)
w.watcherMu.cbs = append(w.watcherMu.cbs, cb)
}
28 changes: 23 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go
@@ -26,47 +26,65 @@ func TestWaitForEvalConfig(t *testing.T) {
kvflowcontrol.Enabled.Override(ctx, &st.SV, true)
kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToAll)

var expectedWC WaitForEvalCategory

// All work waits for eval.
w := NewWaitForEvalConfig(st)
expectedWC = AllWorkWaitsForEval
wec, ch1 := w.Current()
- require.Equal(t, AllWorkWaitsForEval, wec)
+ require.Equal(t, expectedWC, wec)
require.NotNil(t, ch1)
require.False(t, wec.Bypass(admissionpb.ElasticWorkClass))
require.False(t, wec.Bypass(admissionpb.RegularWorkClass))
cbCount := 0
cb := func(wc WaitForEvalCategory) {
cbCount++
require.Equal(t, expectedWC, wc)
}
w.RegisterWatcher(cb)
require.Equal(t, 1, cbCount)

// No work waits for eval.
expectedWC = NoWorkWaitsForEval
kvflowcontrol.Enabled.Override(ctx, &st.SV, false)
var ch2 <-chan struct{}
wec, ch2 = w.Current()
- require.Equal(t, NoWorkWaitsForEval, wec)
+ require.Equal(t, expectedWC, wec)
require.NotNil(t, ch2)
require.NotEqual(t, ch1, ch2)
require.True(t, wec.Bypass(admissionpb.ElasticWorkClass))
require.True(t, wec.Bypass(admissionpb.RegularWorkClass))
require.Equal(t, 2, cbCount)

// All work waits for eval.
expectedWC = AllWorkWaitsForEval
kvflowcontrol.Enabled.Override(ctx, &st.SV, true)
var ch3 <-chan struct{}
wec, ch3 = w.Current()
- require.Equal(t, AllWorkWaitsForEval, wec)
+ require.Equal(t, expectedWC, wec)
// Channel has not changed.
require.Equal(t, ch2, ch3)
require.Equal(t, 3, cbCount)

// Elastic work waits for eval.
expectedWC = OnlyElasticWorkWaitsForEval
kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToElastic)
var ch4 <-chan struct{}
wec, ch4 = w.Current()
- require.Equal(t, OnlyElasticWorkWaitsForEval, wec)
+ require.Equal(t, expectedWC, wec)
require.NotNil(t, ch4)
require.NotEqual(t, ch3, ch4)
require.False(t, wec.Bypass(admissionpb.ElasticWorkClass))
require.True(t, wec.Bypass(admissionpb.RegularWorkClass))
require.Equal(t, 4, cbCount)

// All work waits for eval.
expectedWC = AllWorkWaitsForEval
kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToAll)
var ch5 <-chan struct{}
wec, ch5 = w.Current()
- require.Equal(t, AllWorkWaitsForEval, wec)
+ require.Equal(t, expectedWC, wec)
// Channel has not changed.
require.Equal(t, ch4, ch5)
require.Equal(t, 5, cbCount)
}
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -372,6 +372,7 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig {
),
KVFlowAdmittedPiggybacker: node_rac2.NewAdmittedPiggybacker(),
KVFlowStreamTokenProvider: rac2.NewStreamTokenCounterProvider(st, clock),
KVFlowWaitForEvalConfig: rac2.NewWaitForEvalConfig(st),
KVFlowEvalWaitMetrics: rac2.NewEvalWaitMetrics(),
KVFlowRangeControllerMetrics: rac2.NewRangeControllerMetrics(),
}
@@ -1550,6 +1551,17 @@ func NewStore(
CurCount: s.metrics.RaftRcvdQueuedBytes,
Settings: cfg.Settings,
})
s.cfg.KVFlowWaitForEvalConfig.RegisterWatcher(func(wc rac2.WaitForEvalCategory) {
// When the system is configured with rac2.AllWorkWaitsForEval, RACv2 is
// running in a mode where all senders are using send token pools for all
// messages to pace sending to a receiving store. These send token pools
// are stricter than per range limits. Additionally, these are byte sized
// pools, which is a better way to protect the receiver than a count
// limit. Hence, we turn off maxLen enforcement in that case, since it is
// unnecessary extra protection, and to respect this on the sender side
// (in RACv2 code) would result in unnecessary code complexity.
s.raftRecvQueues.SetEnforceMaxLen(wc != rac2.AllWorkWaitsForEval)
})

s.cfg.RangeLogWriter = newWrappedRangeLogWriter(
s.metrics.getCounterForRangeLogEventType,
63 changes: 58 additions & 5 deletions pkg/kv/kvserver/store_raft.go
@@ -8,6 +8,7 @@ package kvserver
import (
"context"
"math"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -43,7 +44,8 @@ type raftReceiveQueue struct {
mu struct { // not to be locked directly
destroyed bool
syncutil.Mutex
- infos []raftRequestInfo
+ infos         []raftRequestInfo
+ enforceMaxLen bool
}
maxLen int
acc mon.BoundAccount
@@ -106,7 +108,7 @@ func (q *raftReceiveQueue) Append(
size = int64(req.Size())
q.mu.Lock()
defer q.mu.Unlock()
- if q.mu.destroyed || len(q.mu.infos) >= q.maxLen {
+ if q.mu.destroyed || (q.mu.enforceMaxLen && len(q.mu.infos) >= q.maxLen) {
return false, size, false
}
if q.acc.Grow(context.Background(), size) != nil {
@@ -122,9 +124,22 @@
return len(q.mu.infos) == 1, size, true
}

func (q *raftReceiveQueue) SetEnforceMaxLen(enforceMaxLen bool) {
q.mu.Lock()
defer q.mu.Unlock()
q.mu.enforceMaxLen = enforceMaxLen
}

func (q *raftReceiveQueue) getEnforceMaxLenForTesting() bool {
q.mu.Lock()
defer q.mu.Unlock()
return q.mu.enforceMaxLen
}

type raftReceiveQueues struct {
- mon *mon.BytesMonitor
- m   syncutil.Map[roachpb.RangeID, raftReceiveQueue]
+ mon           *mon.BytesMonitor
+ m             syncutil.Map[roachpb.RangeID, raftReceiveQueue]
+ enforceMaxLen atomic.Bool
}

func (qs *raftReceiveQueues) Load(rangeID roachpb.RangeID) (*raftReceiveQueue, bool) {
@@ -139,7 +154,33 @@ func (qs *raftReceiveQueues) LoadOrCreate(
}
q := &raftReceiveQueue{maxLen: maxLen}
q.acc.Init(context.Background(), qs.mon)
- return qs.m.LoadOrStore(rangeID, q)
+ q, loaded = qs.m.LoadOrStore(rangeID, q)
if !loaded {
// The sampling of enforceMaxLen can be concurrent with a call to
// SetEnforceMaxLen. We can sample a stale value, then SetEnforceMaxLen
// can fully execute, and then set the stale value here. Since
// qs.SetEnforceMaxLen sets the latest value first, before iterating over
// the map, it suffices to check after setting the value here that it has
// not changed. There are two cases:
//
// - Has changed: it is possible that our call to q.SetEnforceMaxLen
// occurred after the corresponding call in qs.SetEnforceMaxLen, so we
// have to loop back and correct it.
//
// - Has not changed: there may be a concurrent call to
// qs.SetEnforceMaxLen with a different bool parameter, but it has not
// yet updated qs.enforceMaxLen. Which is fine -- that call will iterate
// over the map and do what is necessary.
for {
enforceBefore := qs.enforceMaxLen.Load()
q.SetEnforceMaxLen(enforceBefore)
enforceAfter := qs.enforceMaxLen.Load()
if enforceAfter == enforceBefore {
break
}
}
}
return q, loaded
}

// Delete drains the queue and marks it as deleted. Future Appends
@@ -151,6 +192,18 @@ func (qs *raftReceiveQueues) Delete(rangeID roachpb.RangeID) {
}
}

// SetEnforceMaxLen specifies the latest state of whether maxLen needs to be
// enforced or not. Calls to this method are serialized by the caller.
func (qs *raftReceiveQueues) SetEnforceMaxLen(enforceMaxLen bool) {
// Store the latest value first. A concurrent creation of raftReceiveQueue
// can race with this method -- see how this is handled in LoadOrCreate.
qs.enforceMaxLen.Store(enforceMaxLen)
qs.m.Range(func(_ roachpb.RangeID, q *raftReceiveQueue) bool {
q.SetEnforceMaxLen(enforceMaxLen)
return true
})
}
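For concreteness, one hypothetical interleaving that the re-check loop in LoadOrCreate guards against (A runs LoadOrCreate for a new queue, B runs qs.SetEnforceMaxLen(true)):

  A: stores the new q in qs.m via LoadOrStore
  A: reads qs.enforceMaxLen -> false
  B: qs.enforceMaxLen.Store(true)
  B: ranges over qs.m and calls q.SetEnforceMaxLen(true)
  A: q.SetEnforceMaxLen(false)   <- stale value overwrites B's update
  A: re-reads qs.enforceMaxLen -> true, differs from the value it applied, so it loops
  A: q.SetEnforceMaxLen(true), re-reads true, values match, done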

// HandleDelegatedSnapshot reads the incoming delegated snapshot message and
// throttles sending snapshots before passing the request to the sender replica.
func (s *Store) HandleDelegatedSnapshot(