colexec: optimize the external sort for top K case
Previously, if the top K sort spilled to disk, we fell back to the
general external sort. However, that case can easily be optimized with
the knowledge that only K tuples are needed by the output.

Namely, we can use the in-memory top K sort (in order to create each
new partition of the desired size) and also limit the size of each
merged partition to K tuples. This commit adds these optimizations.

Release note: None
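
The following standalone Go sketch (not CockroachDB code; the helper names spillTopKPartition and mergeTopK are hypothetical) illustrates the two ideas above under simplified assumptions: each spilled partition of an already sorted stream only needs its first K tuples, and a merge of sorted partitions can likewise be cut off after K tuples.

```go
package main

import (
	"fmt"
	"sort"
)

// spillTopKPartition models the first optimization: a spilled partition is
// produced from an already sorted chunk, so keeping only its first K tuples
// loses nothing that the top K output could need.
func spillTopKPartition(sortedChunk []int, k int) []int {
	if len(sortedChunk) > k {
		return sortedChunk[:k]
	}
	return sortedChunk
}

// mergeTopK models the second optimization: merge sorted partitions while
// preserving order, but stop once K tuples have been emitted.
func mergeTopK(partitions [][]int, k int) []int {
	out := make([]int, 0, k)
	idx := make([]int, len(partitions)) // read position within each partition
	for len(out) < k {
		best, bestPart := 0, -1
		for p, pos := range idx {
			if pos < len(partitions[p]) && (bestPart == -1 || partitions[p][pos] < best) {
				best, bestPart = partitions[p][pos], p
			}
		}
		if bestPart == -1 {
			break // all partitions exhausted before reaching K tuples
		}
		out = append(out, best)
		idx[bestPart]++
	}
	return out
}

func main() {
	const k = 3
	// Each chunk stands in for what the in-memory top K sorter would spill.
	chunks := [][]int{{5, 9, 12, 40}, {1, 7, 8, 33, 50}, {2, 3, 11}}
	var partitions [][]int
	for _, c := range chunks {
		sort.Ints(c)
		partitions = append(partitions, spillTopKPartition(c, k))
	}
	fmt.Println(mergeTopK(partitions, k)) // [1 2 3]
}
```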
yuzefovich committed Jun 16, 2021
1 parent d2b2bc6 commit 49e4f73
Showing 3 changed files with 130 additions and 49 deletions.
7 changes: 4 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -362,6 +362,7 @@ func (r opResult) createDiskBackedSort(
sorterMemMonitorName string
inMemorySorter colexecop.Operator
err error
topK uint64
)
if len(ordering.Columns) == int(matchLen) {
// The input is already fully ordered, so there is nothing to sort.
@@ -400,10 +401,10 @@ func (r opResult) createDiskBackedSort(
ctx, flowCtx, opNamePrefix+"topk-sort", processorID,
)
}
k := post.Limit + post.Offset
topK = post.Limit + post.Offset
inMemorySorter = colexec.NewTopKSorter(
colmem.NewAllocator(ctx, topKSorterMemAccount, factory), input, inputTypes,
ordering.Columns, k,
ordering.Columns, topK,
)
} else {
// No optimizations possible. Default to the standard sort operator.
@@ -459,7 +460,7 @@ func (r opResult) createDiskBackedSort(
sortUnlimitedAllocator,
mergeUnlimitedAllocator,
outputUnlimitedAllocator,
input, inputTypes, ordering,
input, inputTypes, ordering, topK,
execinfra.GetWorkMemLimit(flowCtx),
maxNumberPartitions,
args.TestingKnobs.NumForcedRepartitions,
153 changes: 109 additions & 44 deletions pkg/sql/colexec/external_sort.go
@@ -12,6 +12,7 @@ package colexec

import (
"context"
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
@@ -129,6 +130,9 @@ type externalSorter struct {
state externalSorterState
inputTypes []*types.T
ordering execinfrapb.Ordering
// topK, if non-zero, indicates the number of tuples needed by the output.
// Each partition will be limited by this number in size.
topK uint64
// columnOrdering is the same as ordering used when creating mergers.
columnOrdering colinfo.ColumnOrdering
inMemSorter colexecop.ResettableOperator
@@ -156,10 +160,12 @@ type externalSorter struct {
// partitionsInfo tracks some information about all current partitions
// (those in currentPartitionIdxs).
partitionsInfo struct {
// tupleCount stores the number of tuples in each partition.
tupleCount []uint64
// totalSize is used for logging purposes.
totalSize []int64
// maxBatchMemSize decides how many partitions to have at once,
// potentially reducing maxNumberPartitions
// maxBatchMemSize is used when determining how many partitions to have
// at once, potentially reducing maxNumberPartitions.
maxBatchMemSize []int64
}

@@ -197,6 +203,7 @@ var _ colexecop.ClosableOperator = &externalSorter{}
// from an unlimited memory monitor. They will be used by several internal
// components of the external sort which is responsible for making sure that
// the components stay within the memory limit.
// - topK, if non-zero, indicates the number of tuples needed by the output.
// - maxNumberPartitions (when non-zero) overrides the semi-dynamically
// computed maximum number of partitions to have at once.
// - numForcedMerges (when non-zero) specifies the number of times the repeated
@@ -212,6 +219,7 @@ func NewExternalSorter(
input colexecop.Operator,
inputTypes []*types.T,
ordering execinfrapb.Ordering,
topK uint64,
memoryLimit int64,
maxNumberPartitions int,
numForcedMerges int,
@@ -256,12 +264,18 @@ func NewExternalSorter(
mergeMemoryLimit = 1
}
inputPartitioner := newInputPartitioningOperator(input, inMemSortMemoryLimit)
inMemSorter, err := newSorter(
sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns,
)
if err != nil {
colexecerror.InternalError(err)
var inMemSorter colexecop.ResettableOperator
if topK > 0 {
inMemSorter = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, topK)
} else {
var err error
inMemSorter, err = newSorter(
sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns,
)
if err != nil {
colexecerror.InternalError(err)
}
}
partitionedDiskQueueSemaphore := fdSemaphore
if !delegateFDAcquisitions {
@@ -283,19 +297,46 @@ func NewExternalSorter(
},
inputTypes: inputTypes,
ordering: ordering,
topK: topK,
columnOrdering: execinfrapb.ConvertToColumnOrdering(ordering),
maxNumberPartitions: maxNumberPartitions,
numForcedMerges: numForcedMerges,
currentPartitionIdxs: make([]int, maxNumberPartitions),
maxMerged: make([]int, maxNumberPartitions),
}
es.partitionsInfo.tupleCount = make([]uint64, maxNumberPartitions)
es.partitionsInfo.totalSize = make([]int64, maxNumberPartitions)
es.partitionsInfo.maxBatchMemSize = make([]int64, maxNumberPartitions)
es.fdState.fdSemaphore = fdSemaphore
es.testingKnobs.delegateFDAcquisitions = delegateFDAcquisitions
return es
}

// doneWithCurrentPartition should be called whenever all tuples needed for the
// current partition have been enqueued in order to prepare the external sorter
// for the next partition.
func (s *externalSorter) doneWithCurrentPartition() {
// The current partition has been fully processed, so we reset the in-memory
// sorter (which will do the "shallow" reset of inputPartitioningOperator).
s.inMemSorterInput.interceptReset = true
s.inMemSorter.Reset(s.Ctx)
s.currentPartitionIdxs[s.numPartitions] = s.currentPartitionIdx
s.maxMerged[s.numPartitions] = 0
s.numPartitions++
s.currentPartitionIdx++
if s.shouldMergeSomePartitions() {
s.state = externalSorterRepeatedMerging
} else {
s.state = externalSorterNewPartition
}
}

func (s *externalSorter) resetPartitionsInfoForCurrentPartition() {
s.partitionsInfo.tupleCount[s.numPartitions] = 0
s.partitionsInfo.totalSize[s.numPartitions] = 0
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0
}

func (s *externalSorter) Next() coldata.Batch {
for {
switch s.state {
@@ -320,30 +361,19 @@ func (s *externalSorter) Next() coldata.Batch {
s.fdState.acquiredFDs = toAcquire
}
}
s.partitionsInfo.totalSize[s.numPartitions] = 0
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0
s.enqueue(b)
s.state = externalSorterSpillPartition
s.resetPartitionsInfoForCurrentPartition()
partitionDone := s.enqueue(b)
if partitionDone {
s.doneWithCurrentPartition()
} else {
s.state = externalSorterSpillPartition
}

case externalSorterSpillPartition:
b := s.Input.Next()
s.enqueue(b)
if b.Length() == 0 {
// The partition has been fully spilled, so we reset the
// in-memory sorter (which will do the "shallow" reset of
// inputPartitioningOperator).
s.inMemSorterInput.interceptReset = true
s.inMemSorter.Reset(s.Ctx)
s.currentPartitionIdxs[s.numPartitions] = s.currentPartitionIdx
s.maxMerged[s.numPartitions] = 0
s.numPartitions++
s.currentPartitionIdx++
if s.shouldMergeSomePartitions() {
s.state = externalSorterRepeatedMerging
continue
}
s.state = externalSorterNewPartition
continue
partitionDone := s.enqueue(b)
if b.Length() == 0 || partitionDone {
s.doneWithCurrentPartition()
}

case externalSorterRepeatedMerging:
@@ -373,11 +403,10 @@ func (s *externalSorter) Next() coldata.Batch {
merger := s.createMergerForPartitions(n)
merger.Init(s.Ctx)
s.numPartitions -= n
s.partitionsInfo.totalSize[s.numPartitions] = 0
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0
s.resetPartitionsInfoForCurrentPartition()
for b := merger.Next(); ; b = merger.Next() {
s.enqueue(b)
if b.Length() == 0 {
partitionDone := s.enqueue(b)
if b.Length() == 0 || partitionDone {
break
}
}
@@ -389,9 +418,19 @@ func (s *externalSorter) Next() coldata.Batch {
// used for the output batches (all of which have been enqueued into
// the new partition).
s.outputUnlimitedAllocator.ReleaseMemory(s.outputUnlimitedAllocator.Used())
// Reclaim disk space by closing the inactive read partitions. Since
// the merger must have exhausted all inputs, this is all the
// partitions just read from.
// Make sure to close out all partitions we have just read from.
//
// Note that this operation is a noop for the general sort and is
// only needed for the top K sort. In the former case we have fully
// exhausted all old partitions, and they have been closed for
// reading automatically; in the latter case we stop reading once we
// have at least K tuples in the new partition, and we have to
// manually close all old partitions for reading in order for
// resources to be properly released in CloseInactiveReadPartitions
// call below.
if err := s.partitioner.CloseAllOpenReadFileDescriptors(); err != nil {
colexecerror.InternalError(err)
}
if err := s.partitioner.CloseInactiveReadPartitions(s.Ctx); err != nil {
colexecerror.InternalError(err)
}
@@ -431,11 +470,24 @@ func (s *externalSorter) Next() coldata.Batch {
}

// enqueue enqueues b to the current partition (which has index
// currentPartitionIdx) as well as updates the information about
// the partition.
func (s *externalSorter) enqueue(b coldata.Batch) {
// currentPartitionIdx) as well as updates the information about the partition.
//
// If the current partition reaches the desired topK number of tuples, a zero
// batch is enqueued and true is returned indicating that the current partition
// is done.
//
// The following observation is what allows us to stop enqueueing into the
// current partition once topK number of tuples is reached: `b` is not coming
// from the input to the sort operation as a whole (when tuples can be in an
// arbitrary order) - `b` is coming to us either from the in-memory top K sorter
// (which has already performed the sort over a subset of tuples, with
// inputPartitioningOperator defining the boundaries of that subset) or from the
// merger (which performs the merge of N already sorted partitions while
// preserving the order of tuples).
func (s *externalSorter) enqueue(b coldata.Batch) bool {
if b.Length() > 0 {
batchMemSize := colmem.GetBatchMemSize(b)
s.partitionsInfo.tupleCount[s.numPartitions] += uint64(b.Length())
s.partitionsInfo.totalSize[s.numPartitions] += batchMemSize
if batchMemSize > s.partitionsInfo.maxBatchMemSize[s.numPartitions] {
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = batchMemSize
@@ -447,6 +499,16 @@ func (s *externalSorter) enqueue(b coldata.Batch) {
if err := s.partitioner.Enqueue(s.Ctx, s.currentPartitionIdx, b); err != nil {
colexecutils.HandleErrorFromDiskQueue(err)
}
if s.topK > 0 && s.topK <= s.partitionsInfo.tupleCount[s.numPartitions] {
// We have a top K sort and already have at least K tuples in the
// current partition. Enqueue a zero-length batch and tell the caller
// that the partition is done.
if err := s.partitioner.Enqueue(s.Ctx, s.currentPartitionIdx, coldata.ZeroBatch); err != nil {
colexecutils.HandleErrorFromDiskQueue(err)
}
return true
}
return false
}

// shouldMergeSomePartitions returns true if we need to merge some current
@@ -574,16 +636,19 @@ func (s *externalSorter) createMergerForPartitions(n int) colexecop.Operator {
syncInputs[i].Root = s.partitionerToOperators[i]
}
if log.V(2) {
var b strings.Builder
var counts, sizes strings.Builder
for i := 0; i < n; i++ {
if i > 0 {
b.WriteString(", ")
counts.WriteString(", ")
sizes.WriteString(", ")
}
b.WriteString(humanizeutil.IBytes(s.partitionsInfo.totalSize[s.numPartitions-n+i]))
partitionOrdinal := s.numPartitions - n + i
counts.WriteString(fmt.Sprintf("%d", s.partitionsInfo.tupleCount[partitionOrdinal]))
sizes.WriteString(humanizeutil.IBytes(s.partitionsInfo.totalSize[partitionOrdinal]))
}
log.Infof(s.Ctx,
"external sorter is merging partitions with partition indices %v with sizes [%s]",
s.currentPartitionIdxs[s.numPartitions-n:s.numPartitions], b.String(),
"external sorter is merging partitions with partition indices %v with counts [%s] and sizes [%s]",
s.currentPartitionIdxs[s.numPartitions-n:s.numPartitions], counts.String(), sizes.String(),
)
}

19 changes: 17 additions & 2 deletions pkg/sql/colexec/sorttopk.go
@@ -39,7 +39,7 @@ func NewTopKSorter(
inputTypes []*types.T,
orderingCols []execinfrapb.Ordering_Column,
k uint64,
) colexecop.Operator {
) colexecop.ResettableOperator {
return &topKSorter{
allocator: allocator,
OneInputNode: colexecop.NewOneInputNode(input),
Expand All @@ -50,6 +50,7 @@ func NewTopKSorter(
}

var _ colexecop.BufferingInMemoryOperator = &topKSorter{}
var _ colexecop.Resetter = &topKSorter{}

// topKSortState represents the state of the sort operator.
type topKSortState int
@@ -138,6 +139,16 @@ func (t *topKSorter) Next() coldata.Batch {
}
}

func (t *topKSorter) Reset(ctx context.Context) {
if r, ok := t.Input.(colexecop.Resetter); ok {
r.Reset(ctx)
}
t.state = topKSortSpooling
t.firstUnprocessedTupleIdx = 0
t.topK.ResetInternalBatch()
t.emitted = 0
}

// spool reads in the entire input, always storing the top K rows it has seen so
// far in o.topK. This is done by maintaining a max heap of indices into o.topK.
// Whenever we encounter a row which is smaller than the max row in the heap,
@@ -169,7 +180,11 @@ func (t *topKSorter) spool() {
t.updateComparators(topKVecIdx, t.topK)

// Initialize the heap.
t.heap = make([]int, t.topK.Length())
if cap(t.heap) < t.topK.Length() {
t.heap = make([]int, t.topK.Length())
} else {
t.heap = t.heap[:t.topK.Length()]
}
for i := range t.heap {
t.heap[i] = i
}