-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathhash_row_container.go
555 lines (476 loc) · 18.3 KB
/
hash_row_container.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package distsqlrun
import (
"bytes"
"unsafe"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/sql/mon"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)
// rowMarkerIterator is a rowIterator that can additionally set or clear the
// mark of the row it is currently positioned on.
type rowMarkerIterator interface {
rowIterator
// Mark sets the mark of the iterator's current row to the given value. An
// error is returned if the mark could not be recorded.
Mark(ctx context.Context, mark bool) error
}
// hashRowContainer is a container used to store rows according to an encoding
// of given equality columns. The stored rows can then be probed to return a
// bucket of matching rows. Additionally, each stored row can be marked and all
// rows that are unmarked can be iterated over. An example of where this is
// useful is in full/outer joins. The caller can mark all matched rows and
// iterate over the unmarked rows to produce a result.
type hashRowContainer interface {
// Init initializes the hashRowContainer with the given equality columns.
// - shouldMark specifies whether the caller cares about marking rows. If
// not, the hashRowContainer will not perform any row marking logic. This
// is meant to optimize space usage and runtime.
// - types is the schema of rows that will be added to this container.
// - storedEqCols are the equality columns of rows stored in this
// container.
// i.e. when adding a row, the columns specified by storedEqCols are used
// to get the bucket that the row should be added to.
Init(
ctx context.Context, shouldMark bool, types []sqlbase.ColumnType, storedEqCols columns,
) error
// AddRow adds the given row to the container. The implementations in this
// file store newly added rows as unmarked.
AddRow(context.Context, sqlbase.EncDatumRow) error
// NewBucketIterator returns a rowMarkerIterator that iterates over a bucket
// of rows that match the given row on equality columns. This iterator can
// also be used to mark rows.
// Rows are marked because of the use of this interface by the hashJoiner.
// Given a row, the hashJoiner does not necessarily want to emit all rows
// that match on equality columns. There is an additional `ON` clause that
// specifies an arbitrary expression that matching rows must pass to be
// emitted. For full/outer joins, this is tracked through marking rows if
// they match and then iterating over all unmarked rows to emit those that
// did not match.
// - probeEqCols are the equality columns of the given row that are used to
// get the bucket of matching rows.
NewBucketIterator(
ctx context.Context, row sqlbase.EncDatumRow, probeEqCols columns,
) (rowMarkerIterator, error)
// NewUnmarkedIterator returns a rowIterator that iterates over unmarked
// rows. If shouldMark was false in Init(), this iterator iterates over all
// rows.
NewUnmarkedIterator(context.Context) rowIterator
// Close frees up resources held by the hashRowContainer.
Close(context.Context)
}
// columnEncoder is a utility struct used by implementations of hashRowContainer
// to encode equality columns, the result of which is used as a key to a bucket.
type columnEncoder struct {
// scratch is the byte buffer handed to encodeColumnsOfRow when encoding a
// row's equality columns.
scratch []byte
// datumAlloc is the allocator handed to encodeColumnsOfRow during encoding.
datumAlloc sqlbase.DatumAlloc
}
// encodeEqualityCols returns the encoding of the specified columns of the given
// row. The returned byte slice may alias the encoder's scratch buffer and is
// only valid until the next call to encodeEqualityCols(); callers that need to
// retain it must copy it.
//
// The receiver is a pointer so that the scratch buffer and the datum allocator
// handed to encodeColumnsOfRow are kept and reused across calls. With a value
// receiver the assignment to e.scratch below would only modify a copy of the
// encoder and be lost on return.
//
// An error from encodeColumnsOfRow is returned as is. A NULL in any of the
// equality columns is treated as a fatal error.
// TODO(asubiotto): This logic could be shared with the diskRowContainer.
func (e *columnEncoder) encodeEqualityCols(
	ctx context.Context, row sqlbase.EncDatumRow, eqCols columns,
) ([]byte, error) {
	encoded, hasNull, err := encodeColumnsOfRow(
		&e.datumAlloc, e.scratch, row, eqCols, false, /* encodeNull */
	)
	if err != nil {
		return nil, err
	}
	// Keep the buffer (truncated to zero length) for the next call.
	e.scratch = encoded[:0]
	if hasNull {
		log.Fatal(ctx, "cannot process rows with NULL in an equality column")
	}
	return encoded, nil
}
// Sizes, in bytes, of the bookkeeping values whose memory is accounted for by
// hashMemRowContainer.
const (
	// sizeOfBucket is the size of the slice header of a single bucket.
	sizeOfBucket = int64(unsafe.Sizeof([]int{}))

	// sizeOfRowIdx is the size of one row index stored in a bucket.
	sizeOfRowIdx = int64(unsafe.Sizeof(int(0)))

	// sizeOfBoolSlice is the size of the slice header of the marked slice.
	sizeOfBoolSlice = int64(unsafe.Sizeof([]bool{}))

	// sizeOfBool is the size of one entry of the marked slice.
	sizeOfBool = int64(unsafe.Sizeof(false))
)
// hashMemRowContainer is an in-memory implementation of a hashRowContainer.
// The rows are stored in an underlying memRowContainer and an accompanying
// map stores the mapping from equality column encodings to indices in the
// memRowContainer corresponding to matching rows.
// NOTE: Once a row is marked, adding more rows to the hashMemRowContainer
// results in undefined behavior. It is not necessary to do otherwise for the
// current usage of hashMemRowContainer and allows us to assume that a memory
// error can only occur at the start of the marking phase, thus not having to
// deal with half-emitted buckets and marks when falling back to disk.
type hashMemRowContainer struct {
// memRowContainer is the underlying storage for the rows themselves.
*memRowContainer
// columnEncoder is used to encode the equality columns of a row into a
// bucket key.
columnEncoder
// shouldMark specifies whether the caller cares about marking rows. If not,
// marked is never initialized.
shouldMark bool
// marked specifies for each row in memRowContainer whether that row has
// been marked. Used for iterating over unmarked rows. It is allocated
// lazily by the first call to Mark() on a bucket iterator.
marked []bool
// buckets contains the indices into memRowContainer for a given group
// key (which is the encoding of storedEqCols).
buckets map[string][]int
// bucketsAcc is the memory account for the buckets. The datums themselves
// are all in the memRowContainer. The marked slice is also charged to this
// account.
bucketsAcc mon.BoundAccount
// storedEqCols contains the indices of the columns of a row that are
// encoded and used as a key into buckets when adding a row.
storedEqCols columns
}
// Compile-time check that *hashMemRowContainer implements hashRowContainer.
var _ hashRowContainer = &hashMemRowContainer{}
// makeHashMemRowContainer wraps the given memRowContainer in a
// hashMemRowContainer. The returned container starts with an empty bucket map
// whose memory is tracked by a new account bound to the rowContainer's
// monitor. The caller remains responsible for calling Close() on rowContainer.
func makeHashMemRowContainer(rowContainer *memRowContainer) hashMemRowContainer {
	var h hashMemRowContainer
	h.memRowContainer = rowContainer
	h.buckets = make(map[string][]int)
	h.bucketsAcc = rowContainer.evalCtx.Mon.MakeBoundAccount()
	return h
}
// Init implements the hashRowContainer interface. The types argument is
// ignored since the schema is already known to the wrapped memRowContainer.
// Every row already present in the memRowContainer is added to its bucket. An
// error is returned if the container has been initialized before or if a row
// cannot be added to a bucket.
func (h *hashMemRowContainer) Init(
	ctx context.Context, shouldMark bool, _ []sqlbase.ColumnType, storedEqCols columns,
) error {
	if h.storedEqCols != nil {
		return errors.New("hashMemRowContainer has already been initialized")
	}
	h.shouldMark, h.storedEqCols = shouldMark, storedEqCols

	// Index the rows that were in the container before Init was called.
	numRows := h.Len()
	for idx := 0; idx < numRows; idx++ {
		err := h.addRowToBucket(ctx, h.EncRow(idx), idx)
		if err != nil {
			return err
		}
	}
	return nil
}
// AddRow stores the row in the underlying memRowContainer and records its
// index in the bucket matching its equality columns. The row starts out
// unmarked.
func (h *hashMemRowContainer) AddRow(ctx context.Context, row sqlbase.EncDatumRow) error {
	// The new row will live at the current end of the container.
	newIdx := h.Len()
	err := h.memRowContainer.AddRow(ctx, row)
	if err != nil {
		return err
	}
	return h.addRowToBucket(ctx, row, newIdx)
}
// Close implements the hashRowContainer interface. Only the memory account
// tracking the buckets is closed here; this method does not close the wrapped
// memRowContainer.
func (h *hashMemRowContainer) Close(ctx context.Context) {
h.bucketsAcc.Close(ctx)
}
// addRowToBucket encodes the equality columns of the given row and appends
// rowIdx to the bucket stored under that encoding. The additional memory is
// registered with bucketsAcc first; if that fails, the error is returned and
// the buckets are left untouched.
func (h *hashMemRowContainer) addRowToBucket(
	ctx context.Context, row sqlbase.EncDatumRow, rowIdx int,
) error {
	key, err := h.encodeEqualityCols(ctx, row, h.storedEqCols)
	if err != nil {
		return err
	}

	bucket, exists := h.buckets[string(key)]
	// Every row costs one index. A new bucket additionally costs its key and
	// its slice header.
	memUsage := sizeOfRowIdx
	if !exists {
		memUsage += int64(len(key)) + sizeOfBucket
	}
	if err := h.bucketsAcc.Grow(ctx, memUsage); err != nil {
		return err
	}
	h.buckets[string(key)] = append(bucket, rowIdx)
	return nil
}
// hashMemRowBucketIterator iterates over the rows in a bucket of a
// hashMemRowContainer and can mark the row it is positioned on.
type hashMemRowBucketIterator struct {
// hashMemRowContainer is the container whose rows and marks are accessed.
*hashMemRowContainer
// rowIdxs are the indices of rows in the bucket.
rowIdxs []int
// curIdx is the position within rowIdxs of the current row.
curIdx int
}
// Compile-time check that *hashMemRowBucketIterator implements
// rowMarkerIterator.
var _ rowMarkerIterator = &hashMemRowBucketIterator{}
// NewBucketIterator implements the hashRowContainer interface. The probe row's
// equality columns are encoded and the bucket stored under that encoding is
// handed to the returned iterator. If no such bucket exists the iterator has
// no rows to iterate over.
func (h *hashMemRowContainer) NewBucketIterator(
	ctx context.Context, row sqlbase.EncDatumRow, probeEqCols columns,
) (rowMarkerIterator, error) {
	key, err := h.encodeEqualityCols(ctx, row, probeEqCols)
	if err != nil {
		return nil, err
	}
	it := &hashMemRowBucketIterator{hashMemRowContainer: h}
	it.rowIdxs = h.buckets[string(key)]
	return it, nil
}
// Rewind implements the rowIterator interface. It moves the iterator back to
// the first row of the bucket.
func (i *hashMemRowBucketIterator) Rewind() {
i.curIdx = 0
}
// Valid implements the rowIterator interface. It reports whether the iterator
// is still positioned on a row of the bucket. The error is always nil.
func (i *hashMemRowBucketIterator) Valid() (bool, error) {
	inBucket := i.curIdx < len(i.rowIdxs)
	return inBucket, nil
}
// Next implements the rowIterator interface. It advances the iterator to the
// next position in the bucket without checking bounds; Valid() reports
// whether that position still holds a row.
func (i *hashMemRowBucketIterator) Next() {
i.curIdx++
}
// Row implements the rowIterator interface. It returns the row of the
// underlying container that the current bucket position refers to. The error
// is always nil.
func (i *hashMemRowBucketIterator) Row() (sqlbase.EncDatumRow, error) {
	rowIdx := i.rowIdxs[i.curIdx]
	return i.EncRow(rowIdx), nil
}
// Mark implements the rowMarkerIterator interface. It sets the mark of the
// current row to the given value. The marked slice is allocated on the first
// call, sized to the number of rows in the container at that time, and its
// memory is registered with bucketsAcc; a failure to do so is returned as an
// error. Calling Mark on a container that was not initialized for marking is
// fatal.
func (i *hashMemRowBucketIterator) Mark(ctx context.Context, mark bool) error {
	if !i.shouldMark {
		log.Fatal(ctx, "hash mem row container not set up for marking")
	}

	if i.marked == nil {
		numRows := i.Len()
		size := sizeOfBoolSlice + (sizeOfBool * int64(numRows))
		if err := i.bucketsAcc.Grow(ctx, size); err != nil {
			return err
		}
		i.marked = make([]bool, numRows)
	}

	rowIdx := i.rowIdxs[i.curIdx]
	i.marked[rowIdx] = mark
	return nil
}
// Close implements the rowIterator interface. It is a no-op.
func (i *hashMemRowBucketIterator) Close() {}
// hashMemRowIterator iterates over all unmarked rows in a hashMemRowContainer.
type hashMemRowIterator struct {
// hashMemRowContainer is the container whose rows and marks are read.
*hashMemRowContainer
// curIdx is the index into the container of the current row.
curIdx int
}
// Compile-time check that *hashMemRowIterator implements rowIterator.
var _ rowIterator = &hashMemRowIterator{}
// NewUnmarkedIterator implements the hashRowContainer interface. The returned
// iterator reads rows and marks directly from this container. The ctx argument
// is unused.
func (h *hashMemRowContainer) NewUnmarkedIterator(ctx context.Context) rowIterator {
	var it hashMemRowIterator
	it.hashMemRowContainer = h
	return &it
}
// Rewind implements the rowIterator interface. It positions the iterator on
// the first unmarked row, if there is one.
func (i *hashMemRowIterator) Rewind() {
// Start one position before the first row so that the Next() call below
// examines row 0 first.
i.curIdx = -1
// Next will advance curIdx to the first unmarked row.
i.Next()
}
// Valid implements the rowIterator interface. It reports whether the iterator
// is positioned within the rows of the container. The error is always nil.
func (i *hashMemRowIterator) Valid() (bool, error) {
	inRange := i.curIdx < i.Len()
	return inRange, nil
}
// Next implements the rowIterator interface. It advances the iterator by one
// row and then skips over any rows that are marked. If no row has ever been
// marked (marked is nil), nothing is skipped.
func (i *hashMemRowIterator) Next() {
	i.curIdx++
	// A nil marked slice has length zero, so this loop is skipped entirely
	// when no marks have been recorded.
	for i.curIdx < len(i.marked) && i.marked[i.curIdx] {
		i.curIdx++
	}
}
// Row implements the rowIterator interface. It returns the container row at
// the iterator's current index. The error is always nil.
func (i *hashMemRowIterator) Row() (sqlbase.EncDatumRow, error) {
	current := i.EncRow(i.curIdx)
	return current, nil
}
// Close implements the rowIterator interface. It is a no-op.
func (i *hashMemRowIterator) Close() {}
// hashDiskRowContainer is an on-disk implementation of a hashRowContainer.
// The rows are stored in an underlying diskRowContainer with an extra boolean
// column to keep track of that row's mark.
type hashDiskRowContainer struct {
// diskRowContainer is the underlying row storage. It is created in Init().
diskRowContainer
// columnEncoder is used to encode the equality columns of probe rows.
columnEncoder
// diskMonitor is handed to makeDiskRowContainer in Init().
diskMonitor *mon.BytesMonitor
// shouldMark specifies whether the caller cares about marking rows. If not,
// rows are stored with one less column (which usually specifies that row's
// mark).
shouldMark bool
// engine is handed to makeDiskRowContainer in Init().
engine engine.Engine
// scratchEncRow is only set up when shouldMark is true. It is one column
// longer than the rows passed to AddRow(); its last column holds an
// unmarked (false) mark and the rest is overwritten by each added row.
scratchEncRow sqlbase.EncDatumRow
}
// Compile-time check that *hashDiskRowContainer implements hashRowContainer.
var _ hashRowContainer = &hashDiskRowContainer{}
// encodedTrue and encodedFalse are the encodings of a true and a false bool
// value, produced without a column ID. They are used as the trailing bytes of
// a stored row's value to represent a marked and an unmarked row respectively.
// NOTE(review): hashDiskRowBucketIterator.Mark overwrites the trailing mark
// bytes in place, which presumes that both encodings have the same length.
var (
encodedTrue = encoding.EncodeBoolValue(nil, encoding.NoColumnID, true)
encodedFalse = encoding.EncodeBoolValue(nil, encoding.NoColumnID, false)
)
// makeHashDiskRowContainer creates a hashDiskRowContainer that will use the
// given engine as the underlying store for its rows and the given monitor for
// its disk usage. The returned container must be initialized with Init()
// before use; whether rows can be marked is decided there.
func makeHashDiskRowContainer(diskMonitor *mon.BytesMonitor, e engine.Engine) hashDiskRowContainer {
	var h hashDiskRowContainer
	h.diskMonitor = diskMonitor
	h.engine = e
	return h
}
// Init implements the hashRowContainer interface. It creates the underlying
// diskRowContainer, ordered on storedEqCols. When shouldMark is true, rows are
// stored with an additional trailing boolean column that holds the row's mark.
// The returned error is always nil.
func (h *hashDiskRowContainer) Init(
	ctx context.Context, shouldMark bool, types []sqlbase.ColumnType, storedEqCols columns,
) error {
	h.shouldMark = shouldMark

	// Order the stored rows on their equality columns so that rows with the
	// same equality columns occur contiguously in the keyspace.
	ordering := make(sqlbase.ColumnOrdering, len(storedEqCols))
	for i := range storedEqCols {
		ordering[i] = sqlbase.ColumnOrderInfo{
			ColIdx:    int(storedEqCols[i]),
			Direction: encoding.Ascending,
		}
	}

	storedTypes := types
	if shouldMark {
		boolType := sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_BOOL}

		// The stored schema is the given schema followed by the mark column.
		storedTypes = make([]sqlbase.ColumnType, 0, len(types)+1)
		storedTypes = append(storedTypes, types...)
		storedTypes = append(storedTypes, boolType)

		// AddRow() copies each row into this scratch row; its last column is
		// set once here to an unmarked (false) mark.
		scratch := make(sqlbase.EncDatumRow, len(storedTypes))
		scratch[len(scratch)-1] = sqlbase.DatumToEncDatum(boolType, parser.MakeDBool(false))
		h.scratchEncRow = scratch
	}

	h.diskRowContainer = makeDiskRowContainer(ctx, h.diskMonitor, storedTypes, ordering, h.engine)
	return nil
}
// AddRow stores the given row in the underlying diskRowContainer. When the
// container was initialized for marking, the row is stored with a trailing
// mark column set to unmarked.
func (h *hashDiskRowContainer) AddRow(ctx context.Context, row sqlbase.EncDatumRow) error {
	if !h.shouldMark {
		return h.diskRowContainer.AddRow(ctx, row)
	}
	// scratchEncRow has one more column than row. That last column was set to
	// a false mark in Init() and is not touched by this copy.
	copy(h.scratchEncRow, row)
	return h.diskRowContainer.AddRow(ctx, h.scratchEncRow)
}
// hashDiskRowBucketIterator iterates over the rows in a bucket of a
// hashDiskRowContainer and can mark the row it is positioned on.
type hashDiskRowBucketIterator struct {
// diskRowIterator is the underlying iterator over the stored rows.
diskRowIterator
// hashDiskRowContainer is the container this iterator was created from. It
// is consulted for shouldMark and receives the writes made by Mark().
hashDiskRowContainer *hashDiskRowContainer
// encodedEqCols is the encoding of the equality columns of the rows in the
// bucket that this iterator iterates over.
encodedEqCols []byte
}
// Compile-time check that hashDiskRowBucketIterator implements
// rowMarkerIterator.
var _ rowMarkerIterator = hashDiskRowBucketIterator{}
// NewBucketIterator implements the hashRowContainer interface. The returned
// iterator covers the stored rows whose key starts with the encoding of the
// probe row's equality columns.
func (h *hashDiskRowContainer) NewBucketIterator(
	ctx context.Context, row sqlbase.EncDatumRow, probeEqCols columns,
) (rowMarkerIterator, error) {
	encoded, err := h.encodeEqualityCols(ctx, row, probeEqCols)
	if err != nil {
		return nil, err
	}

	// The encoding is only valid until the next encode call, so the iterator
	// gets its own copy.
	bucketKey := make([]byte, len(encoded))
	copy(bucketKey, encoded)

	iter := h.NewIterator(ctx).(diskRowIterator)
	return hashDiskRowBucketIterator{
		diskRowIterator:      iter,
		hashDiskRowContainer: h,
		encodedEqCols:        bucketKey,
	}, nil
}
// Rewind implements the rowIterator interface. It positions the underlying
// iterator by seeking to the encoded equality columns of the bucket.
func (i hashDiskRowBucketIterator) Rewind() {
i.Seek(i.encodedEqCols)
}
// Valid implements the rowIterator interface. The iterator is valid while the
// underlying iterator is valid and the current key still starts with the
// bucket's encoded equality columns. An error from the underlying iterator is
// returned as is.
func (i hashDiskRowBucketIterator) Valid() (bool, error) {
	if ok, err := i.diskRowIterator.Valid(); err != nil || !ok {
		return ok, err
	}
	// The underlying map is sorted, so the first key that does not carry the
	// bucket's prefix lies past the end of the bucket.
	// TODO(asubiotto): Make UnsafeKey() and UnsafeValue() part of the
	// SortedDiskMapIterator interface to avoid allocation here, in Mark(), and
	// isRowMarked().
	inBucket := bytes.HasPrefix(i.Key(), i.encodedEqCols)
	return inBucket, nil
}
// Row implements the rowIterator interface. It returns the current row as
// decoded by the underlying iterator, without the trailing mark column if the
// container stores one.
func (i hashDiskRowBucketIterator) Row() (sqlbase.EncDatumRow, error) {
	row, err := i.diskRowIterator.Row()
	if err != nil {
		return nil, err
	}
	if !i.hashDiskRowContainer.shouldMark {
		return row, nil
	}
	// Strip the mark column stored at the end of the row.
	return row[:len(row)-1], nil
}
// Mark implements the rowMarkerIterator interface. It rewrites the current
// row's value with its trailing mark bytes replaced by the encoding of the
// given mark and writes the result back under the current key. Calling Mark on
// a container that was not initialized for marking is fatal.
func (i hashDiskRowBucketIterator) Mark(ctx context.Context, mark bool) error {
	if !i.hashDiskRowContainer.shouldMark {
		log.Fatal(ctx, "hash disk row container not set up for marking")
	}

	var newMark []byte
	if mark {
		newMark = encodedTrue
	} else {
		newMark = encodedFalse
	}

	// value holds the non-equality encoded columns, the last of which is the
	// column used to mark a row.
	value := i.Value()
	valueLen := len(value)

	// Stage the new mark past the end of the value, slide it back over the
	// old mark and then drop the staged bytes again.
	value = append(value, newMark...)
	copy(value[valueLen-len(newMark):], value[valueLen:])
	value = value[:valueLen]

	// These marks only matter when using a hashDiskRowIterator to iterate over
	// unmarked rows. The writes are flushed when creating a NewIterator() in
	// NewUnmarkedIterator().
	return i.hashDiskRowContainer.bufferedRows.Put(i.Key(), value)
}
// hashDiskRowIterator iterates over all unmarked rows in a
// hashDiskRowContainer.
type hashDiskRowIterator struct {
// diskRowIterator is the underlying iterator over all stored rows.
diskRowIterator
}
// Compile-time check that hashDiskRowIterator implements rowIterator.
var _ rowIterator = hashDiskRowIterator{}
// NewUnmarkedIterator implements the hashRowContainer interface. Without
// marking, a plain iterator over all stored rows is returned. Otherwise that
// iterator is wrapped so that marked rows are skipped and the mark column is
// stripped from returned rows.
func (h *hashDiskRowContainer) NewUnmarkedIterator(ctx context.Context) rowIterator {
	if !h.shouldMark {
		return h.NewIterator(ctx)
	}
	return hashDiskRowIterator{
		diskRowIterator: h.NewIterator(ctx).(diskRowIterator),
	}
}
// Rewind implements the rowIterator interface. It rewinds the underlying
// iterator and then moves forward to the first unmarked row, if the row it
// landed on is marked.
func (i hashDiskRowIterator) Rewind() {
	i.diskRowIterator.Rewind()
	// Skip over the leading run of marked rows.
	for i.isRowMarked() {
		i.diskRowIterator.Next()
	}
}
// Next implements the rowIterator interface. It advances the underlying
// iterator at least once and keeps advancing for as long as the row it lands
// on is marked.
func (i hashDiskRowIterator) Next() {
	for {
		i.diskRowIterator.Next()
		if !i.isRowMarked() {
			return
		}
	}
}
// Row implements the rowIterator interface. It returns the current row as
// decoded by the underlying iterator with the trailing mark column removed.
func (i hashDiskRowIterator) Row() (sqlbase.EncDatumRow, error) {
	withMark, err := i.diskRowIterator.Row()
	if err != nil {
		return nil, err
	}
	// The mark is stored as the last column of the row.
	return withMark[:len(withMark)-1], nil
}
// isRowMarked returns true if the current row is marked. It returns false if
// the row is not marked, if the iterator is not positioned on a row, or if
// there was an error establishing the row's validity. Subsequent calls to
// Valid() will uncover this error.
//
// A row counts as marked when its value ends with the encoding of a true mark.
// bytes.HasSuffix is used for that check so that a value shorter than the
// mark encoding is reported as unmarked instead of causing a slice bounds
// panic.
func (i hashDiskRowIterator) isRowMarked() bool {
	// isRowMarked is not necessarily called after Valid().
	if ok, err := i.diskRowIterator.Valid(); err != nil || !ok {
		return false
	}
	return bytes.HasSuffix(i.Value(), encodedTrue)
}