Skip to content

Commit

Permalink
colexec: fix external sort
Browse files Browse the repository at this point in the history
Release justification: bug fixes and low-risk updates to new
functionality.

This commit fixes the resetting behavior of inputPartitioningOperator
(used by the external sort to figure out when to start a new partition
when spilling the input) that was recently broken when we added the
fallback from the external hash joiner to external sort plus merge
joiner. The complication here is that we need two kinds of behaviors
when inputPartitioningOperator is being reset:
1. ("shallow" reset) we need to clear the memory account because the
external sorter is moving on spilling the data into a new partition.
However, we *cannot* propagate the reset further up because it might
delete the data that the external sorter has not yet spilled. This
behavior is needed in externalSorter when resetting the in-memory sorter
when spilling the next "chunk" of data into the new partition.
2. ("deep" reset) we need to do the full reset of the whole chain of
operators. This behavior is needed when the whole external sorter is
being reset.

Release note: None.
  • Loading branch information
yuzefovich committed Mar 13, 2020
1 parent f3b2175 commit 0cc326d
Showing 1 changed file with 28 additions and 3 deletions.
31 changes: 28 additions & 3 deletions pkg/sql/colexec/external_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type externalSorter struct {
inputTypes []coltypes.T
ordering execinfrapb.Ordering
inMemSorter resettableOperator
inMemSorterInput *inputPartitioningOperator
partitioner colcontainer.PartitionedQueue
partitionerCreator func() colcontainer.PartitionedQueue
// numPartitions is the current number of partitions.
Expand Down Expand Up @@ -180,6 +181,7 @@ func newExternalSorter(
unlimitedAllocator: unlimitedAllocator,
memoryLimit: memoryLimit,
inMemSorter: inMemSorter,
inMemSorterInput: inputPartitioner.(*inputPartitioningOperator),
partitionerCreator: func() colcontainer.PartitionedQueue {
return colcontainer.NewPartitionedDiskQueue(inputTypes, diskQueueCfg, fdSemaphore, colcontainer.PartitionerStrategyCloseOnNewPartition)
},
Expand Down Expand Up @@ -221,7 +223,9 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
b := s.input.Next(ctx)
if b.Length() == 0 {
// The partition has been fully spilled, so we reset the in-memory
// sorter (which will reset inputPartitioningOperator).
// sorter (which will do the "shallow" reset of
// inputPartitioningOperator).
s.inMemSorterInput.interceptReset = true
s.inMemSorter.reset()
s.numPartitions++
if s.numPartitions == s.maxNumberPartitions-1 {
Expand Down Expand Up @@ -357,6 +361,24 @@ type inputPartitioningOperator struct {
ctx context.Context
standaloneMemAccount *mon.BoundAccount
memoryLimit int64
// interceptReset determines whether the reset method will be called on
// the input to this operator when the latter is being reset. This field is
// managed by externalSorter.
// NOTE: this field itself is set to 'false' when inputPartitioningOperator
// is being reset, regardless of the original value.
//
// The reason for having this knob is that we need two kinds of behaviors
// when resetting the inputPartitioningOperator:
// 1. ("shallow" reset) we need to clear the memory account because the
// external sorter is moving on spilling the data into a new partition.
// However, we *cannot* propagate the reset further up because it might
// delete the data that the external sorter has not yet spilled. This
// behavior is needed in externalSorter when resetting the in-memory sorter
// when spilling the next "chunk" of data into the new partition.
// 2. ("deep" reset) we need to do the full reset of the whole chain of
// operators. This behavior is needed when the whole external sorter is
// being reset.
interceptReset bool
}

var _ resettableOperator = &inputPartitioningOperator{}
Expand Down Expand Up @@ -404,8 +426,11 @@ func (o *inputPartitioningOperator) Next(ctx context.Context) coldata.Batch {
}

func (o *inputPartitioningOperator) reset() {
if r, ok := o.input.(resetter); ok {
r.reset()
if !o.interceptReset {
if r, ok := o.input.(resetter); ok {
r.reset()
}
}
o.interceptReset = false
o.standaloneMemAccount.Shrink(o.ctx, o.standaloneMemAccount.Used())
}

0 comments on commit 0cc326d

Please sign in to comment.