kvflowdispatch: move down dispatch mutex to reduce contention #110933

Open · wants to merge 2 commits into master
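In brief, as read from the diff below: the single Dispatch-wide mutex that previously guarded the outbox map is pushed down into per-node state. The outbox becomes a sync.Map of per-node dispatches structs, each carrying its own small mutex, so Dispatch calls targeting different nodes no longer serialize on one lock; a getDispatchMap helper lazily creates the per-node entry, and a BenchmarkDispatch is added to exercise the remote-dispatch path.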
118 changes: 76 additions & 42 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
@@ -12,6 +12,7 @@ package kvflowdispatch

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
@@ -32,13 +33,9 @@ const AdmittedRaftLogEntriesBytes = 50
// entries to specific nodes, and (ii) to read pending dispatches.
type Dispatch struct {
mu struct {
// TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex
// is responsible for ~3.7% of the mutex contention. Look to address it
// as part of #104154. Perhaps shard this mutex by node ID? Or use a
// sync.Map instead?
syncutil.Mutex
// outbox maintains pending dispatches on a per-node basis.
outbox map[roachpb.NodeID]dispatches
outbox sync.Map // roachpb.NodeID -> *dispatches
}
metrics *metrics
// handles is used to dispatch tokens locally. Remote token dispatches are
@@ -57,7 +54,12 @@ type dispatchKey struct {
admissionpb.WorkPriority
}

type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition
type dispatches struct {
mu struct {
syncutil.Mutex
items map[dispatchKey]kvflowcontrolpb.RaftLogPosition
}
}

var _ kvflowcontrol.Dispatch = &Dispatch{}

@@ -69,7 +71,7 @@ func New(
handles: handles,
nodeID: nodeID,
}
d.mu.outbox = make(map[roachpb.NodeID]dispatches)
d.mu.outbox = sync.Map{}
d.metrics = newMetrics()
registry.AddMetricStruct(d.metrics)
return d
@@ -101,67 +103,84 @@ func (d *Dispatch) Dispatch(
d.metrics.LocalDispatch[wc].Inc(1)
return
}
d.metrics.RemoteDispatch[wc].Inc(1)

d.mu.Lock()
defer d.mu.Unlock()
d.metrics.RemoteDispatch[wc].Inc(1)
dispatchMap := d.getDispatchMap(nodeID)
var existing kvflowcontrolpb.RaftLogPosition
var found bool
func() {
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()

if len(dispatchMap.mu.items) == 0 {
d.metrics.PendingNodes.Inc(1)
}

if _, ok := d.mu.outbox[nodeID]; !ok {
d.mu.outbox[nodeID] = dispatches{}
d.metrics.PendingNodes.Inc(1)
}
dk := dispatchKey{
entries.RangeID,
entries.StoreID,
pri,
}

dk := dispatchKey{
entries.RangeID,
entries.StoreID,
pri,
}
existing, found = dispatchMap.mu.items[dk]

existing, found := d.mu.outbox[nodeID][dk]
if !found || existing.Less(entries.UpToRaftLogPosition) {
d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition
if !found || existing.Less(entries.UpToRaftLogPosition) {
dispatchMap.mu.items[dk] = entries.UpToRaftLogPosition
}
}()

if !found {
d.metrics.PendingDispatches[wc].Inc(1)
} else {
if found {
if existing.Less(entries.UpToRaftLogPosition) {
// We're replacing an existing dispatch with one with a higher log
// position. Increment the coalesced metric.
d.metrics.CoalescedDispatches[wc].Inc(1)
} else {
// We're dropping a dispatch given we already have a pending one with a
// higher log position. Increment the coalesced metric.
d.metrics.CoalescedDispatches[wc].Inc(1)
}
}
if found && !existing.Less(entries.UpToRaftLogPosition) {
// We're dropping a dispatch given we already have a pending one with a
// higher log position. Increment the coalesced metric.
d.metrics.CoalescedDispatches[wc].Inc(1)
} else {
d.metrics.PendingDispatches[wc].Inc(1)
}
}
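For reference, the coalescing rule above keeps, per (range, store, priority) key, only the highest admitted raft log position: the first dispatch for a key counts as pending, and any later dispatch for the same key, whether it replaces the stored position or is dropped as stale, counts as coalesced. A minimal standalone sketch of that rule, using hypothetical names (logPos, offer) rather than the kvflowcontrolpb types:

package main

import "fmt"

// logPos is a stand-in for kvflowcontrolpb.RaftLogPosition.
type logPos struct{ term, index uint64 }

func (p logPos) less(o logPos) bool {
	return p.term < o.term || (p.term == o.term && p.index < o.index)
}

func main() {
	pending := map[string]logPos{} // dispatch key -> highest pending position
	added, coalesced := 0, 0

	offer := func(key string, pos logPos) {
		existing, found := pending[key]
		if !found || existing.less(pos) {
			pending[key] = pos // first dispatch, or a higher position replaces it
		}
		if found {
			coalesced++ // replaced or dropped as stale; either way it coalesces
		} else {
			added++
		}
	}

	offer("r1/s1/normal", logPos{term: 1, index: 10})
	offer("r1/s1/normal", logPos{term: 1, index: 12}) // higher position: replaces
	offer("r1/s1/normal", logPos{term: 1, index: 11}) // stale position: dropped
	fmt.Println(pending, "added:", added, "coalesced:", coalesced) // map[r1/s1/normal:{1 12}] added: 1 coalesced: 2
}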

// PendingDispatch is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
// NB: We're holding the Dispatch mutex here, which guards against new buckets
// being added; that's synchronization we don't get from sync.Map.Range() directly.
d.mu.Lock()
defer d.mu.Unlock()

nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox))
for node := range d.mu.outbox {
nodes = append(nodes, node)
}
var nodes []roachpb.NodeID
d.mu.outbox.Range(func(key, value any) bool {
dispatchMap := value.(*dispatches)
node := key.(roachpb.NodeID)
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()
if len(dispatchMap.mu.items) > 0 {
nodes = append(nodes, node)
}
return true
})
return nodes
}

// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatchFor(
nodeID roachpb.NodeID, maxBytes int64,
) ([]kvflowcontrolpb.AdmittedRaftLogEntries, int) {
d.mu.Lock()
defer d.mu.Unlock()
dispatchMap := d.getDispatchMap(nodeID)

if _, ok := d.mu.outbox[nodeID]; !ok {
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()

if len(dispatchMap.mu.items) == 0 {
return nil, 0
}

var entries []kvflowcontrolpb.AdmittedRaftLogEntries
maxEntries := maxBytes / AdmittedRaftLogEntriesBytes
for key, dispatch := range d.mu.outbox[nodeID] {
for key, dispatch := range dispatchMap.mu.items {
if maxEntries == 0 {
break
}
@@ -178,18 +197,33 @@ func (d *Dispatch) PendingDispatchFor(
wc := admissionpb.WorkClassFromPri(key.WorkPriority)
d.metrics.PendingDispatches[wc].Dec(1)
maxEntries -= 1
delete(d.mu.outbox[nodeID], key)
delete(dispatchMap.mu.items, key)
}

remainingDispatches := len(d.mu.outbox[nodeID])
remainingDispatches := len(dispatchMap.mu.items)
if remainingDispatches == 0 {
delete(d.mu.outbox, nodeID)
d.metrics.PendingNodes.Dec(1)
}

return entries, remainingDispatches
}

func (d *Dispatch) getDispatchMap(nodeID roachpb.NodeID) *dispatches {
// We hold d.mu.Lock every time we add to d.mu.outbox, to avoid a race
// between initializing dispatchMap.mu.items and reading from it after
// this function returns.
dispatchMap, ok := d.mu.outbox.Load(nodeID)
if !ok {
d.mu.Lock()
defer d.mu.Unlock()
var loaded bool
dispatchMap, loaded = d.mu.outbox.LoadOrStore(nodeID, &dispatches{})
if !loaded {
dispatchMap.(*dispatches).mu.items = make(map[dispatchKey]kvflowcontrolpb.RaftLogPosition)
}
}
return dispatchMap.(*dispatches)
}
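getDispatchMap above takes the outer d.mu only on the slow path, relying on sync.Map.LoadOrStore for the per-node entry. Below is a minimal standalone sketch of that sharded-lock shape, with hypothetical names (outbox, shard); it deliberately differs from the diff in one detail, in that the inner map is fully constructed before being published through LoadOrStore, which is a simpler variant of the same idea and needs no extra initialization lock:

package main

import (
	"fmt"
	"sync"
)

// shard plays the role of the per-node dispatches struct: a small mutex
// guarding its own map, rather than one outer mutex guarding everything.
type shard struct {
	mu    sync.Mutex
	items map[string]int
}

type outbox struct {
	shards sync.Map // key -> *shard
}

// getShard returns the shard for key, creating it on first use. The map is
// built before the shard is published via LoadOrStore, so no caller can ever
// observe a shard with a nil map.
func (o *outbox) getShard(key string) *shard {
	if s, ok := o.shards.Load(key); ok {
		return s.(*shard)
	}
	s, _ := o.shards.LoadOrStore(key, &shard{items: make(map[string]int)})
	return s.(*shard)
}

func (o *outbox) add(key, item string, v int) {
	s := o.getShard(key)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[item] = v
}

func main() {
	var o outbox
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Writers for different keys lock different shards, so they
			// rarely contend with one another.
			o.add(fmt.Sprintf("n%d", i%2), fmt.Sprintf("e%d", i), i)
		}()
	}
	wg.Wait()
	fmt.Println("done")
}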

// testingMetrics returns the underlying metrics struct for testing purposes.
func (d *Dispatch) testingMetrics() *metrics {
return d.metrics
@@ -187,6 +187,34 @@ func TestDispatchSize(t *testing.T) {
require.LessOrEqual(t, len(entry), AdmittedRaftLogEntriesBytes, "consider adjusting kvadmission.flow_control.dispatch.max_bytes")
}

func BenchmarkDispatch(b *testing.B) {
ctx := context.Background()
nodeIDContainer := &base.NodeIDContainer{}
nodeID := roachpb.NodeID(1)
nodeID2 := roachpb.NodeID(2)
storeID := roachpb.StoreID(1)
rangeID := roachpb.RangeID(1)
nodeIDContainer.Set(ctx, nodeID)

dispatch := New(metric.NewRegistry(), dummyHandles{}, nodeIDContainer)

b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 5; j++ {
dispatch.Dispatch(ctx, nodeID2, kvflowcontrolpb.AdmittedRaftLogEntries{
RangeID: rangeID,
AdmissionPriority: int32(admissionpb.NormalPri),
UpToRaftLogPosition: kvflowcontrolpb.RaftLogPosition{
Term: 1,
Index: 10,
},
StoreID: storeID,
})
}
dispatch.PendingDispatchFor(nodeID2, math.MaxInt64)
}
}
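For reference, the benchmark dispatches to nodeID2, which differs from the node ID set in the container, so every call takes the remote path and lands in the per-node outbox entry; the PendingDispatchFor(nodeID2, math.MaxInt64) call then drains that entry at the end of each iteration, so the loop exercises both the insert and drain sides of the new locking scheme.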

func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPosition {
inner := strings.Split(input, "/")
require.Len(t, inner, 2)