From d2b2bc6a60fcdb5f773be11be4bdb96c2801dbc4 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 15 Jun 2021 15:56:07 -0700 Subject: [PATCH 1/2] colexec: extend external sort benchmark for top K case Release note: None --- pkg/sql/colexec/external_sort_test.go | 92 +++++++++++++++------------ 1 file changed, 52 insertions(+), 40 deletions(-) diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index 3a61f89da5cd..9a5c32153ef3 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -392,50 +392,62 @@ func BenchmarkExternalSort(b *testing.B) { for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} { for _, nCols := range []int{1, 2, 4} { - for _, spillForced := range []bool{false, true} { - flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced - name := fmt.Sprintf("rows=%d/cols=%d/spilled=%t", nBatches*coldata.BatchSize(), nCols, spillForced) - b.Run(name, func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize() (rows / - // batch) * nCols (number of columns / row). - b.SetBytes(int64(8 * nBatches * coldata.BatchSize() * nCols)) - typs := make([]*types.T, nCols) - for i := range typs { - typs[i] = types.Int + for _, topK := range []bool{false, true} { + for _, spillForced := range []bool{false, true} { + flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced + var topKSubstring string + if topK { + topKSubstring = "topK/" } - batch := testAllocator.NewMemBatchWithMaxCapacity(typs) - batch.SetLength(coldata.BatchSize()) - ordCols := make([]execinfrapb.Ordering_Column, nCols) - for i := range ordCols { - ordCols[i].ColIdx = uint32(i) - ordCols[i].Direction = execinfrapb.Ordering_Column_Direction(rng.Int() % 2) - col := batch.ColVec(i).Int64() - for j := 0; j < coldata.BatchSize(); j++ { - col[j] = rng.Int63() % int64((i*1024)+1) + name := fmt.Sprintf("rows=%d/cols=%d/%sspilled=%t", nBatches*coldata.BatchSize(), nCols, topKSubstring, spillForced) + b.Run(name, func(b *testing.B) { + // 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize() (rows / + // batch) * nCols (number of columns / row). + b.SetBytes(int64(8 * nBatches * coldata.BatchSize() * nCols)) + typs := make([]*types.T, nCols) + for i := range typs { + typs[i] = types.Int } - } - b.ResetTimer() - for n := 0; n < b.N; n++ { - source := colexectestutils.NewFiniteBatchSource(testAllocator, batch, typs, nBatches) - var spilled bool - sorter, accounts, monitors, _, err := createDiskBackedSorter( - ctx, flowCtx, []colexecop.Operator{source}, typs, ordCols, - 0 /* matchLen */, 0 /* k */, func() { spilled = true }, - 0 /* numForcedRepartitions */, false /* delegateFDAcquisitions */, queueCfg, &colexecop.TestingSemaphore{}, - ) - memAccounts = append(memAccounts, accounts...) - memMonitors = append(memMonitors, monitors...) 
- if err != nil { - b.Fatal(err) + batch := testAllocator.NewMemBatchWithMaxCapacity(typs) + batch.SetLength(coldata.BatchSize()) + ordCols := make([]execinfrapb.Ordering_Column, nCols) + for i := range ordCols { + ordCols[i].ColIdx = uint32(i) + ordCols[i].Direction = execinfrapb.Ordering_Column_Direction(rng.Int() % 2) + col := batch.ColVec(i).Int64() + for j := 0; j < coldata.BatchSize(); j++ { + col[j] = rng.Int63() % int64((i*1024)+1) + } } - sorter.Init(ctx) - for out := sorter.Next(); out.Length() != 0; out = sorter.Next() { + b.ResetTimer() + for n := 0; n < b.N; n++ { + source := colexectestutils.NewFiniteBatchSource(testAllocator, batch, typs, nBatches) + var spilled bool + k := uint64(0) + if topK { + // Pick the same value for K as we do in the + // in-memory top K sort benchmark. + k = 128 + } + sorter, accounts, monitors, _, err := createDiskBackedSorter( + ctx, flowCtx, []colexecop.Operator{source}, typs, ordCols, + 0 /* matchLen */, k, func() { spilled = true }, + 0 /* numForcedRepartitions */, false /* delegateFDAcquisitions */, queueCfg, &colexecop.TestingSemaphore{}, + ) + memAccounts = append(memAccounts, accounts...) + memMonitors = append(memMonitors, monitors...) + if err != nil { + b.Fatal(err) + } + sorter.Init(ctx) + for out := sorter.Next(); out.Length() != 0; out = sorter.Next() { + } + require.Equal(b, spillForced, spilled, fmt.Sprintf( + "expected: spilled=%t\tactual: spilled=%t", spillForced, spilled, + )) } - require.Equal(b, spillForced, spilled, fmt.Sprintf( - "expected: spilled=%t\tactual: spilled=%t", spillForced, spilled, - )) - } - }) + }) + } } } } From 49e4f73abfc0e603084a5a8d20eb151081e43037 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 9 Jun 2021 18:55:11 -0700 Subject: [PATCH 2/2] colexec: optimize the external sort for top K case Previously, if the top K sort spilled to disk, we used the general external sort. However, we could easily optimize that case with the knowledge that only K tuples are needed by the output. Namely, we can use the in-memory top K sort (in order to create each new partition of the desired size) and also limit the size of each merged partition by K tuples. This commit adds these optimizations. Release note: None --- pkg/sql/colexec/colbuilder/execplan.go | 7 +- pkg/sql/colexec/external_sort.go | 153 ++++++++++++++++++------- pkg/sql/colexec/sorttopk.go | 19 ++- 3 files changed, 130 insertions(+), 49 deletions(-) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index fc74252595e9..c93003d153da 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -362,6 +362,7 @@ func (r opResult) createDiskBackedSort( sorterMemMonitorName string inMemorySorter colexecop.Operator err error + topK uint64 ) if len(ordering.Columns) == int(matchLen) { // The input is already fully ordered, so there is nothing to sort. @@ -400,10 +401,10 @@ func (r opResult) createDiskBackedSort( ctx, flowCtx, opNamePrefix+"topk-sort", processorID, ) } - k := post.Limit + post.Offset + topK = post.Limit + post.Offset inMemorySorter = colexec.NewTopKSorter( colmem.NewAllocator(ctx, topKSorterMemAccount, factory), input, inputTypes, - ordering.Columns, k, + ordering.Columns, topK, ) } else { // No optimizations possible. Default to the standard sort operator. 
@@ -459,7 +460,7 @@ func (r opResult) createDiskBackedSort( sortUnlimitedAllocator, mergeUnlimitedAllocator, outputUnlimitedAllocator, - input, inputTypes, ordering, + input, inputTypes, ordering, topK, execinfra.GetWorkMemLimit(flowCtx), maxNumberPartitions, args.TestingKnobs.NumForcedRepartitions, diff --git a/pkg/sql/colexec/external_sort.go b/pkg/sql/colexec/external_sort.go index 5ff2e42827fe..03b3b7f52e22 100644 --- a/pkg/sql/colexec/external_sort.go +++ b/pkg/sql/colexec/external_sort.go @@ -12,6 +12,7 @@ package colexec import ( "context" + "fmt" "strings" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -129,6 +130,9 @@ type externalSorter struct { state externalSorterState inputTypes []*types.T ordering execinfrapb.Ordering + // topK, if non-zero, indicates the number of tuples needed by the output. + // Each partition will be limited by this number in size. + topK uint64 // columnOrdering is the same as ordering used when creating mergers. columnOrdering colinfo.ColumnOrdering inMemSorter colexecop.ResettableOperator @@ -156,10 +160,12 @@ type externalSorter struct { // partitionsInfo tracks some information about all current partitions // (those in currentPartitionIdxs). partitionsInfo struct { + // tupleCount stores the number of tuples in each partition. + tupleCount []uint64 // totalSize is used for logging purposes. totalSize []int64 - // maxBatchMemSize decides how many partitions to have at once, - // potentially reducing maxNumberPartitions + // maxBatchMemSize is used when determining how many partitions to have + // at once, potentially reducing maxNumberPartitions. maxBatchMemSize []int64 } @@ -197,6 +203,7 @@ var _ colexecop.ClosableOperator = &externalSorter{} // from an unlimited memory monitor. They will be used by several internal // components of the external sort which is responsible for making sure that // the components stay within the memory limit. +// - topK, if non-zero, indicates the number of tuples needed by the output. // - maxNumberPartitions (when non-zero) overrides the semi-dynamically // computed maximum number of partitions to have at once. 
// - numForcedMerges (when non-zero) specifies the number of times the repeated @@ -212,6 +219,7 @@ func NewExternalSorter( input colexecop.Operator, inputTypes []*types.T, ordering execinfrapb.Ordering, + topK uint64, memoryLimit int64, maxNumberPartitions int, numForcedMerges int, @@ -256,12 +264,18 @@ func NewExternalSorter( mergeMemoryLimit = 1 } inputPartitioner := newInputPartitioningOperator(input, inMemSortMemoryLimit) - inMemSorter, err := newSorter( - sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes), - inputTypes, ordering.Columns, - ) - if err != nil { - colexecerror.InternalError(err) + var inMemSorter colexecop.ResettableOperator + if topK > 0 { + inMemSorter = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, topK) + } else { + var err error + inMemSorter, err = newSorter( + sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes), + inputTypes, ordering.Columns, + ) + if err != nil { + colexecerror.InternalError(err) + } } partitionedDiskQueueSemaphore := fdSemaphore if !delegateFDAcquisitions { @@ -283,12 +297,14 @@ func NewExternalSorter( }, inputTypes: inputTypes, ordering: ordering, + topK: topK, columnOrdering: execinfrapb.ConvertToColumnOrdering(ordering), maxNumberPartitions: maxNumberPartitions, numForcedMerges: numForcedMerges, currentPartitionIdxs: make([]int, maxNumberPartitions), maxMerged: make([]int, maxNumberPartitions), } + es.partitionsInfo.tupleCount = make([]uint64, maxNumberPartitions) es.partitionsInfo.totalSize = make([]int64, maxNumberPartitions) es.partitionsInfo.maxBatchMemSize = make([]int64, maxNumberPartitions) es.fdState.fdSemaphore = fdSemaphore @@ -296,6 +312,31 @@ func NewExternalSorter( return es } +// doneWithCurrentPartition should be called whenever all tuples needed for the +// current partition have been enqueued in order to prepare the external sorter +// for the next partition. +func (s *externalSorter) doneWithCurrentPartition() { + // The current partition has been fully processed, so we reset the in-memory + // sorter (which will do the "shallow" reset of inputPartitioningOperator). + s.inMemSorterInput.interceptReset = true + s.inMemSorter.Reset(s.Ctx) + s.currentPartitionIdxs[s.numPartitions] = s.currentPartitionIdx + s.maxMerged[s.numPartitions] = 0 + s.numPartitions++ + s.currentPartitionIdx++ + if s.shouldMergeSomePartitions() { + s.state = externalSorterRepeatedMerging + } else { + s.state = externalSorterNewPartition + } +} + +func (s *externalSorter) resetPartitionsInfoForCurrentPartition() { + s.partitionsInfo.tupleCount[s.numPartitions] = 0 + s.partitionsInfo.totalSize[s.numPartitions] = 0 + s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0 +} + func (s *externalSorter) Next() coldata.Batch { for { switch s.state { @@ -320,30 +361,19 @@ func (s *externalSorter) Next() coldata.Batch { s.fdState.acquiredFDs = toAcquire } } - s.partitionsInfo.totalSize[s.numPartitions] = 0 - s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0 - s.enqueue(b) - s.state = externalSorterSpillPartition + s.resetPartitionsInfoForCurrentPartition() + partitionDone := s.enqueue(b) + if partitionDone { + s.doneWithCurrentPartition() + } else { + s.state = externalSorterSpillPartition + } case externalSorterSpillPartition: b := s.Input.Next() - s.enqueue(b) - if b.Length() == 0 { - // The partition has been fully spilled, so we reset the - // in-memory sorter (which will do the "shallow" reset of - // inputPartitioningOperator). 
- s.inMemSorterInput.interceptReset = true - s.inMemSorter.Reset(s.Ctx) - s.currentPartitionIdxs[s.numPartitions] = s.currentPartitionIdx - s.maxMerged[s.numPartitions] = 0 - s.numPartitions++ - s.currentPartitionIdx++ - if s.shouldMergeSomePartitions() { - s.state = externalSorterRepeatedMerging - continue - } - s.state = externalSorterNewPartition - continue + partitionDone := s.enqueue(b) + if b.Length() == 0 || partitionDone { + s.doneWithCurrentPartition() } case externalSorterRepeatedMerging: @@ -373,11 +403,10 @@ func (s *externalSorter) Next() coldata.Batch { merger := s.createMergerForPartitions(n) merger.Init(s.Ctx) s.numPartitions -= n - s.partitionsInfo.totalSize[s.numPartitions] = 0 - s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0 + s.resetPartitionsInfoForCurrentPartition() for b := merger.Next(); ; b = merger.Next() { - s.enqueue(b) - if b.Length() == 0 { + partitionDone := s.enqueue(b) + if b.Length() == 0 || partitionDone { break } } @@ -389,9 +418,19 @@ func (s *externalSorter) Next() coldata.Batch { // used for the output batches (all of which have been enqueued into // the new partition). s.outputUnlimitedAllocator.ReleaseMemory(s.outputUnlimitedAllocator.Used()) - // Reclaim disk space by closing the inactive read partitions. Since - // the merger must have exhausted all inputs, this is all the - // partitions just read from. + // Make sure to close out all partitions we have just read from. + // + // Note that this operation is a noop for the general sort and is + // only needed for the top K sort. In the former case we have fully + // exhausted all old partitions, and they have been closed for + // reading automatically; in the latter case we stop reading once we + // have at least K tuples in the new partition, and we have to + // manually close all old partitions for reading in order for + // resources to be properly released in CloseInactiveReadPartitions + // call below. + if err := s.partitioner.CloseAllOpenReadFileDescriptors(); err != nil { + colexecerror.InternalError(err) + } if err := s.partitioner.CloseInactiveReadPartitions(s.Ctx); err != nil { colexecerror.InternalError(err) } @@ -431,11 +470,24 @@ func (s *externalSorter) Next() coldata.Batch { } // enqueue enqueues b to the current partition (which has index -// currentPartitionIdx) as well as updates the information about -// the partition. -func (s *externalSorter) enqueue(b coldata.Batch) { +// currentPartitionIdx) as well as updates the information about the partition. +// +// If the current partition reaches the desired topK number of tuples, a zero +// batch is enqueued and true is returned indicating that the current partition +// is done. +// +// The following observation is what allows us to stop enqueueing into the +// current partition once topK number of tuples is reached: `b` is not coming +// from the input to the sort operation as a whole (when tuples can be in an +// arbitrary order) - `b` is coming to us either from the in-memory top K sorter +// (which has already performed the sort over a subset of tuples, with +// inputPartitioningOperator defining the boundaries of that subset) or from the +// merger (which performs the merge of N already sorted partitions while +// preserving the order of tuples). 
+func (s *externalSorter) enqueue(b coldata.Batch) bool { if b.Length() > 0 { batchMemSize := colmem.GetBatchMemSize(b) + s.partitionsInfo.tupleCount[s.numPartitions] += uint64(b.Length()) s.partitionsInfo.totalSize[s.numPartitions] += batchMemSize if batchMemSize > s.partitionsInfo.maxBatchMemSize[s.numPartitions] { s.partitionsInfo.maxBatchMemSize[s.numPartitions] = batchMemSize @@ -447,6 +499,16 @@ func (s *externalSorter) enqueue(b coldata.Batch) { if err := s.partitioner.Enqueue(s.Ctx, s.currentPartitionIdx, b); err != nil { colexecutils.HandleErrorFromDiskQueue(err) } + if s.topK > 0 && s.topK <= s.partitionsInfo.tupleCount[s.numPartitions] { + // We have a top K sort and already have at least K tuples in the + // current partition. Enqueue a zero-length batch and tell the caller + // that the partition is done. + if err := s.partitioner.Enqueue(s.Ctx, s.currentPartitionIdx, coldata.ZeroBatch); err != nil { + colexecutils.HandleErrorFromDiskQueue(err) + } + return true + } + return false } // shouldMergeSomePartitions returns true if we need to merge some current @@ -574,16 +636,19 @@ func (s *externalSorter) createMergerForPartitions(n int) colexecop.Operator { syncInputs[i].Root = s.partitionerToOperators[i] } if log.V(2) { - var b strings.Builder + var counts, sizes strings.Builder for i := 0; i < n; i++ { if i > 0 { - b.WriteString(", ") + counts.WriteString(", ") + sizes.WriteString(", ") } - b.WriteString(humanizeutil.IBytes(s.partitionsInfo.totalSize[s.numPartitions-n+i])) + partitionOrdinal := s.numPartitions - n + i + counts.WriteString(fmt.Sprintf("%d", s.partitionsInfo.tupleCount[partitionOrdinal])) + sizes.WriteString(humanizeutil.IBytes(s.partitionsInfo.totalSize[partitionOrdinal])) } log.Infof(s.Ctx, - "external sorter is merging partitions with partition indices %v with sizes [%s]", - s.currentPartitionIdxs[s.numPartitions-n:s.numPartitions], b.String(), + "external sorter is merging partitions with partition indices %v with counts [%s] and sizes [%s]", + s.currentPartitionIdxs[s.numPartitions-n:s.numPartitions], counts.String(), sizes.String(), ) } diff --git a/pkg/sql/colexec/sorttopk.go b/pkg/sql/colexec/sorttopk.go index c29d1ed8c22d..8c1f001e0ac9 100644 --- a/pkg/sql/colexec/sorttopk.go +++ b/pkg/sql/colexec/sorttopk.go @@ -39,7 +39,7 @@ func NewTopKSorter( inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, k uint64, -) colexecop.Operator { +) colexecop.ResettableOperator { return &topKSorter{ allocator: allocator, OneInputNode: colexecop.NewOneInputNode(input), @@ -50,6 +50,7 @@ func NewTopKSorter( } var _ colexecop.BufferingInMemoryOperator = &topKSorter{} +var _ colexecop.Resetter = &topKSorter{} // topKSortState represents the state of the sort operator. type topKSortState int @@ -138,6 +139,16 @@ func (t *topKSorter) Next() coldata.Batch { } } +func (t *topKSorter) Reset(ctx context.Context) { + if r, ok := t.Input.(colexecop.Resetter); ok { + r.Reset(ctx) + } + t.state = topKSortSpooling + t.firstUnprocessedTupleIdx = 0 + t.topK.ResetInternalBatch() + t.emitted = 0 +} + // spool reads in the entire input, always storing the top K rows it has seen so // far in o.topK. This is done by maintaining a max heap of indices into o.topK. // Whenever we encounter a row which is smaller than the max row in the heap, @@ -169,7 +180,11 @@ func (t *topKSorter) spool() { t.updateComparators(topKVecIdx, t.topK) // Initialize the heap. 
- t.heap = make([]int, t.topK.Length()) + if cap(t.heap) < t.topK.Length() { + t.heap = make([]int, t.topK.Length()) + } else { + t.heap = t.heap[:t.topK.Length()] + } for i := range t.heap { t.heap[i] = i }
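
For intuition about the optimization described in the second commit message, here is a minimal standalone sketch of why capping partitions at K tuples is safe. It uses plain int slices rather than the colexec types, and truncateToK / mergeTopK are hypothetical helpers invented for illustration: once every spilled partition has been individually sorted by the in-memory top K sorter, only its first K tuples can contribute to the global top K, and a merge of sorted partitions can likewise be cut off after producing K tuples.

    package main

    import "fmt"

    // truncateToK models limiting an individually sorted partition to at most k
    // tuples: nothing past the first k values of a sorted partition can ever be
    // part of the global top k.
    func truncateToK(sorted []int, k int) []int {
        if len(sorted) > k {
            return sorted[:k]
        }
        return sorted
    }

    // mergeTopK merges already sorted partitions and stops as soon as k values
    // have been produced, mirroring how a merged partition can be cut off at k
    // tuples without losing any of the global top k.
    func mergeTopK(partitions [][]int, k int) []int {
        idx := make([]int, len(partitions))
        var out []int
        for len(out) < k {
            best := -1
            for p := range partitions {
                if idx[p] < len(partitions[p]) &&
                    (best == -1 || partitions[p][idx[p]] < partitions[best][idx[best]]) {
                    best = p
                }
            }
            if best == -1 {
                break // all partitions exhausted before k values were produced
            }
            out = append(out, partitions[best][idx[best]])
            idx[best]++
        }
        return out
    }

    func main() {
        // Three "spilled partitions", each already sorted in ascending order as
        // the in-memory top K sorter would have left them.
        parts := [][]int{{1, 4, 9}, {2, 3, 8}, {0, 5, 7}}
        k := 3
        for i := range parts {
            parts[i] = truncateToK(parts[i], k)
        }
        fmt.Println(mergeTopK(parts, k)) // prints [0 1 2]
    }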
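
The new boolean return value of enqueue is what lets the sorter stop writing a partition early. The reasoning from the enqueue comment in the patch (batches arrive already sorted, so once K tuples are in the partition the rest are irrelevant) can be sketched with stand-in types; partitionWriter below is an illustrative stand-in, not the actual colcontainer API, though the zero-length batch is indeed how the end of a partition is marked.

    package main

    import "fmt"

    // batch is a stand-in for a columnar batch: only its length matters here.
    type batch struct{ length int }

    // partitionWriter is a stand-in for the disk-backed partition being written.
    type partitionWriter struct {
        tupleCount uint64
        closed     bool
    }

    func (w *partitionWriter) enqueueBatch(b batch) {
        if b.length == 0 {
            w.closed = true // a zero-length batch marks the end of the partition
            return
        }
        w.tupleCount += uint64(b.length)
    }

    // enqueue mirrors the shape of the new externalSorter.enqueue: it writes the
    // batch, and if a top K limit is set and already satisfied, appends the
    // zero-length sentinel and reports that the partition is done.
    func enqueue(w *partitionWriter, b batch, topK uint64) (partitionDone bool) {
        w.enqueueBatch(b)
        if topK > 0 && w.tupleCount >= topK {
            w.enqueueBatch(batch{length: 0})
            return true
        }
        return false
    }

    func main() {
        w := &partitionWriter{}
        const topK = 128
        for i := 0; ; i++ {
            if enqueue(w, batch{length: 100}, topK) {
                // prints: partition done after 2 batches, 200 tuples
                fmt.Printf("partition done after %d batches, %d tuples\n", i+1, w.tupleCount)
                break
            }
        }
    }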
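
The heap initialization change in sorttopk.go reuses the existing backing array across Reset calls instead of reallocating for every partition. A tiny sketch of that reuse pattern, with a hypothetical reuseOrGrow helper standing in for the inlined cap check:

    package main

    import "fmt"

    // reuseOrGrow returns a slice of length n, reusing buf's backing array when
    // it is large enough and allocating a new one only when it is not.
    func reuseOrGrow(buf []int, n int) []int {
        if cap(buf) < n {
            return make([]int, n)
        }
        return buf[:n]
    }

    func main() {
        var heapIdx []int
        for run := 0; run < 3; run++ {
            // Each "partition" needs a heap of indices 0..n-1; a new backing
            // array is allocated only on the first run (or if n ever grows).
            n := 4
            heapIdx = reuseOrGrow(heapIdx, n)
            for i := range heapIdx {
                heapIdx[i] = i
            }
            fmt.Println(run, heapIdx, cap(heapIdx))
        }
    }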