external_sort.go
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)

// externalSorterState indicates the current state of the external sorter.
type externalSorterState int

const (
// externalSorterNewPartition indicates that the next batch we read should
// start a new partition. A zero-length batch in this state indicates that
// the input to the external sorter has been fully consumed and we should
// proceed to merging the partitions.
externalSorterNewPartition externalSorterState = iota
// externalSorterSpillPartition indicates that the next batch we read should
// be added to the last partition so far. A zero-length batch in this state
// indicates that the end of the partition has been reached and we should
// transition to starting a new partition. If maxNumberPartitions is reached
// in this state, the sorter will transition to externalSorterRepeatedMerging
// to reduce the number of partitions.
externalSorterSpillPartition
// externalSorterRepeatedMerging indicates that we need to merge
// maxNumberPartitions into one and spill that new partition to disk. When
// finished, the sorter will transition to externalSorterNewPartition.
externalSorterRepeatedMerging
// externalSorterFinalMerging indicates that we have fully consumed the input
// and can merge all of the partitions in one step. We then transition to
// externalSorterEmitting state.
externalSorterFinalMerging
// externalSorterEmitting indicates that we are ready to emit output. A zero-
// length batch in this state indicates that we have emitted all tuples and
// should transition to externalSorterFinished state.
externalSorterEmitting
// externalSorterFinished indicates that all tuples from all partitions have
// been emitted and from now on only a zero-length batch will be emitted by
// the external sorter. This state is also responsible for closing the
// partitions.
externalSorterFinished
)

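// A rough sketch of the transitions between these states, with the
// externalSorter prefix omitted (Next holds the authoritative logic):
//
//   NewPartition -> SpillPartition -> {NewPartition | RepeatedMerging}
//   RepeatedMerging -> NewPartition
//   NewPartition (input exhausted) -> FinalMerging -> Emitting -> Finished
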
// In order to make progress when merging, we have to merge at least two
// partitions into a new third one.
const externalSorterMinPartitions = 3

// externalSorter is an Operator that performs external merge sort. It works in
// two stages:
// 1. it will use a combination of an input partitioner and in-memory sorter to
//    divide up all batches from the input into partitions, sort each partition
//    in memory, and write sorted partitions to disk;
// 2. it will use OrderedSynchronizer to merge the partitions.
//
// The (simplified) diagram of the components involved is as follows:
//
//                      input
//                        |
//                        ↓
//                input partitioner
//                        |
//                        ↓
//                 in-memory sorter
//                        |
//                        ↓
//    ------------------------------------------
//   |             external sorter              |
//   |             ---------------              |
//   |                                          |
//   | partition1    partition2 ... partitionN  |
//   |     |             |              |       |
//   |     ↓             ↓              ↓       |
//   |      merger (ordered synchronizer)       |
//    ------------------------------------------
//                        |
//                        ↓
//                      output
//
// There are a couple of implicit upstream links in the setup:
// - the input partitioner checks the standalone memory account to see whether
// a new partition must be started;
// - external sorter resets in-memory sorter (which, in turn, resets input
// partitioner) once the full partition has been spilled to disk.
//
// What is hidden in the diagram is the fact that at some point we might need
// to merge several partitions into a new one that we spill to disk in order to
// reduce the number of "active" partitions. We need to limit the number of
// "active" partitions because each partition uses some amount of RAM for its
// buffer. This limit is determined by the maxNumberPartitions variable.
type externalSorter struct {
OneInputNode
NonExplainable
closed bool
unlimitedAllocator *Allocator
memoryLimit int64
state externalSorterState
inputTypes []coltypes.T
ordering execinfrapb.Ordering
inMemSorter resettableOperator
partitioner colcontainer.PartitionedQueue
partitionerCreator func() colcontainer.PartitionedQueue
// numPartitions is the current number of partitions.
numPartitions int
// firstPartitionIdx is the index of the first partition to merge next.
firstPartitionIdx int
maxNumberPartitions int
// fdState is used to acquire file descriptors up front.
fdState struct {
fdSemaphore semaphore.Semaphore
acquiredFDs int
}
emitter Operator
testingKnobs struct {
// delegateFDAcquisitions, if true, means that a test wants to force the
// PartitionedDiskQueues to track the number of file descriptors the external
// sorter will open/close. This disables the default behavior of acquiring
// all file descriptors up front in Next.
delegateFDAcquisitions bool
}
}

var _ resettableOperator = &externalSorter{}

// newExternalSorter returns a disk-backed general sort operator.
// - ctx is the same context that standaloneMemAccount was created with.
// - unlimitedAllocator must have been created with a memory account derived
// from an unlimited memory monitor. It will be used by several internal
// components of the external sort, which is responsible for making sure that
// the components stay within the memory limit.
// - standaloneMemAccount must be a memory account derived from an unlimited
// memory monitor with a standalone budget. It will be used by
// inputPartitioningOperator to "partition" the input according to the memory
// limit. The budget *must* be standalone because we don't want to double
// count the memory (the memory under the batches will be accounted for with
// the unlimitedAllocator).
// - maxNumberPartitions (when non-zero) overrides the semi-dynamically
// computed maximum number of partitions to have at once.
// - delegateFDAcquisitions specifies whether the external sorter should let
// the partitioned disk queue acquire file descriptors instead of acquiring
// them up front in Next.
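//
// A minimal usage sketch (the surrounding variables are assumptions made for
// illustration, not values defined in this file):
//
//   s := newExternalSorter(ctx, unlimitedAllocator, standaloneMemAccount,
//     input, inputTypes, ordering, memoryLimit, 0 /* maxNumberPartitions */,
//     false /* delegateFDAcquisitions */, diskQueueCfg, fdSemaphore)
//   s.Init()
//   for b := s.Next(ctx); b.Length() > 0; b = s.Next(ctx) {
//     // Consume the sorted output batches.
//   }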
func newExternalSorter(
ctx context.Context,
unlimitedAllocator *Allocator,
standaloneMemAccount *mon.BoundAccount,
input Operator,
inputTypes []coltypes.T,
ordering execinfrapb.Ordering,
memoryLimit int64,
maxNumberPartitions int,
delegateFDAcquisitions bool,
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
) Operator {
if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeReuseCache {
execerror.VectorizedInternalPanic(errors.Errorf("external sorter instantiated with suboptimal disk queue cache mode: %d", diskQueueCfg.CacheMode))
}
inputPartitioner := newInputPartitioningOperator(ctx, input, standaloneMemAccount, memoryLimit)
inMemSorter, err := newSorter(
unlimitedAllocator, newAllSpooler(unlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns,
)
if err != nil {
execerror.VectorizedInternalPanic(err)
}
if diskQueueCfg.BufferSizeBytes > 0 && maxNumberPartitions == 0 {
// Each disk queue will use up to BufferSizeBytes of RAM, so we will give
// it almost all of the available memory (except for a single output batch
// that mergers will use).
batchMemSize := estimateBatchSizeBytes(inputTypes, coldata.BatchSize())
maxNumberPartitions = (int(memoryLimit) - batchMemSize) / diskQueueCfg.BufferSizeBytes
// Regardless of the computation above, cap the number of partitions based on
// the file descriptor limit: with the default limit of 256 file descriptors,
// this results in a hard maximum of 16 partitions used by the external
// sorter.
// TODO(asubiotto): this number should be tuned.
maxPartitionsAllowed := fdSemaphore.GetLimit() / 16
if maxNumberPartitions > maxPartitionsAllowed {
maxNumberPartitions = maxPartitionsAllowed
}
}
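// A purely illustrative example of the sizing above (the concrete numbers are
// assumptions, not defaults taken from this file): with a 64 MiB memoryLimit,
// an estimated batch size of 1 MiB, and 512 KiB disk queue buffers, the
// formula above would allow (64 MiB - 1 MiB) / 512 KiB = 126 partitions, but
// the file descriptor cap of 256 / 16 = 16 would then bring
// maxNumberPartitions down to 16.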
if maxNumberPartitions < externalSorterMinPartitions {
maxNumberPartitions = externalSorterMinPartitions
}
partitionedDiskQueueSemaphore := fdSemaphore
if !delegateFDAcquisitions {
// To avoid deadlocks with other disk queues, we manually attempt to acquire
// the maximum number of descriptors all at once in Next. Passing in a nil
// semaphore indicates that the caller will do the acquiring.
partitionedDiskQueueSemaphore = nil
}
es := &externalSorter{
OneInputNode: NewOneInputNode(inMemSorter),
unlimitedAllocator: unlimitedAllocator,
memoryLimit: memoryLimit,
inMemSorter: inMemSorter,
partitionerCreator: func() colcontainer.PartitionedQueue {
return colcontainer.NewPartitionedDiskQueue(inputTypes, diskQueueCfg, partitionedDiskQueueSemaphore, colcontainer.PartitionerStrategyCloseOnNewPartition)
},
inputTypes: inputTypes,
ordering: ordering,
maxNumberPartitions: maxNumberPartitions,
}
es.fdState.fdSemaphore = fdSemaphore
es.testingKnobs.delegateFDAcquisitions = delegateFDAcquisitions
return es
}

func (s *externalSorter) Init() {
s.input.Init()
s.state = externalSorterNewPartition
}

func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
for {
switch s.state {
case externalSorterNewPartition:
b := s.input.Next(ctx)
if b.Length() == 0 {
// The input has been fully exhausted, and it is always the case that
// the number of partitions is less than the maximum number since
// externalSorterSpillPartition will check and re-merge if not.
// Proceed to the final merging state.
s.state = externalSorterFinalMerging
continue
}
newPartitionIdx := s.firstPartitionIdx + s.numPartitions
if s.partitioner == nil {
s.partitioner = s.partitionerCreator()
}
if err := s.partitioner.Enqueue(ctx, newPartitionIdx, b); err != nil {
execerror.VectorizedInternalPanic(err)
}
s.state = externalSorterSpillPartition
continue
case externalSorterSpillPartition:
curPartitionIdx := s.firstPartitionIdx + s.numPartitions
b := s.input.Next(ctx)
if b.Length() == 0 {
// The partition has been fully spilled, so we reset the in-memory
// sorter (which will reset inputPartitioningOperator).
s.inMemSorter.reset()
s.numPartitions++
if s.numPartitions == s.maxNumberPartitions-1 {
// We have reached the maximum number of active partitions that we
// know that we'll be able to merge without exceeding the limit, so
// we need to merge all of them and spill the new partition to disk
// before we can proceed with consuming the input.
s.state = externalSorterRepeatedMerging
continue
}
s.state = externalSorterNewPartition
continue
}
if !s.testingKnobs.delegateFDAcquisitions && s.fdState.fdSemaphore != nil && s.fdState.acquiredFDs == 0 {
toAcquire := s.maxNumberPartitions
if err := s.fdState.fdSemaphore.Acquire(ctx, toAcquire); err != nil {
execerror.VectorizedInternalPanic(err)
}
s.fdState.acquiredFDs = toAcquire
}
if err := s.partitioner.Enqueue(ctx, curPartitionIdx, b); err != nil {
execerror.VectorizedInternalPanic(err)
}
continue
case externalSorterRepeatedMerging:
// We will merge all partitions in range [s.firstPartitionIdx,
// s.firstPartitionIdx+s.numPartitions) and will spill all the resulting
// batches into a new partition with the next available index.
merger := s.createMergerForPartitions(s.firstPartitionIdx, s.numPartitions)
merger.Init()
newPartitionIdx := s.firstPartitionIdx + s.numPartitions
for b := merger.Next(ctx); b.Length() > 0; b = merger.Next(ctx) {
if err := s.partitioner.Enqueue(ctx, newPartitionIdx, b); err != nil {
execerror.VectorizedInternalPanic(err)
}
}
// Reclaim disk space by closing the inactive read partitions. Since the
// merger must have exhausted all inputs, this is all the partitions just
// read from.
if err := s.partitioner.CloseInactiveReadPartitions(); err != nil {
execerror.VectorizedInternalPanic(err)
}
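// Illustrative example (the concrete numbers are assumed): if
// firstPartitionIdx was 0 and numPartitions was 4, partitions 0 through 3
// were just merged into the new partition 4, so the bookkeeping below
// advances the window to [4, 5): the merged partition becomes the only
// "active" one.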
s.firstPartitionIdx += s.numPartitions
s.numPartitions = 1
s.state = externalSorterNewPartition
continue
case externalSorterFinalMerging:
if s.numPartitions == 0 {
s.state = externalSorterFinished
continue
} else if s.numPartitions == 1 {
s.emitter = newPartitionerToOperator(
s.unlimitedAllocator, s.inputTypes, s.partitioner, s.firstPartitionIdx,
)
} else {
s.emitter = s.createMergerForPartitions(s.firstPartitionIdx, s.numPartitions)
}
s.emitter.Init()
s.state = externalSorterEmitting
continue
case externalSorterEmitting:
b := s.emitter.Next(ctx)
if b.Length() == 0 {
s.state = externalSorterFinished
continue
}
return b
case externalSorterFinished:
if err := s.Close(); err != nil {
execerror.VectorizedInternalPanic(err)
}
return coldata.ZeroBatch
default:
execerror.VectorizedInternalPanic(fmt.Sprintf("unexpected externalSorterState %d", s.state))
}
}
}

func (s *externalSorter) reset() {
if r, ok := s.input.(resetter); ok {
r.reset()
}
s.state = externalSorterNewPartition
if err := s.Close(); err != nil {
execerror.VectorizedInternalPanic(err)
}
s.closed = false
s.firstPartitionIdx = 0
s.numPartitions = 0
}

func (s *externalSorter) Close() error {
if s.closed {
return nil
}
var err error
if s.partitioner != nil {
err = s.partitioner.Close()
s.partitioner = nil
}
if !s.testingKnobs.delegateFDAcquisitions && s.fdState.fdSemaphore != nil && s.fdState.acquiredFDs > 0 {
s.fdState.fdSemaphore.Release(s.fdState.acquiredFDs)
}
s.closed = true
return err
}

// createMergerForPartitions creates an ordered synchronizer that will merge
// partitions in the range [firstIdx, firstIdx+numPartitions).
func (s *externalSorter) createMergerForPartitions(firstIdx, numPartitions int) Operator {
syncInputs := make([]Operator, numPartitions)
for i := range syncInputs {
syncInputs[i] = newPartitionerToOperator(
s.unlimitedAllocator, s.inputTypes, s.partitioner, firstIdx+i,
)
}
return NewOrderedSynchronizer(
s.unlimitedAllocator,
syncInputs,
s.inputTypes,
execinfrapb.ConvertToColumnOrdering(s.ordering),
)
}

func newInputPartitioningOperator(
ctx context.Context, input Operator, standaloneMemAccount *mon.BoundAccount, memoryLimit int64,
) resettableOperator {
return &inputPartitioningOperator{
OneInputNode: NewOneInputNode(input),
ctx: ctx,
standaloneMemAccount: standaloneMemAccount,
memoryLimit: memoryLimit,
}
}

// inputPartitioningOperator is an operator that returns the batches from its
// input until its standalone memory account reaches the memory limit. From
// that point on, the operator returns a zero-length batch (until it is reset).
type inputPartitioningOperator struct {
OneInputNode
NonExplainable
ctx context.Context
standaloneMemAccount *mon.BoundAccount
memoryLimit int64
}

var _ resettableOperator = &inputPartitioningOperator{}

func (o *inputPartitioningOperator) Init() {
o.input.Init()
}

func (o *inputPartitioningOperator) Next(ctx context.Context) coldata.Batch {
if o.standaloneMemAccount.Used() >= o.memoryLimit {
return coldata.ZeroBatch
}
b := o.input.Next(ctx)
if b.Length() == 0 {
return b
}
// We cannot use Allocator.RetainBatch here because that method looks at the
// capacities of the vectors. However, this operator is an input to sortOp
// which will spool all the tuples and buffer them (by appending into the
// buffered batch), so we need to account for memory proportionally to the
// length of the batch. (Note: this is not exactly true for the Bytes type,
// but it's ok if we have some deviation. These numbers matter only for
// deciding when to start a new partition, and the memory will actually be
// accounted for correctly.)
length := int64(b.Length())
usesSel := b.Selection() != nil
b.SetSelection(true)
selCapacity := cap(b.Selection())
b.SetSelection(usesSel)
batchMemSize := int64(0)
if selCapacity > 0 {
batchMemSize = selVectorSize(selCapacity) * length / int64(selCapacity)
}
for _, vec := range b.ColVecs() {
if vec.Type() == coltypes.Bytes {
batchMemSize += int64(vec.Bytes().ProportionalSize(length))
} else {
batchMemSize += getVecMemoryFootprint(vec) * length / int64(vec.Capacity())
}
}
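// For illustration only (the numbers are assumed): if a vector's full memory
// footprint is 8192 bytes, its capacity is 1024, and the batch length is 600,
// the proportional accounting above charges 8192 * 600 / 1024 = 4800 bytes
// for that vector rather than the full 8192.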
if err := o.standaloneMemAccount.Grow(o.ctx, batchMemSize); err != nil {
execerror.VectorizedInternalPanic(err)
}
return b
}

func (o *inputPartitioningOperator) reset() {
if r, ok := o.input.(resetter); ok {
r.reset()
}
o.standaloneMemAccount.Shrink(o.ctx, o.standaloneMemAccount.Used())
}