// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
"context"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
)

// scanInternalIterator is an iterator that returns all internal keys instead of
// collapsing them by user keys. For instance, an InternalKeyKindDelete would be
// returned as an InternalKeyKindDelete instead of the iterator skipping over to
// the next key. Useful if an external user of Pebble needs to observe and
// rebuild Pebble's history of internal keys, such as in node-to-node
// replication. For use with {db,snapshot}.ScanInternal().
//
// scanInternalIterator is allowed to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete; however,
// it *must* return the range delete as well as the range key unset/delete
// that did the shadowing.
type scanInternalIterator struct {
opts IterOptions
comparer *base.Comparer
iter internalIterator
readState *readState
rangeKey *iteratorRangeKeyState
pointKeyIter keyspan.InterleavingIter
iterKey *InternalKey
iterValue LazyValue
alloc *iterAlloc
newIters tableNewIters
newIterRangeKey keyspan.TableNewSpanIter
seqNum uint64
// boundsBuf holds two buffers used to store the lower and upper bounds.
// Whenever the InternalIterator's bounds change, the new bounds are copied
// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
// allocations. opts.LowerBound and opts.UpperBound point into this slice.
boundsBuf [2][]byte
boundsBufIdx int
}
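
// A minimal, illustrative sketch (not part of Pebble) of how a caller might
// drive scanInternalImpl with the three visitor callbacks, assuming an
// already-constructed *scanInternalIterator. The names lower and iter, and
// the counters below, are hypothetical:
//
//	var points, rangeDels, rangeKeys int
//	err := scanInternalImpl(lower, iter,
//		func(key *InternalKey, value LazyValue) error {
//			points++ // every internal point key, including tombstones
//			return nil
//		},
//		func(start, end []byte, seqNum uint64) error {
//			rangeDels++ // range deletions are surfaced, not elided
//			return nil
//		},
//		func(start, end []byte, keys []keyspan.Key) error {
//			rangeKeys++ // range key sets, unsets, and deletes
//			return nil
//		})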

// scanInternalImpl is the implementation for the user-facing ScanInternal.
//
// TODO(sumeer): make it accept a context parameter, and plumb it through for
// tracing support.
func scanInternalImpl(
lower []byte,
iter *scanInternalIterator,
visitPointKey func(key *InternalKey, value LazyValue) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []keyspan.Key) error,
) error {
for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
key := iter.unsafeKey()
switch key.Kind() {
case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
span := iter.unsafeSpan()
if err := visitRangeKey(span.Start, span.End, span.Keys); err != nil {
return err
}
case InternalKeyKindRangeDelete:
rangeDel := iter.unsafeRangeDel()
if err := visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
return err
}
default:
val := iter.lazyValue()
if s := iter.pointKeyIter.Span(); s != nil && s.CoversAt(iter.seqNum, key.SeqNum()) {
// Key deleted by a range deletion. Skip it.
continue
}
if err := visitPointKey(key, val); err != nil {
return err
}
}
}
return nil
}

// constructPointIter constructs a merging iterator and sets i.iter to it.
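// The resulting stack interleaves point keys from a merging iterator (over
// the memtables, L0 sublevels, and non-empty lower levels) with range
// deletion fragments from a keyspan.MergingIter, allowing range deletions to
// be surfaced alongside the point keys they shadow.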
func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
// Merging levels and levels from iterAlloc.
mlevels := buf.mlevels[:0]
levels := buf.levels[:0]
	// We compute the number of levels needed ahead of time and reallocate a
	// slice if the array from the iterAlloc isn't large enough. Doing this
	// allocation once up front avoids repeated reallocations as levels are
	// appended.
numMergingLevels := len(memtables)
numLevelIters := 0
current := i.readState.current
numMergingLevels += len(current.L0SublevelFiles)
numLevelIters += len(current.L0SublevelFiles)
for level := 1; level < len(current.Levels); level++ {
if current.Levels[level].Empty() {
continue
}
numMergingLevels++
numLevelIters++
}
if numMergingLevels > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, numMergingLevels)
}
if numLevelIters > cap(levels) {
levels = make([]levelIter, 0, numLevelIters)
}
// TODO(bilal): Push these into the iterAlloc buf.
var rangeDelMiter keyspan.MergingIter
rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
rangeDelLevels := make([]keyspan.LevelIter, 0, numLevelIters)
	// Next are the memtables, added from newest to oldest; the merging
	// iterator expects its levels ordered from newest to oldest.
for j := len(memtables) - 1; j >= 0; j-- {
mem := memtables[j]
mlevels = append(mlevels, mergingIterLevel{
iter: mem.newIter(&i.opts),
})
if rdi := mem.newRangeDelIter(&i.opts); rdi != nil {
rangeDelIters = append(rangeDelIters, rdi)
}
}
// Next are the file levels: L0 sub-levels followed by lower levels.
mlevelsIndex := len(mlevels)
levelsIndex := len(levels)
mlevels = mlevels[:numMergingLevels]
levels = levels[:numLevelIters]
rangeDelLevels = rangeDelLevels[:numLevelIters]
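	// addLevelIterForFiles wires up a single file level: a point-key levelIter
	// that feeds mlevels, plus a keyspan.LevelIter over the same files that
	// feeds the range-del merging iterator. levelsIndex and mlevelsIndex
	// advance in lockstep.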
addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
li := &levels[levelsIndex]
rli := &rangeDelLevels[levelsIndex]
li.init(
context.Background(), i.opts, i.comparer.Compare, i.comparer.Split, i.newIters, files, level,
internalIterOpts{})
li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
mlevels[mlevelsIndex].iter = li
rli.Init(keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
i.comparer.Compare, tableNewRangeDelIter(context.Background(), i.newIters), files, level,
manifest.KeyTypePoint)
rangeDelIters = append(rangeDelIters, rli)
levelsIndex++
mlevelsIndex++
}
// Add level iterators for the L0 sublevels, iterating from newest to
// oldest.
for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
}
// Add level iterators for the non-empty non-L0 levels.
for level := 1; level < numLevels; level++ {
if current.Levels[level].Empty() {
continue
}
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}
buf.merging.init(&i.opts, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
buf.merging.snapshot = i.seqNum
rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspan.MergingBuffers), rangeDelIters...)
i.pointKeyIter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound)
i.iter = &i.pointKeyIter
}

// constructRangeKeyIter constructs the range-key iterator stack, populating
// i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
// Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
// iterConfig does *not* elide unsets/deletes.
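// Unsets/deletes are retained because ScanInternal consumers must observe the
// range key unsets/deletes that did any shadowing (see the
// scanInternalIterator comment above).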
func (i *scanInternalIterator) constructRangeKeyIter() {
// We want the bounded iter from iterConfig, but not the collapsing of
// RangeKeyUnsets and RangeKeyDels.
i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
nil /* hasPrefix */, nil /* prefix */, false, /* onlySets */
&i.rangeKey.rangeKeyBuffers.internal)
// Next are the flushables: memtables and large batches.
for j := len(i.readState.memtables) - 1; j >= 0; j-- {
mem := i.readState.memtables[j]
// We only need to read from memtables which contain sequence numbers older
// than seqNum.
if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
continue
}
if rki := mem.newRangeKeyIter(&i.opts); rki != nil {
i.rangeKey.iterConfig.AddLevel(rki)
}
}
current := i.readState.current
// Next are the file levels: L0 sub-levels followed by lower levels.
//
// Add file-specific iterators for L0 files containing range keys. This is less
// efficient than using levelIters for sublevels of L0 files containing
// range keys, but range keys are expected to be sparse anyway, reducing the
// cost benefit of maintaining a separate L0Sublevels instance for range key
// files and then using it here.
//
// NB: We iterate L0's files in reverse order. They're sorted by
// LargestSeqNum ascending, and we need to add them to the merging iterator
// in LargestSeqNum descending to preserve the merging iterator's invariants
// around Key Trailer order.
iter := current.RangeKeyLevels[0].Iter()
for f := iter.Last(); f != nil; f = iter.Prev() {
spanIterOpts := &keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters}
spanIter, err := i.newIterRangeKey(f, spanIterOpts)
if err != nil {
i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
continue
}
i.rangeKey.iterConfig.AddLevel(spanIter)
}
// Add level iterators for the non-empty non-L0 levels.
for level := 1; level < len(current.RangeKeyLevels); level++ {
if current.RangeKeyLevels[level].Empty() {
continue
}
li := i.rangeKey.iterConfig.NewLevelIter()
spanIterOpts := keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters}
li.Init(spanIterOpts, i.comparer.Compare, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(),
manifest.Level(level), manifest.KeyTypeRange)
i.rangeKey.iterConfig.AddLevel(li)
}
}

// seekGE seeks this iterator to the first key that's greater than or equal
// to the specified user key.
func (i *scanInternalIterator) seekGE(key []byte) bool {
i.iterKey, i.iterValue = i.iter.SeekGE(key, base.SeekGEFlagsNone)
return i.iterKey != nil
}

// unsafeKey returns the unsafe InternalKey at the current position. The
// returned key is nil if the iterator is invalid or exhausted.
func (i *scanInternalIterator) unsafeKey() *InternalKey {
return i.iterKey
}

// lazyValue returns a value pointer to the value at the current iterator
// position. Behaviour is undefined if unsafeKey() returns a range key or
// rangedel kind key.
func (i *scanInternalIterator) lazyValue() LazyValue {
return i.iterValue
}

// unsafeRangeDel returns a range deletion span. Behaviour is undefined if
// unsafeKey() returns a non-rangedel kind.
func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
return i.pointKeyIter.Span()
}

// unsafeSpan returns a range key span. Behaviour is undefined if unsafeKey()
// returns a non-rangekey kind.
func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
return i.rangeKey.iiter.Span()
}

// next advances the iterator in the forward direction, and returns the
// iterator's new validity state.
func (i *scanInternalIterator) next() bool {
i.iterKey, i.iterValue = i.iter.Next()
return i.iterKey != nil
}

// error returns an error from the internal iterator, if any.
func (i *scanInternalIterator) error() error {
return i.iter.Error()
}

// close closes this iterator, and releases any pooled objects.
func (i *scanInternalIterator) close() error {
if err := i.iter.Close(); err != nil {
return err
}
i.readState.unref()
if i.rangeKey != nil {
i.rangeKey.PrepareForReuse()
*i.rangeKey = iteratorRangeKeyState{
rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
}
iterRangeKeyStateAllocPool.Put(i.rangeKey)
i.rangeKey = nil
}
if alloc := i.alloc; alloc != nil {
for j := range i.boundsBuf {
if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
alloc.boundsBuf[j] = nil
} else {
alloc.boundsBuf[j] = i.boundsBuf[j]
}
}
*alloc = iterAlloc{
keyBuf: alloc.keyBuf[:0],
boundsBuf: alloc.boundsBuf,
prefixOrFullSeekKey: alloc.prefixOrFullSeekKey[:0],
}
iterAllocPool.Put(alloc)
i.alloc = nil
}
return nil
}
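
// initializeBoundBufs copies the given bounds into the boundsBuf entry at
// boundsBufIdx, points opts.LowerBound and opts.UpperBound into that copy,
// and then flips boundsBufIdx so a subsequent call writes into the other
// buffer (see the boundsBuf field comment on scanInternalIterator).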
func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
buf := i.boundsBuf[i.boundsBufIdx][:0]
if lower != nil {
buf = append(buf, lower...)
i.opts.LowerBound = buf
} else {
i.opts.LowerBound = nil
}
if upper != nil {
buf = append(buf, upper...)
i.opts.UpperBound = buf[len(buf)-len(upper):]
} else {
i.opts.UpperBound = nil
}
i.boundsBuf[i.boundsBufIdx] = buf
i.boundsBufIdx = 1 - i.boundsBufIdx
}