manager.go
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package spanlatch
import (
"context"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
// A Manager maintains an interval tree of key and key range latches. Latch
// acquisitions affecting keys or key ranges must wait on already-acquired latches
// which overlap their key ranges to be released.
//
// Latch acquisition attempts invoke Manager.Acquire and provide details about
// the spans that they plan to touch and the timestamps they plan to touch them
// at. Acquire inserts the latch into the Manager's tree and waits on
// prerequisite latch attempts that are already tracked by the Manager.
// Manager.Acquire blocks until the latch acquisition completes, at which point
// it returns a Guard, which is scoped to the lifetime of the latch ownership.
//
// When the latches are no longer needed, they are released by invoking
// Manager.Release with the Guard returned when the latches were originally
// acquired. Doing so removes the latches from the Manager's tree and signals to
// dependent latch acquisitions that they no longer need to wait on the released
// latches.
//
// Manager is safe for concurrent use by multiple goroutines. Concurrent access
// is made efficient using a copy-on-write technique to capture immutable
// snapshots of the type's inner btree structures. Using this strategy, tasks
// requiring mutual exclusion are limited to updating the type's trees and
// grabbing snapshots. Notably, scanning for and waiting on prerequisite latches
// is performed outside of the mutual exclusion zone. This means that the work
// performed under lock is linear with respect to the number of spans that a
// latch acquisition declares but NOT linear with respect to the number of other
// latch attempts that it will wait on.
//
// Manager's zero value can be used directly.
type Manager struct {
mu syncutil.Mutex
idAlloc uint64
scopes [spanset.NumSpanScope]scopedManager
}
// scopedManager is a latch manager scoped to either local or global keys.
// See spanset.SpanScope.
type scopedManager struct {
readSet latchList
trees [spanset.NumSpanAccess]btree
}
// latches are stored in the Manager's btrees. They represent the latching
// of a single key span.
type latch struct {
id uint64
span roachpb.Span
ts hlc.Timestamp
done *signal
next, prev *latch // readSet linked-list.
}
func (la *latch) inReadSet() bool {
return la.next != nil
}
// Guard is a handle to a set of acquired latches. It is returned by
// Manager.Acquire and accepted by Manager.Release.
type Guard struct {
done signal
// latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size.
latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer
latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32
}
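// latches returns the Guard's latch slice for the given scope and access
// level, reconstructed from the raw pointer and length stored on the Guard.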
func (lg *Guard) latches(s spanset.SpanScope, a spanset.SpanAccess) []latch {
len := lg.latchesLens[s][a]
if len == 0 {
return nil
}
const maxArrayLen = 1 << 31
return (*[maxArrayLen]latch)(lg.latchesPtrs[s][a])[:len:len]
}
func (lg *Guard) setLatches(s spanset.SpanScope, a spanset.SpanAccess, latches []latch) {
lg.latchesPtrs[s][a] = unsafe.Pointer(&latches[0])
lg.latchesLens[s][a] = int32(len(latches))
}
func allocGuardAndLatches(nLatches int) (*Guard, []latch) {
// Guard would be an ideal candidate for object pooling, but without
// reference counting its latches we can't know whether they're still
// referenced by other tree snapshots. The latches hold a reference to
// the signal living on the Guard, so the guard can't be recycled while
// latches still point to it.
if nLatches <= 1 {
alloc := new(struct {
g Guard
latches [1]latch
})
return &alloc.g, alloc.latches[:nLatches]
} else if nLatches <= 2 {
alloc := new(struct {
g Guard
latches [2]latch
})
return &alloc.g, alloc.latches[:nLatches]
} else if nLatches <= 4 {
alloc := new(struct {
g Guard
latches [4]latch
})
return &alloc.g, alloc.latches[:nLatches]
} else if nLatches <= 8 {
alloc := new(struct {
g Guard
latches [8]latch
})
return &alloc.g, alloc.latches[:nLatches]
}
return new(Guard), make([]latch, nLatches)
}
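// newGuard constructs a Guard for the provided spans, initializing one latch
// per span. Each latch is stamped with the (scope-adjusted) timestamp and
// points at the Guard's signal; latch IDs are assigned later, under lock, in
// Manager.insertLocked.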
func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard {
nLatches := 0
for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
nLatches += len(spans.GetSpans(a, s))
}
}
guard, latches := allocGuardAndLatches(nLatches)
for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
ss := spans.GetSpans(a, s)
n := len(ss)
if n == 0 {
continue
}
ssLatches := latches[:n]
for i := range ssLatches {
latch := &latches[i]
latch.span = ss[i]
latch.ts = ifGlobal(ts, s)
latch.done = &guard.done
// latch.id is assigned in Manager.insertLocked, under lock.
}
guard.setLatches(s, a, ssLatches)
latches = latches[n:]
}
}
if len(latches) != 0 {
panic("alloc too large")
}
return guard
}
// Acquire acquires latches from the Manager for each of the provided spans, at
// the specified timestamp. In doing so, it waits for latches over all
// overlapping spans to be released before returning. If the provided context
// is canceled before the method is done waiting for overlapping latches to
// be released, it stops waiting and releases all latches that it has already
// acquired.
//
// It returns a Guard which must be provided to Release.
func (m *Manager) Acquire(
ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp,
) (*Guard, error) {
lg, snap := m.sequence(spans, ts)
defer snap.close()
err := m.wait(ctx, lg, ts, snap)
if err != nil {
m.Release(lg)
return nil, err
}
return lg, nil
}
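// The function below is an illustrative sketch, not part of the original file:
// it shows the intended Acquire/Release lifecycle from a caller's perspective.
// The spanset.SpanSet.Add call is assumed to have this signature; the exact
// way a SpanSet is populated may differ.
func exampleAcquireRelease(ctx context.Context, m *Manager, ts hlc.Timestamp) error {
	// Declare a write over the key range [a, b).
	var spans spanset.SpanSet
	spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")})
	// Acquire blocks until all interfering latches over overlapping spans have
	// been released, then returns a Guard scoped to the latch ownership.
	lg, err := m.Acquire(ctx, &spans, ts)
	if err != nil {
		return err
	}
	// Releasing the Guard unblocks dependent latch acquisitions.
	defer m.Release(lg)
	// ... evaluate the request while the latches are held ...
	return nil
}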
// sequence locks the manager, captures an immutable snapshot, inserts latches
// for each of the specified spans into the manager's interval trees, and
// unlocks the manager. The role of the method is to sequence latch acquisition
// attempts.
func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) {
lg := newGuard(spans, ts)
m.mu.Lock()
snap := m.snapshotLocked(spans)
m.insertLocked(lg)
m.mu.Unlock()
return lg, snap
}
// snapshot is an immutable view into the latch manager's state.
type snapshot struct {
trees [spanset.NumSpanScope][spanset.NumSpanAccess]btree
}
// close closes the snapshot and releases any associated resources.
func (sn *snapshot) close() {
for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
sn.trees[s][a].Reset()
}
}
}
// snapshotLocked captures an immutable snapshot of the latch manager. It takes
// a spanset to limit the amount of state captured.
func (m *Manager) snapshotLocked(spans *spanset.SpanSet) snapshot {
var snap snapshot
for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
sm := &m.scopes[s]
reading := len(spans.GetSpans(spanset.SpanReadOnly, s)) > 0
writing := len(spans.GetSpans(spanset.SpanReadWrite, s)) > 0
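// Only writing acquisitions wait on reads, so the read-only tree is cloned
// only when this acquisition writes; the read set is flushed into that tree
// first so the clone is complete. Both readers and writers wait on writes,
// so the read-write tree is cloned in either case.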
if writing {
sm.flushReadSetLocked()
snap.trees[s][spanset.SpanReadOnly] = sm.trees[spanset.SpanReadOnly].Clone()
}
if writing || reading {
snap.trees[s][spanset.SpanReadWrite] = sm.trees[spanset.SpanReadWrite].Clone()
}
}
return snap
}
// flushReadSetLocked flushes the read set into the read interval tree.
func (sm *scopedManager) flushReadSetLocked() {
for sm.readSet.len > 0 {
latch := sm.readSet.front()
sm.readSet.remove(latch)
sm.trees[spanset.SpanReadOnly].Set(latch)
}
}
// insertLocked inserts the latches owned by the provided Guard into the
// Manager.
func (m *Manager) insertLocked(lg *Guard) {
for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
sm := &m.scopes[s]
for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
latches := lg.latches(s, a)
for i := range latches {
latch := &latches[i]
latch.id = m.nextID()
switch a {
case spanset.SpanReadOnly:
// Add reads to the readSet. They only need to enter the read
// tree if they're flushed by a write capturing a snapshot.
sm.readSet.pushBack(latch)
case spanset.SpanReadWrite:
// Add writes directly to the write tree.
sm.trees[spanset.SpanReadWrite].Set(latch)
default:
panic("unknown access")
}
}
}
}
}
func (m *Manager) nextID() uint64 {
m.idAlloc++
return m.idAlloc
}
// ignoreFn is used for non-interference of earlier reads with later writes.
//
// However, this is only desired for the global scope. Reads and writes to local
// keys are specified to always interfere, regardless of their timestamp. This
// is done to avoid confusion with local keys declared as part of proposer
// evaluated KV.
//
// This is also disabled in the global scope if either of the timestamps is
// empty. In those cases, we consider the latch without a timestamp to be a
// non-MVCC operation that affects all timestamps in the key range.
type ignoreFn func(ts, other hlc.Timestamp) bool
func ignoreLater(ts, other hlc.Timestamp) bool { return !ts.IsEmpty() && ts.Less(other) }
func ignoreEarlier(ts, other hlc.Timestamp) bool { return !other.IsEmpty() && other.Less(ts) }
func ignoreNothing(ts, other hlc.Timestamp) bool { return false }
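// For example, in the global scope a read at timestamp 10 can ignore a write
// latch at timestamp 15 (ignoreLater), and a write at timestamp 10 can ignore
// a read latch at timestamp 5 (ignoreEarlier). ignoreNothing is used when a
// write waits on other writes, which always interfere.

// ifGlobal returns the provided timestamp for global spans and an empty
// timestamp for local spans, so that all local latches interfere regardless
// of timestamp.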
func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp {
switch s {
case spanset.SpanGlobal:
return ts
case spanset.SpanLocal:
// All local latches interfere.
return hlc.Timestamp{}
default:
panic("unknown scope")
}
}
// wait waits for all interfering latches in the provided snapshot to complete
// before returning.
func (m *Manager) wait(ctx context.Context, lg *Guard, ts hlc.Timestamp, snap snapshot) error {
for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
tr := &snap.trees[s]
for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
latches := lg.latches(s, a)
for i := range latches {
latch := &latches[i]
switch a {
case spanset.SpanReadOnly:
// Wait for writes at equal or lower timestamps.
it := tr[spanset.SpanReadWrite].MakeIter()
if err := iterAndWait(ctx, &it, latch, ts, ignoreLater); err != nil {
return err
}
case spanset.SpanReadWrite:
// Wait for reads at equal or higher timestamps.
it := tr[spanset.SpanReadOnly].MakeIter()
if err := iterAndWait(ctx, &it, latch, ts, ignoreEarlier); err != nil {
return err
}
// Wait for all other writes.
it = tr[spanset.SpanReadWrite].MakeIter()
if err := iterAndWait(ctx, &it, latch, ts, ignoreNothing); err != nil {
return err
}
default:
panic("unknown access")
}
}
}
}
return nil
}
// iterAndWait uses the provided iterator to wait on all latches that overlap
// with the search latch and which should not be ignored given their timestamp
// and the supplied ignoreFn.
func iterAndWait(
ctx context.Context, it *iterator, search *latch, ts hlc.Timestamp, ignore ignoreFn,
) error {
done := ctx.Done()
for it.FirstOverlap(search); it.Valid(); it.NextOverlap() {
latch := it.Cur()
if latch.done.signaled() {
continue
}
if ignore(ts, latch.ts) {
continue
}
select {
case <-latch.done.signalChan():
case <-done:
return ctx.Err()
}
}
return nil
}
// Release releases the latches held by the provided Guard. After being called,
// dependent latch acquisition attempts can complete if not blocked on any other
// owned latches.
func (m *Manager) Release(lg *Guard) {
lg.done.signal()
m.mu.Lock()
m.removeLocked(lg)
m.mu.Unlock()
}
// removeLocked removes the latches owned by the provided Guard from the
// Manager. Must be called with mu held.
func (m *Manager) removeLocked(lg *Guard) {
for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
sm := &m.scopes[s]
for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
latches := lg.latches(s, a)
for i := range latches {
latch := &latches[i]
if latch.inReadSet() {
sm.readSet.remove(latch)
} else {
sm.trees[a].Delete(latch)
}
}
}
}
}