From 119fb63d0f914f896aaff5cef568f0e106d91062 Mon Sep 17 00:00:00 2001
From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Date: Tue, 3 Oct 2023 16:51:14 -0400
Subject: [PATCH 1/2] kvflowdispatch: add benchmark for dispatch

Informs: #104154.

Release note: None
---
 .../kvflowdispatch/kvflowdispatch_test.go     | 28 +++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go
index 3102cca82e06..6c2172e1ee8c 100644
--- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go
@@ -187,6 +187,34 @@ func TestDispatchSize(t *testing.T) {
 	require.LessOrEqual(t, len(entry), AdmittedRaftLogEntriesBytes,
 		"consider adjusting kvadmission.flow_control.dispatch.max_bytes")
 }
 
+func BenchmarkDispatch(b *testing.B) {
+	ctx := context.Background()
+	nodeIDContainer := &base.NodeIDContainer{}
+	nodeID := roachpb.NodeID(1)
+	nodeID2 := roachpb.NodeID(2)
+	storeID := roachpb.StoreID(1)
+	rangeID := roachpb.RangeID(1)
+	nodeIDContainer.Set(ctx, nodeID)
+
+	dispatch := New(metric.NewRegistry(), dummyHandles{}, nodeIDContainer)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		for j := 0; j < 5; j++ {
+			dispatch.Dispatch(ctx, nodeID2, kvflowcontrolpb.AdmittedRaftLogEntries{
+				RangeID:           rangeID,
+				AdmissionPriority: int32(admissionpb.NormalPri),
+				UpToRaftLogPosition: kvflowcontrolpb.RaftLogPosition{
+					Term:  1,
+					Index: 10,
+				},
+				StoreID: storeID,
+			})
+		}
+		dispatch.PendingDispatchFor(nodeID2, math.MaxInt64)
+	}
+}
+
 func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPosition {
 	inner := strings.Split(input, "/")
 	require.Len(t, inner, 2)

From a30f9eb84ecef409d0ca3cd56d1e419be8ac011a Mon Sep 17 00:00:00 2001
From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Date: Tue, 19 Sep 2023 16:04:27 -0400
Subject: [PATCH 2/2] kvflowdispatch: move down dispatch mutex to reduce
 contention

In `kv0/enc=false/nodes=3/cpu=96`, we noticed mutex contention around
the `outbox` map. This patch tries to alleviate that by moving the
mutex down into each individual dispatch map (sharding by NodeID).

Informs: #104154.

Release note: None
---
 .../kvflowdispatch/kvflowdispatch.go          | 118 +++++++++++-------
 1 file changed, 76 insertions(+), 42 deletions(-)

diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
index 6b476cc73acb..1af808641b92 100644
--- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
@@ -12,6 +12,7 @@ package kvflowdispatch
 
 import (
 	"context"
+	"sync"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
@@ -32,13 +33,9 @@ const AdmittedRaftLogEntriesBytes = 50
 // entries to specific nodes, and (ii) to read pending dispatches.
 type Dispatch struct {
 	mu struct {
-		// TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex
-		// is responsible for ~3.7% of the mutex contention. Look to address it
-		// as part of #104154. Perhaps shard this mutex by node ID? Or use a
-		// sync.Map instead?
 		syncutil.Mutex
 		// outbox maintains pending dispatches on a per-node basis.
-		outbox map[roachpb.NodeID]dispatches
+		outbox sync.Map // roachpb.NodeID -> *dispatches
 	}
 	metrics *metrics
 	// handles is used to dispatch tokens locally. Remote token dispatches are
@@ -57,7 +54,12 @@ type dispatchKey struct {
 	admissionpb.WorkPriority
 }
 
-type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition
+type dispatches struct {
+	mu struct {
+		syncutil.Mutex
+		items map[dispatchKey]kvflowcontrolpb.RaftLogPosition
+	}
+}
 
 var _ kvflowcontrol.Dispatch = &Dispatch{}
 
@@ -69,7 +71,7 @@ func New(
 		handles: handles,
 		nodeID:  nodeID,
 	}
-	d.mu.outbox = make(map[roachpb.NodeID]dispatches)
+	d.mu.outbox = sync.Map{}
 	d.metrics = newMetrics()
 	registry.AddMetricStruct(d.metrics)
 	return d
@@ -101,50 +103,65 @@ func (d *Dispatch) Dispatch(
 		d.metrics.LocalDispatch[wc].Inc(1)
 		return
 	}
-	d.metrics.RemoteDispatch[wc].Inc(1)
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.metrics.RemoteDispatch[wc].Inc(1)
+	dispatchMap := d.getDispatchMap(nodeID)
+	var existing kvflowcontrolpb.RaftLogPosition
+	var found bool
+	func() {
+		dispatchMap.mu.Lock()
+		defer dispatchMap.mu.Unlock()
+
+		if len(dispatchMap.mu.items) == 0 {
+			d.metrics.PendingNodes.Inc(1)
+		}
 
-	if _, ok := d.mu.outbox[nodeID]; !ok {
-		d.mu.outbox[nodeID] = dispatches{}
-		d.metrics.PendingNodes.Inc(1)
-	}
+		dk := dispatchKey{
+			entries.RangeID,
+			entries.StoreID,
+			pri,
+		}
 
-	dk := dispatchKey{
-		entries.RangeID,
-		entries.StoreID,
-		pri,
-	}
+		existing, found = dispatchMap.mu.items[dk]
 
-	existing, found := d.mu.outbox[nodeID][dk]
-	if !found || existing.Less(entries.UpToRaftLogPosition) {
-		d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition
+		if !found || existing.Less(entries.UpToRaftLogPosition) {
+			dispatchMap.mu.items[dk] = entries.UpToRaftLogPosition
+		}
+	}()
 
-		if !found {
-			d.metrics.PendingDispatches[wc].Inc(1)
-		} else {
+	if found {
+		if existing.Less(entries.UpToRaftLogPosition) {
 			// We're replacing an existing dispatch with one with a higher log
 			// position. Increment the coalesced metric.
 			d.metrics.CoalescedDispatches[wc].Inc(1)
+		} else {
+			// We're dropping a dispatch given we already have a pending one with a
+			// higher log position. Increment the coalesced metric.
+			d.metrics.CoalescedDispatches[wc].Inc(1)
 		}
-	}
-	if found && !existing.Less(entries.UpToRaftLogPosition) {
-		// We're dropping a dispatch given we already have a pending one with a
-		// higher log position. Increment the coalesced metric.
-		d.metrics.CoalescedDispatches[wc].Inc(1)
+	} else {
+		d.metrics.PendingDispatches[wc].Inc(1)
 	}
 }
 
 // PendingDispatch is part of the kvflowcontrol.Dispatch interface.
 func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
+	// NB: We're holding the Dispatch mutex here, which guards against new buckets
+	// being added; sync.Map.Range() alone doesn't give us that synchronization.
 	d.mu.Lock()
 	defer d.mu.Unlock()
 
-	nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox))
-	for node := range d.mu.outbox {
-		nodes = append(nodes, node)
-	}
+	var nodes []roachpb.NodeID
+	d.mu.outbox.Range(func(key, value any) bool {
+		dispatchMap := value.(*dispatches)
+		node := key.(roachpb.NodeID)
+		dispatchMap.mu.Lock()
+		defer dispatchMap.mu.Unlock()
+		if len(dispatchMap.mu.items) > 0 {
+			nodes = append(nodes, node)
+		}
+		return true
+	})
 	return nodes
 }
 
@@ -152,16 +169,18 @@ func (d *Dispatch) PendingDispatchFor(
 	nodeID roachpb.NodeID, maxBytes int64,
 ) ([]kvflowcontrolpb.AdmittedRaftLogEntries, int) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	dispatchMap := d.getDispatchMap(nodeID)
 
-	if _, ok := d.mu.outbox[nodeID]; !ok {
+	dispatchMap.mu.Lock()
+	defer dispatchMap.mu.Unlock()
+
+	if len(dispatchMap.mu.items) == 0 {
 		return nil, 0
 	}
 
 	var entries []kvflowcontrolpb.AdmittedRaftLogEntries
 	maxEntries := maxBytes / AdmittedRaftLogEntriesBytes
-	for key, dispatch := range d.mu.outbox[nodeID] {
+	for key, dispatch := range dispatchMap.mu.items {
 		if maxEntries == 0 {
 			break
 		}
@@ -178,18 +197,33 @@ func (d *Dispatch) PendingDispatchFor(
 		wc := admissionpb.WorkClassFromPri(key.WorkPriority)
 		d.metrics.PendingDispatches[wc].Dec(1)
 		maxEntries -= 1
-		delete(d.mu.outbox[nodeID], key)
+		delete(dispatchMap.mu.items, key)
 	}
 
-	remainingDispatches := len(d.mu.outbox[nodeID])
+	remainingDispatches := len(dispatchMap.mu.items)
 	if remainingDispatches == 0 {
-		delete(d.mu.outbox, nodeID)
 		d.metrics.PendingNodes.Dec(1)
 	}
-
 	return entries, remainingDispatches
 }
 
+func (d *Dispatch) getDispatchMap(nodeID roachpb.NodeID) *dispatches {
+	// We hold d.mu whenever we add to d.mu.outbox, to avoid a race between
+	// initializing dispatchMap.mu.items and reading from it after returning
+	// from this function.
+	dispatchMap, ok := d.mu.outbox.Load(nodeID)
+	if !ok {
+		d.mu.Lock()
+		defer d.mu.Unlock()
+		var loaded bool
+		dispatchMap, loaded = d.mu.outbox.LoadOrStore(nodeID, &dispatches{})
+		if !loaded {
+			dispatchMap.(*dispatches).mu.items = make(map[dispatchKey]kvflowcontrolpb.RaftLogPosition)
+		}
+	}
+	return dispatchMap.(*dispatches)
+}
+
 // testingMetrics returns the underlying metrics struct for testing purposes.
 func (d *Dispatch) testingMetrics() *metrics {
 	return d.metrics
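
The sharding pattern described in the second commit message, reduced to a minimal standalone Go sketch: a sync.Map keyed by node ID whose values are per-node buckets, each guarded by its own mutex, so writers targeting different nodes stop contending on one global lock. The names here (shardedOutbox, bucket, plain int node IDs, string keys) are illustrative stand-ins and not part of the patch, and unlike getDispatchMap above this sketch pre-initializes each bucket's map before publishing it through LoadOrStore, which lets it skip the extra creation lock at the cost of an occasional wasted allocation.

package main

import (
	"fmt"
	"sync"
)

// bucket is the per-node shard: a small map guarded by its own mutex, so
// goroutines dispatching to different nodes never contend on the same lock.
type bucket struct {
	mu    sync.Mutex
	items map[string]int // stand-in for dispatchKey -> RaftLogPosition
}

// shardedOutbox holds one bucket per node ID inside a sync.Map.
type shardedOutbox struct {
	shards sync.Map // int (node ID) -> *bucket
}

// getBucket returns the shard for nodeID, creating it on first use. The map
// is initialized before LoadOrStore publishes the bucket, so readers never
// observe a nil map.
func (o *shardedOutbox) getBucket(nodeID int) *bucket {
	if b, ok := o.shards.Load(nodeID); ok {
		return b.(*bucket)
	}
	b, _ := o.shards.LoadOrStore(nodeID, &bucket{items: make(map[string]int)})
	return b.(*bucket)
}

// Add records a value for (nodeID, key), keeping only the largest value seen,
// the way dispatches coalesce up to the highest raft log position.
func (o *shardedOutbox) Add(nodeID int, key string, val int) {
	b := o.getBucket(nodeID)
	b.mu.Lock()
	defer b.mu.Unlock()
	if existing, ok := b.items[key]; !ok || existing < val {
		b.items[key] = val
	}
}

// Drain returns and clears everything pending for nodeID.
func (o *shardedOutbox) Drain(nodeID int) map[string]int {
	b := o.getBucket(nodeID)
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.items
	b.items = make(map[string]int)
	return out
}

func main() {
	var o shardedOutbox
	var wg sync.WaitGroup
	for n := 1; n <= 3; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				o.Add(n, "r1/s1", i) // writers to distinct nodes lock distinct buckets
			}
		}(n)
	}
	wg.Wait()
	fmt.Println(o.Drain(2)) // map[r1/s1:99]
}

Concurrent Add calls for different node IDs only ever take their own bucket's mutex and touch the shared structure through the lock-free sync.Map, which is the same property the patch relies on to cut contention on Dispatch.mu.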