colexec: optimize the external sort for top K case #66303

Merged · 2 commits · Jun 16, 2021

Changes from 1 commit
colexec: optimize the external sort for top K case
Previously, if the top K sort spilled to disk, we used the general
external sort. However, we could easily optimize that case with the
knowledge that only K tuples are needed by the output.

Namely, we can use the in-memory top K sort (to create each new
partition of the desired size) and also limit the size of each merged
partition to K tuples. This commit adds both optimizations.

Release note: None
yuzefovich committed Jun 16, 2021
commit 49e4f73abfc0e603084a5a8d20eb151081e43037
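
A minimal standalone sketch of the idea, using toy helpers (`sortRunTopK`, `mergeRunsTopK` are invented names, not the real colexec operators): each spilled run only needs to keep its K smallest tuples, and a merge of already-sorted runs can stop as soon as K tuples have been produced.

```go
package main

import (
	"fmt"
	"sort"
)

// sortRunTopK sorts one in-memory chunk and keeps only its first k elements:
// a tuple beyond the k-th smallest within a single run can never appear in
// the final top K result, so it never needs to be written to disk.
func sortRunTopK(chunk []int, k int) []int {
	sort.Ints(chunk)
	if len(chunk) > k {
		chunk = chunk[:k]
	}
	return chunk
}

// mergeRunsTopK merges already-sorted runs but stops once k elements have
// been produced, mirroring how each merged partition can be capped at k
// tuples.
func mergeRunsTopK(runs [][]int, k int) []int {
	idx := make([]int, len(runs)) // next unread position in each run
	out := make([]int, 0, k)
	for len(out) < k {
		best, bestRun := 0, -1
		for r, pos := range idx {
			if pos < len(runs[r]) && (bestRun == -1 || runs[r][pos] < best) {
				best, bestRun = runs[r][pos], r
			}
		}
		if bestRun == -1 {
			break // all runs exhausted
		}
		out = append(out, best)
		idx[bestRun]++
	}
	return out
}

func main() {
	const k = 3
	// Pretend each chunk is what fit in memory before spilling.
	chunks := [][]int{{9, 1, 7, 4}, {2, 8, 3}, {6, 5, 0}}
	runs := make([][]int, len(chunks))
	for i, c := range chunks {
		runs[i] = sortRunTopK(c, k) // each spilled partition holds at most k tuples
	}
	fmt.Println(mergeRunsTopK(runs, k)) // [0 1 2]
}
```

The actual change below follows the same shape: the in-memory top K sorter produces each new partition, and `enqueue` writes a zero-length terminator batch once at least K tuples are in the current partition.
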
7 changes: 4 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -362,6 +362,7 @@ func (r opResult) createDiskBackedSort(
sorterMemMonitorName string
inMemorySorter colexecop.Operator
err error
topK uint64
)
if len(ordering.Columns) == int(matchLen) {
// The input is already fully ordered, so there is nothing to sort.
@@ -400,10 +401,10 @@ func (r opResult) createDiskBackedSort(
ctx, flowCtx, opNamePrefix+"topk-sort", processorID,
)
}
k := post.Limit + post.Offset
topK = post.Limit + post.Offset
inMemorySorter = colexec.NewTopKSorter(
colmem.NewAllocator(ctx, topKSorterMemAccount, factory), input, inputTypes,
ordering.Columns, k,
ordering.Columns, topK,
)
} else {
// No optimizations possible. Default to the standard sort operator.
@@ -459,7 +460,7 @@ func (r opResult) createDiskBackedSort(
sortUnlimitedAllocator,
mergeUnlimitedAllocator,
outputUnlimitedAllocator,
input, inputTypes, ordering,
input, inputTypes, ordering, topK,
execinfra.GetWorkMemLimit(flowCtx),
maxNumberPartitions,
args.TestingKnobs.NumForcedRepartitions,
153 changes: 109 additions & 44 deletions pkg/sql/colexec/external_sort.go
@@ -12,6 +12,7 @@ package colexec

import (
"context"
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
@@ -129,6 +130,9 @@ type externalSorter struct {
state externalSorterState
inputTypes []*types.T
ordering execinfrapb.Ordering
// topK, if non-zero, indicates the number of tuples needed by the output.
// Each partition will be limited by this number in size.
topK uint64
// columnOrdering is the same as ordering used when creating mergers.
columnOrdering colinfo.ColumnOrdering
inMemSorter colexecop.ResettableOperator
@@ -156,10 +160,12 @@ type externalSorter struct {
// partitionsInfo tracks some information about all current partitions
// (those in currentPartitionIdxs).
partitionsInfo struct {
// tupleCount stores the number of tuples in each partition.
tupleCount []uint64
// totalSize is used for logging purposes.
totalSize []int64
// maxBatchMemSize decides how many partitions to have at once,
// potentially reducing maxNumberPartitions
// maxBatchMemSize is used when determining how many partitions to have
// at once, potentially reducing maxNumberPartitions.
maxBatchMemSize []int64
}

@@ -197,6 +203,7 @@ var _ colexecop.ClosableOperator = &externalSorter{}
// from an unlimited memory monitor. They will be used by several internal
// components of the external sort which is responsible for making sure that
// the components stay within the memory limit.
// - topK, if non-zero, indicates the number of tuples needed by the output.
// - maxNumberPartitions (when non-zero) overrides the semi-dynamically
// computed maximum number of partitions to have at once.
// - numForcedMerges (when non-zero) specifies the number of times the repeated
@@ -212,6 +219,7 @@ func NewExternalSorter(
input colexecop.Operator,
inputTypes []*types.T,
ordering execinfrapb.Ordering,
topK uint64,
memoryLimit int64,
maxNumberPartitions int,
numForcedMerges int,
@@ -256,12 +264,18 @@ func NewExternalSorter(
mergeMemoryLimit = 1
}
inputPartitioner := newInputPartitioningOperator(input, inMemSortMemoryLimit)
inMemSorter, err := newSorter(
sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns,
)
if err != nil {
colexecerror.InternalError(err)
var inMemSorter colexecop.ResettableOperator
if topK > 0 {
inMemSorter = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, topK)
} else {
var err error
inMemSorter, err = newSorter(
sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns,
)
if err != nil {
colexecerror.InternalError(err)
}
}
partitionedDiskQueueSemaphore := fdSemaphore
if !delegateFDAcquisitions {
@@ -283,19 +297,46 @@ func NewExternalSorter(
},
inputTypes: inputTypes,
ordering: ordering,
topK: topK,
columnOrdering: execinfrapb.ConvertToColumnOrdering(ordering),
maxNumberPartitions: maxNumberPartitions,
numForcedMerges: numForcedMerges,
currentPartitionIdxs: make([]int, maxNumberPartitions),
maxMerged: make([]int, maxNumberPartitions),
}
es.partitionsInfo.tupleCount = make([]uint64, maxNumberPartitions)
es.partitionsInfo.totalSize = make([]int64, maxNumberPartitions)
es.partitionsInfo.maxBatchMemSize = make([]int64, maxNumberPartitions)
es.fdState.fdSemaphore = fdSemaphore
es.testingKnobs.delegateFDAcquisitions = delegateFDAcquisitions
return es
}

// doneWithCurrentPartition should be called whenever all tuples needed for the
// current partition have been enqueued in order to prepare the external sorter
// for the next partition.
func (s *externalSorter) doneWithCurrentPartition() {
// The current partition has been fully processed, so we reset the in-memory
// sorter (which will do the "shallow" reset of inputPartitioningOperator).
s.inMemSorterInput.interceptReset = true
s.inMemSorter.Reset(s.Ctx)
s.currentPartitionIdxs[s.numPartitions] = s.currentPartitionIdx
s.maxMerged[s.numPartitions] = 0
s.numPartitions++
s.currentPartitionIdx++
if s.shouldMergeSomePartitions() {
s.state = externalSorterRepeatedMerging
} else {
s.state = externalSorterNewPartition
}
}

func (s *externalSorter) resetPartitionsInfoForCurrentPartition() {
s.partitionsInfo.tupleCount[s.numPartitions] = 0
s.partitionsInfo.totalSize[s.numPartitions] = 0
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0
}

func (s *externalSorter) Next() coldata.Batch {
for {
switch s.state {
@@ -320,30 +361,19 @@ func (s *externalSorter) Next() coldata.Batch {
s.fdState.acquiredFDs = toAcquire
}
}
s.partitionsInfo.totalSize[s.numPartitions] = 0
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0
s.enqueue(b)
s.state = externalSorterSpillPartition
s.resetPartitionsInfoForCurrentPartition()
partitionDone := s.enqueue(b)
if partitionDone {
s.doneWithCurrentPartition()
} else {
s.state = externalSorterSpillPartition
}

case externalSorterSpillPartition:
b := s.Input.Next()
s.enqueue(b)
if b.Length() == 0 {
// The partition has been fully spilled, so we reset the
// in-memory sorter (which will do the "shallow" reset of
// inputPartitioningOperator).
s.inMemSorterInput.interceptReset = true
s.inMemSorter.Reset(s.Ctx)
s.currentPartitionIdxs[s.numPartitions] = s.currentPartitionIdx
s.maxMerged[s.numPartitions] = 0
s.numPartitions++
s.currentPartitionIdx++
if s.shouldMergeSomePartitions() {
s.state = externalSorterRepeatedMerging
continue
}
s.state = externalSorterNewPartition
continue
partitionDone := s.enqueue(b)
if b.Length() == 0 || partitionDone {
s.doneWithCurrentPartition()
}

case externalSorterRepeatedMerging:
@@ -373,11 +403,10 @@ func (s *externalSorter) Next() coldata.Batch {
merger := s.createMergerForPartitions(n)
merger.Init(s.Ctx)
s.numPartitions -= n
s.partitionsInfo.totalSize[s.numPartitions] = 0
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = 0
s.resetPartitionsInfoForCurrentPartition()
for b := merger.Next(); ; b = merger.Next() {
s.enqueue(b)
if b.Length() == 0 {
partitionDone := s.enqueue(b)
if b.Length() == 0 || partitionDone {
break
}
}
@@ -389,9 +418,19 @@ func (s *externalSorter) Next() coldata.Batch {
// used for the output batches (all of which have been enqueued into
// the new partition).
s.outputUnlimitedAllocator.ReleaseMemory(s.outputUnlimitedAllocator.Used())
// Reclaim disk space by closing the inactive read partitions. Since
// the merger must have exhausted all inputs, this is all the
// partitions just read from.
// Make sure to close out all partitions we have just read from.
//
// Note that this operation is a noop for the general sort and is
// only needed for the top K sort. In the former case we have fully
// exhausted all old partitions, and they have been closed for
// reading automatically; in the latter case we stop reading once we
// have at least K tuples in the new partition, and we have to
// manually close all old partitions for reading in order for
// resources to be properly released in CloseInactiveReadPartitions
// call below.
if err := s.partitioner.CloseAllOpenReadFileDescriptors(); err != nil {
colexecerror.InternalError(err)
}
if err := s.partitioner.CloseInactiveReadPartitions(s.Ctx); err != nil {
colexecerror.InternalError(err)
}
@@ -431,11 +470,24 @@ func (s *externalSorter) Next() coldata.Batch {
}

// enqueue enqueues b to the current partition (which has index
// currentPartitionIdx) as well as updates the information about
// the partition.
func (s *externalSorter) enqueue(b coldata.Batch) {
// currentPartitionIdx) as well as updates the information about the partition.
//
// If the current partition reaches the desired topK number of tuples, a zero
// batch is enqueued and true is returned indicating that the current partition
// is done.
//
// The following observation is what allows us to stop enqueueing into the
// current partition once topK number of tuples is reached: `b` is not coming
// from the input to the sort operation as a whole (when tuples can be in an
// arbitrary order) - `b` is coming to us either from the in-memory top K sorter
// (which has already performed the sort over a subset of tuples, with
// inputPartitioningOperator defining the boundaries of that subset) or from the
// merger (which performs the merge of N already sorted partitions while
// preserving the order of tuples).
func (s *externalSorter) enqueue(b coldata.Batch) bool {
if b.Length() > 0 {
batchMemSize := colmem.GetBatchMemSize(b)
s.partitionsInfo.tupleCount[s.numPartitions] += uint64(b.Length())
s.partitionsInfo.totalSize[s.numPartitions] += batchMemSize
if batchMemSize > s.partitionsInfo.maxBatchMemSize[s.numPartitions] {
s.partitionsInfo.maxBatchMemSize[s.numPartitions] = batchMemSize
@@ -447,6 +499,16 @@ func (s *externalSorter) enqueue(b coldata.Batch) {
if err := s.partitioner.Enqueue(s.Ctx, s.currentPartitionIdx, b); err != nil {
colexecutils.HandleErrorFromDiskQueue(err)
}
if s.topK > 0 && s.topK <= s.partitionsInfo.tupleCount[s.numPartitions] {
// We have a top K sort and already have at least K tuples in the
// current partition. Enqueue a zero-length batch and tell the caller
// that the partition is done.
if err := s.partitioner.Enqueue(s.Ctx, s.currentPartitionIdx, coldata.ZeroBatch); err != nil {
colexecutils.HandleErrorFromDiskQueue(err)
}
return true
}
return false
}

// shouldMergeSomePartitions returns true if we need to merge some current
@@ -574,16 +636,19 @@ func (s *externalSorter) createMergerForPartitions(n int) colexecop.Operator {
syncInputs[i].Root = s.partitionerToOperators[i]
}
if log.V(2) {
var b strings.Builder
var counts, sizes strings.Builder
for i := 0; i < n; i++ {
if i > 0 {
b.WriteString(", ")
counts.WriteString(", ")
sizes.WriteString(", ")
}
b.WriteString(humanizeutil.IBytes(s.partitionsInfo.totalSize[s.numPartitions-n+i]))
partitionOrdinal := s.numPartitions - n + i
counts.WriteString(fmt.Sprintf("%d", s.partitionsInfo.tupleCount[partitionOrdinal]))
sizes.WriteString(humanizeutil.IBytes(s.partitionsInfo.totalSize[partitionOrdinal]))
}
log.Infof(s.Ctx,
"external sorter is merging partitions with partition indices %v with sizes [%s]",
s.currentPartitionIdxs[s.numPartitions-n:s.numPartitions], b.String(),
"external sorter is merging partitions with partition indices %v with counts [%s] and sizes [%s]",
s.currentPartitionIdxs[s.numPartitions-n:s.numPartitions], counts.String(), sizes.String(),
)
}

19 changes: 17 additions & 2 deletions pkg/sql/colexec/sorttopk.go
@@ -39,7 +39,7 @@ func NewTopKSorter(
inputTypes []*types.T,
orderingCols []execinfrapb.Ordering_Column,
k uint64,
) colexecop.Operator {
) colexecop.ResettableOperator {
return &topKSorter{
allocator: allocator,
OneInputNode: colexecop.NewOneInputNode(input),
Expand All @@ -50,6 +50,7 @@ func NewTopKSorter(
}

var _ colexecop.BufferingInMemoryOperator = &topKSorter{}
var _ colexecop.Resetter = &topKSorter{}

// topKSortState represents the state of the sort operator.
type topKSortState int
@@ -138,6 +139,16 @@ func (t *topKSorter) Next() coldata.Batch {
}
}

func (t *topKSorter) Reset(ctx context.Context) {
if r, ok := t.Input.(colexecop.Resetter); ok {
r.Reset(ctx)
}
t.state = topKSortSpooling
t.firstUnprocessedTupleIdx = 0
t.topK.ResetInternalBatch()
t.emitted = 0
}

// spool reads in the entire input, always storing the top K rows it has seen so
// far in o.topK. This is done by maintaining a max heap of indices into o.topK.
// Whenever we encounter a row which is smaller than the max row in the heap,
@@ -169,7 +180,11 @@ func (t *topKSorter) spool() {
t.updateComparators(topKVecIdx, t.topK)

// Initialize the heap.
t.heap = make([]int, t.topK.Length())
if cap(t.heap) < t.topK.Length() {
t.heap = make([]int, t.topK.Length())
} else {
t.heap = t.heap[:t.topK.Length()]
}
for i := range t.heap {
t.heap[i] = i
}
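
The sorttopk.go hunk above also makes the top K sorter resettable so it can be reused for every partition, re-slicing its heap instead of reallocating when the existing capacity suffices. A minimal standalone sketch of that idiom, with an invented `miniTopK` type (not the real colexec operator):

```go
package main

import "fmt"

// miniTopK is a toy stand-in for a resettable operator: Reset clears
// per-partition state, while initHeap reuses the heap's backing array
// whenever its capacity is already large enough.
type miniTopK struct {
	heap    []int
	emitted int
}

// Reset prepares the operator for the next partition without dropping the
// heap's backing array.
func (t *miniTopK) Reset() {
	t.emitted = 0
}

// initHeap sizes the heap for n candidate rows, allocating only when needed.
func (t *miniTopK) initHeap(n int) {
	if cap(t.heap) < n {
		t.heap = make([]int, n)
	} else {
		t.heap = t.heap[:n]
	}
	for i := range t.heap {
		t.heap[i] = i
	}
}

func main() {
	t := &miniTopK{}
	t.initHeap(4)
	fmt.Println(len(t.heap), cap(t.heap)) // 4 4
	t.Reset()
	t.initHeap(2) // second partition: no new allocation needed
	fmt.Println(len(t.heap), cap(t.heap)) // 2 4
}
```
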