-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathkvflowdispatch.go
230 lines (205 loc) · 6.97 KB
/
kvflowdispatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvflowdispatch
import (
"context"
"sync"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
const (
	// AdmittedRaftLogEntriesBytes is the estimated encoded size of a single
	// kvflowcontrolpb.AdmittedRaftLogEntries, as measured by
	// kvflowdispatch.TestDispatchSize(). PendingDispatchFor divides its byte
	// budget by this value to decide how many dispatches to hand out. It is
	// deliberately an untyped constant so it can divide an int64 directly.
	AdmittedRaftLogEntriesBytes = 50
)
// Dispatch is a concrete implementation of the kvflowcontrol.Dispatch
// interface. It's used to (i) dispatch information about admitted raft log
// entries to specific nodes, and (ii) to read pending dispatches.
type Dispatch struct {
mu struct {
syncutil.Mutex
// outbox maintains pending dispatches on a per-node basis.
outbox sync.Map // roachpb.NodeID -> *dispatches
}
metrics *metrics
// handles is used to dispatch tokens locally. Remote token dispatches are
// driven by the RaftTransport.
handles kvflowcontrol.Handles
nodeID *base.NodeIDContainer
}
// dispatchKey is used to coalesce dispatches bound for a given node. If
// transmitting two kvflowcontrolpb.AdmittedRaftLogEntries with the same
// <RangeID,StoreID,WorkPriority> triple, with UpToRaftLogPositions L1 and L2
// where L1 < L2, we can simply dispatch the one with L2.
//
// A node's outbox (dispatches.mu.items) therefore holds at most one
// RaftLogPosition per key. Dispatch keeps whichever of the pending and
// incoming positions is higher. The fields are embedded, so they are read
// back as key.RangeID, key.StoreID and key.WorkPriority when the outbox is
// drained.
type dispatchKey struct {
	roachpb.RangeID
	roachpb.StoreID
	admissionpb.WorkPriority
}
// dispatches is the outbox for a single remote node: its pending, coalesced
// dispatches, keyed by dispatchKey.
type dispatches struct {
	mu struct {
		syncutil.Mutex
		// items maps each coalescing key to the highest raft log position
		// pending for it. getDispatchMap allocates it when the outbox is
		// created. Dispatch, PendingDispatch and PendingDispatchFor hold mu
		// while reading or writing it.
		items map[dispatchKey]kvflowcontrolpb.RaftLogPosition
	}
}
// Compile-time check that *Dispatch satisfies kvflowcontrol.Dispatch.
var _ kvflowcontrol.Dispatch = (*Dispatch)(nil)
// New constructs a new Dispatch. Tokens for the local node (as identified by
// nodeID) are returned through handles. The Dispatch's metrics are created
// here and registered with registry before it is returned.
func New(
	registry *metric.Registry, handles kvflowcontrol.Handles, nodeID *base.NodeIDContainer,
) *Dispatch {
	m := newMetrics()
	registry.AddMetricStruct(m)
	// The zero value of mu.outbox (an empty sync.Map) is ready to use.
	return &Dispatch{
		nodeID:  nodeID,
		handles: handles,
		metrics: m,
	}
}
// Dispatch is part of the kvflowcontrol.Dispatch interface.
//
// If nodeID is the local node, tokens are returned right away through the
// range's kvflowcontrol.Handle (when one exists) and nothing is queued.
// Otherwise the entry is queued in nodeID's outbox, where it coalesces with
// any pending entry for the same <RangeID,StoreID,WorkPriority>. Only the
// higher of the two raft log positions is kept. Queued entries are drained by
// PendingDispatchFor.
func (d *Dispatch) Dispatch(
	ctx context.Context, nodeID roachpb.NodeID, entries kvflowcontrolpb.AdmittedRaftLogEntries,
) {
	if log.V(1) {
		log.Infof(ctx, "dispatching %s to n%s", entries, nodeID)
	}
	pri := admissionpb.WorkPriority(entries.AdmissionPriority)
	wc := admissionpb.WorkClassFromPri(pri)

	if nodeID == d.nodeID.Get() { // local fast-path
		if handle, found := d.handles.Lookup(entries.RangeID); found {
			handle.ReturnTokensUpto(ctx, pri, entries.UpToRaftLogPosition, kvflowcontrol.Stream{
				StoreID: entries.StoreID,
			})
		}
		// If we've not found the local kvflowcontrol.Handle, it's because the
		// range leaseholder/leader has recently been moved elsewhere. It's ok
		// to drop these tokens on the floor since we already returned them when
		// moving the leaseholder/leader.
		d.metrics.LocalDispatch[wc].Inc(1)
		return
	}
	d.metrics.RemoteDispatch[wc].Inc(1)

	dispatchMap := d.getDispatchMap(nodeID)
	// Keyed literal so the key does not depend on dispatchKey's field order.
	dk := dispatchKey{
		RangeID:      entries.RangeID,
		StoreID:      entries.StoreID,
		WorkPriority: pri,
	}

	// coalesced records whether a dispatch for dk was already pending.
	var coalesced bool
	func() {
		dispatchMap.mu.Lock()
		defer dispatchMap.mu.Unlock()

		if len(dispatchMap.mu.items) == 0 {
			// This node is going from no pending dispatches to some.
			d.metrics.PendingNodes.Inc(1)
		}
		existing, found := dispatchMap.mu.items[dk]
		coalesced = found
		if !found || existing.Less(entries.UpToRaftLogPosition) {
			dispatchMap.mu.items[dk] = entries.UpToRaftLogPosition
		}
	}()

	if coalesced {
		// This entry merged with an already-pending one. It either replaced a
		// lower log position or was dropped in favor of a pending position that
		// is higher or equal. Both count as a coalesced dispatch, and the
		// number of pending dispatches is unchanged.
		d.metrics.CoalescedDispatches[wc].Inc(1)
		return
	}
	d.metrics.PendingDispatches[wc].Inc(1)
}
// PendingDispatch is part of the kvflowcontrol.Dispatch interface. It returns
// every node whose outbox currently holds at least one pending dispatch, in
// no particular order.
func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
	// sync.Map.Range on its own gives no protection against entries being
	// added mid-iteration. New outboxes are only inserted while d.mu is held,
	// so holding it here keeps the set of nodes stable while we scan.
	d.mu.Lock()
	defer d.mu.Unlock()

	var pending []roachpb.NodeID
	d.mu.outbox.Range(func(k, v any) bool {
		outbox := v.(*dispatches)
		outbox.mu.Lock()
		nonEmpty := len(outbox.mu.items) > 0
		outbox.mu.Unlock()

		if nonEmpty {
			pending = append(pending, k.(roachpb.NodeID))
		}
		return true // keep iterating
	})
	return pending
}
// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface.
//
// It removes up to maxBytes/AdmittedRaftLogEntriesBytes pending dispatches
// from nodeID's outbox and returns them. It also returns the number of
// dispatches still pending for that node afterwards. Which entries are
// picked when not all fit is unspecified, since it follows map iteration
// order.
//
// NOTE(review): the budget is compared against zero exactly. If maxBytes is
// at or below -AdmittedRaftLogEntriesBytes, the budget starts negative and
// never reaches zero, so the entire outbox is drained. Confirm that callers
// never pass negative values.
func (d *Dispatch) PendingDispatchFor(
	nodeID roachpb.NodeID, maxBytes int64,
) ([]kvflowcontrolpb.AdmittedRaftLogEntries, int) {
	outbox := d.getDispatchMap(nodeID)
	outbox.mu.Lock()
	defer outbox.mu.Unlock()

	if len(outbox.mu.items) == 0 {
		return nil, 0
	}

	var drained []kvflowcontrolpb.AdmittedRaftLogEntries
	budget := maxBytes / AdmittedRaftLogEntriesBytes
	for dk, upTo := range outbox.mu.items {
		if budget == 0 {
			break
		}
		// TODO(irfansharif,aaditya): This contributes to 0.5% of alloc_objects
		// under kv0/enc=false/nodes=3/cpu=96. Maybe address it as part of
		// #104154; we're simply copying things over. Maybe use a sync.Pool here
		// and around the outbox map?
		drained = append(drained, kvflowcontrolpb.AdmittedRaftLogEntries{
			RangeID:             dk.RangeID,
			StoreID:             dk.StoreID,
			AdmissionPriority:   int32(dk.WorkPriority),
			UpToRaftLogPosition: upTo,
		})
		delete(outbox.mu.items, dk)
		d.metrics.PendingDispatches[admissionpb.WorkClassFromPri(dk.WorkPriority)].Dec(1)
		budget--
	}

	remaining := len(outbox.mu.items)
	if remaining == 0 {
		// The node no longer has anything pending.
		d.metrics.PendingNodes.Dec(1)
	}
	return drained, remaining
}
// getDispatchMap returns the outbox for nodeID, creating it on first use.
//
// Looking up an existing outbox does not take a lock. Creating one happens
// under d.mu, for two reasons:
//   - concurrent creators are serialized, so exactly one outbox is installed
//     per node;
//   - PendingDispatch, which ranges over d.mu.outbox while holding d.mu, never
//     races with a new insertion.
//
// The outbox, including its items map, is fully built *before* it is
// published to the sync.Map. The fast-path Load below does not take d.mu.
// Publishing a half-built outbox and assigning items afterwards would let a
// concurrent caller see a nil items map, which is an unsynchronized
// read/write and makes Dispatch panic on the map write.
func (d *Dispatch) getDispatchMap(nodeID roachpb.NodeID) *dispatches {
	if existing, ok := d.mu.outbox.Load(nodeID); ok {
		return existing.(*dispatches)
	}

	d.mu.Lock()
	defer d.mu.Unlock()
	// Re-check under the lock: another goroutine may have installed the
	// outbox between our Load above and acquiring d.mu. All insertions happen
	// under d.mu, so this check-then-Store cannot install a duplicate.
	if existing, ok := d.mu.outbox.Load(nodeID); ok {
		return existing.(*dispatches)
	}
	fresh := &dispatches{}
	fresh.mu.items = make(map[dispatchKey]kvflowcontrolpb.RaftLogPosition)
	// sync.Map's Store synchronizes before any Load that observes it, so the
	// items initialization above is visible to every later reader.
	d.mu.outbox.Store(nodeID, fresh)
	return fresh
}
// testingMetrics returns the underlying metrics struct for testing purposes.
// This is the same struct that New created and registered with the metric
// registry, so tests can inspect counter values directly.
func (d *Dispatch) testingMetrics() *metrics {
	return d.metrics
}