colexec: request file descriptors up front in external sorts/joins #45892

Merged (1 commit) on Mar 14, 2020
16 changes: 13 additions & 3 deletions pkg/sql/colcontainer/partitionedqueue.go
@@ -124,6 +124,9 @@ type PartitionedDiskQueue struct {
// enqueueing to a new partition. Each new partition will use
// cfg.BufferSizeBytes, so memory usage may increase in an unbounded fashion if
// used unmethodically. The file descriptors are acquired through fdSemaphore.
// If fdSemaphore is nil, the partitioned disk queue will not Acquire or Release
// file descriptors. Do this if the caller knows that it will use a constant
// maximum number of file descriptors and wishes to acquire these up front.
// Note that the number of file descriptors actually open may be less than,
// but never more than, the number acquired through the semaphore.
func NewPartitionedDiskQueue(
@@ -171,7 +174,7 @@ func (p *PartitionedDiskQueue) closeWritePartition(
if err := p.partitions[idx].Enqueue(coldata.ZeroBatch); err != nil {
return err
}
if releaseFDOption == releaseFD {
if releaseFDOption == releaseFD && p.fdSemaphore != nil {
p.fdSemaphore.Release(1)
p.numOpenFDs--
}
@@ -186,13 +189,18 @@ func (p *PartitionedDiskQueue) closeReadPartition(idx int) error {
if err := p.partitions[idx].CloseRead(); err != nil {
return err
}
p.fdSemaphore.Release(1)
p.numOpenFDs--
if p.fdSemaphore != nil {
p.fdSemaphore.Release(1)
p.numOpenFDs--
}
p.partitions[idx].state = partitionStateClosedForReading
return nil
}

func (p *PartitionedDiskQueue) acquireNewFD(ctx context.Context) error {
if p.fdSemaphore == nil {
return nil
}
if err := p.fdSemaphore.Acquire(ctx, 1); err != nil {
return err
}
@@ -370,6 +378,8 @@ func (p *PartitionedDiskQueue) Close() error {
p.partitions[i].state = partitionStatePermanentlyClosed
}
if p.numOpenFDs != 0 {
// Note that if p.numOpenFDs is non-zero, it must be the case that
// fdSemaphore is non-nil.
p.fdSemaphore.Release(p.numOpenFDs)
p.numOpenFDs = 0
}
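
To make the nil-semaphore contract in the comment above concrete, here is a minimal, self-contained Go sketch. The fdSemaphore interface, countingSem, and diskQueue types are illustrative stand-ins invented for this example; they are not the colcontainer types touched by this diff.

package main

import (
	"context"
	"fmt"
)

// fdSemaphore mirrors the small subset of semaphore behavior the partitioned
// disk queue needs in this sketch.
type fdSemaphore interface {
	Acquire(ctx context.Context, n int) error
	Release(n int)
}

// countingSem is a toy in-process implementation used only for this example.
type countingSem struct{ used, limit int }

func (s *countingSem) Acquire(ctx context.Context, n int) error {
	if s.used+n > s.limit {
		return fmt.Errorf("fd limit exceeded: %d > %d", s.used+n, s.limit)
	}
	s.used += n
	return nil
}

func (s *countingSem) Release(n int) { s.used -= n }

// diskQueue models the nil-semaphore contract added in this commit: when sem
// is nil the queue never Acquires or Releases, and the caller is assumed to
// have acquired a fixed maximum number of FDs up front.
type diskQueue struct {
	sem        fdSemaphore
	numOpenFDs int
}

func (q *diskQueue) acquireNewFD(ctx context.Context) error {
	if q.sem == nil {
		return nil // the caller owns the FD budget
	}
	if err := q.sem.Acquire(ctx, 1); err != nil {
		return err
	}
	q.numOpenFDs++
	return nil
}

func (q *diskQueue) closeReadPartition() {
	if q.sem != nil {
		q.sem.Release(1)
		q.numOpenFDs--
	}
}

func main() {
	ctx := context.Background()

	delegating := &diskQueue{sem: &countingSem{limit: 2}}
	_ = delegating.acquireNewFD(ctx) // tracked by the semaphore
	delegating.closeReadPartition()

	upFront := &diskQueue{sem: nil} // caller acquired its budget elsewhere
	_ = upFront.acquireNewFD(ctx)   // no semaphore interaction at all
	fmt.Println(delegating.numOpenFDs, upFront.numOpenFDs) // prints: 0 0
}

The design mirrors the commit: a nil semaphore turns every Acquire/Release site into a no-op, so the caller can budget file descriptors once instead of per partition.
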
19 changes: 18 additions & 1 deletion pkg/sql/colexec/execplan.go
@@ -119,6 +119,14 @@ type NewColOperatorArgs struct {
// sorter it is merging already created partitions into new one before
// proceeding to the next partition from the input).
NumForcedRepartitions int
// DelegateFDAcquisitions should be observed by users of a
// PartitionedDiskQueue. During normal operations, these should acquire the
// maximum number of file descriptors they will use from FDSemaphore up
// front. Setting this testing knob to true disables that behavior and
// lets the PartitionedDiskQueue interact with the semaphore as partitions
// are opened/closed, which ensures that the number of open files never
// exceeds what is expected.
DelegateFDAcquisitions bool
}
}

@@ -395,6 +403,7 @@ func (r *NewColOperatorResult) createDiskBackedSort(
input, inputTypes, ordering,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
maxNumberPartitions,
args.TestingKnobs.DelegateFDAcquisitions,
diskQueueCfg,
args.FDSemaphore,
)
@@ -786,13 +795,21 @@ func NewColOperator(
diskQueueCfg,
args.FDSemaphore,
func(input Operator, inputTypes []coltypes.T, orderingCols []execinfrapb.Ordering_Column, maxNumberPartitions int) (Operator, error) {
sortArgs := args
if !args.TestingKnobs.DelegateFDAcquisitions {
// Set the FDSemaphore to nil. This indicates that no FDs
// should be acquired. The external hash joiner will do this
// up front.
sortArgs.FDSemaphore = nil
}
return result.createDiskBackedSort(
ctx, flowCtx, args, input, inputTypes,
ctx, flowCtx, sortArgs, input, inputTypes,
execinfrapb.Ordering{Columns: orderingCols},
0 /* matchLen */, maxNumberPartitions, spec.ProcessorID,
&execinfrapb.PostProcessSpec{}, monitorNamePrefix+"-")
},
args.TestingKnobs.NumForcedRepartitions,
args.TestingKnobs.DelegateFDAcquisitions,
)
},
args.TestingKnobs.SpillingCallbackFn,
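
The execplan change above is small but easy to misread, so here is a condensed, self-contained sketch of the knob's effect. operatorArgs, fdSem, noopSem, and semaphoreForChildSort are hypothetical names for this example only; the real wiring lives in NewColOperator and createDiskBackedSort.

package main

import "fmt"

// fdSem stands in for the FD semaphore interface; a nil value means the
// operator that owns the FD budget acquires descriptors up front itself.
type fdSem interface{ Acquire(n int) error }

type noopSem struct{}

func (noopSem) Acquire(n int) error { return nil }

// operatorArgs is a simplified stand-in for NewColOperatorArgs, keeping only
// the two fields relevant to this commit.
type operatorArgs struct {
	FDSemaphore            fdSem
	DelegateFDAcquisitions bool // testing knob
}

// semaphoreForChildSort mimics the branch added to NewColOperator: in normal
// operation the external hash joiner keeps the semaphore for its own up-front
// acquisition and hands its disk-backed sorters a nil one.
func semaphoreForChildSort(args operatorArgs) fdSem {
	sortArgs := args
	if !args.DelegateFDAcquisitions {
		sortArgs.FDSemaphore = nil
	}
	return sortArgs.FDSemaphore
}

func main() {
	s := noopSem{}
	// Production path: sorters must not touch the semaphore themselves.
	fmt.Println(semaphoreForChildSort(operatorArgs{FDSemaphore: s}) == nil) // true
	// Testing path: the knob re-enables per-partition semaphore interaction.
	fmt.Println(semaphoreForChildSort(operatorArgs{FDSemaphore: s, DelegateFDAcquisitions: true}) == nil) // false
}
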
83 changes: 73 additions & 10 deletions pkg/sql/colexec/external_hash_joiner.go
@@ -97,6 +97,24 @@ const (
// externalHJDiskQueuesMemFraction determines the fraction of the available
// RAM that is allocated for the in-memory cache of disk queues.
externalHJDiskQueuesMemFraction = 0.5
// We need at least two buckets per side to make progress. However, the
// minimum number of partitions necessary is the number of partitions in use
// during a fallback to sort and merge join. We'll be using the minimum
// necessary per input + 2 (1 for each spilling queue that the merge joiner
// uses). For clarity this is what happens:
// - The 2 partitions that need to be sorted + merged will use an FD each: 2
// FDs. Meanwhile, each sorter will use up to externalSorterMinPartitions to
// sort and partition this input. At this stage 2 + 2 *
// externalSorterMinPartitions FDs are used.
// - Once the inputs (the hash joiner partitions) are finished, both FDs will
// be released. The merge joiner will now be in use, which uses two
// spillingQueues with 1 FD each for a total of 2. Since each sorter will
// use externalSorterMinPartitions, the FDs used at this stage are 2 +
// (2 * externalSorterMinPartitions) as well. Note that as soon as the
// sorter emits its first batch, it must be the case that the input to it
// has returned a zero batch, and thus the FD has been closed.
sortMergeNonSortMinFDsOpen = 2
externalHJMinPartitions = sortMergeNonSortMinFDsOpen + (externalSorterMinPartitions * 2)
)
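
To sanity-check the accounting in the comment above, here is a tiny worked example. It assumes, for illustration only, that externalSorterMinPartitions is 3; that constant belongs to the external sorter and its real value is not shown in this diff. sorterPartitionBudget is likewise a hypothetical helper that mirrors the split performed later in newExternalHashJoiner.

package main

import "fmt"

// externalSorterMinPartitions is assumed to be 3 here purely for illustration;
// the real constant is defined alongside the external sorter.
const externalSorterMinPartitions = 3

const (
	// Two FDs for reading the two partitions that fall back to sort + merge
	// join (later reused by the merge joiner's two spilling queues).
	sortMergeNonSortMinFDsOpen = 2
	externalHJMinPartitions    = sortMergeNonSortMinFDsOpen + (externalSorterMinPartitions * 2)
)

// sorterPartitionBudget mirrors the split performed in newExternalHashJoiner:
// reserve sortMergeNonSortMinFDsOpen descriptors, divide the rest between the
// two inputs, and never go below the sorter's own minimum.
func sorterPartitionBudget(maxNumberActivePartitions int) int {
	n := (maxNumberActivePartitions - sortMergeNonSortMinFDsOpen) / 2
	if n < externalSorterMinPartitions {
		n = externalSorterMinPartitions
	}
	return n
}

func main() {
	fmt.Println(externalHJMinPartitions)   // 2 + 2*3 = 8 with the assumed value
	fmt.Println(sorterPartitionBudget(16)) // (16-2)/2 = 7
	fmt.Println(sorterPartitionBudget(8))  // clamped up to 3
}
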

// externalHashJoiner is an operator that performs Grace hash join algorithm
@@ -141,6 +159,12 @@ type externalHashJoiner struct {
spec hashJoinerSpec
diskQueueCfg colcontainer.DiskQueueCfg

// fdState is used to acquire file descriptors up front.
fdState struct {
fdSemaphore semaphore.Semaphore
acquiredFDs int
}

// Partitioning phase variables.
leftPartitioner colcontainer.PartitionedQueue
rightPartitioner colcontainer.PartitionedQueue
@@ -202,6 +226,11 @@ type externalHashJoiner struct {
// is forced to recursively repartition (even if it is otherwise not
// needed) before it proceeds to actual join partitions.
numForcedRepartitions int
// delegateFDAcquisitions, if true, means that a test wants to force the
// PartitionedDiskQueues to track the number of file descriptors the hash
// joiner will open/close. This disables the default behavior of acquiring
// all file descriptors up front in Next.
delegateFDAcquisitions bool
}
}

@@ -227,6 +256,9 @@ const (
// - numForcedRepartitions is a number of times that the external hash joiner
// is forced to recursively repartition (even if it is otherwise not needed).
// This should be non-zero only in tests.
// - delegateFDAcquisitions specifies whether the external hash joiner should
// let the partitioned disk queues acquire file descriptors instead of acquiring
// them up front in Next. Should be true only in tests.
func newExternalHashJoiner(
unlimitedAllocator *Allocator,
spec hashJoinerSpec,
@@ -236,15 +268,27 @@ func newExternalHashJoiner(
fdSemaphore semaphore.Semaphore,
createReusableDiskBackedSorter func(input Operator, inputTypes []coltypes.T, orderingCols []execinfrapb.Ordering_Column, maxNumberPartitions int) (Operator, error),
numForcedRepartitions int,
delegateFDAcquisitions bool,
) Operator {
if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeClearAndReuseCache {
execerror.VectorizedInternalPanic(errors.Errorf("external hash joiner instantiated with suboptimal disk queue cache mode: %d", diskQueueCfg.CacheMode))
}
leftPartitioner := colcontainer.NewPartitionedDiskQueue(spec.left.sourceTypes, diskQueueCfg, fdSemaphore, colcontainer.PartitionerStrategyDefault)
partitionedDiskQueueSemaphore := fdSemaphore
if !delegateFDAcquisitions {
// To avoid deadlocks with other disk queues, we manually attempt to acquire
// the maximum number of descriptors all at once in Next. Passing in a nil
// semaphore indicates that the caller will do the acquiring.
partitionedDiskQueueSemaphore = nil
}
leftPartitioner := colcontainer.NewPartitionedDiskQueue(
spec.left.sourceTypes, diskQueueCfg, partitionedDiskQueueSemaphore, colcontainer.PartitionerStrategyDefault,
)
leftJoinerInput := newPartitionerToOperator(
unlimitedAllocator, spec.left.sourceTypes, leftPartitioner, 0, /* partitionIdx */
)
rightPartitioner := colcontainer.NewPartitionedDiskQueue(spec.right.sourceTypes, diskQueueCfg, fdSemaphore, colcontainer.PartitionerStrategyDefault)
rightPartitioner := colcontainer.NewPartitionedDiskQueue(
spec.right.sourceTypes, diskQueueCfg, partitionedDiskQueueSemaphore, colcontainer.PartitionerStrategyDefault,
)
rightJoinerInput := newPartitionerToOperator(
unlimitedAllocator, spec.right.sourceTypes, rightPartitioner, 0, /* partitionIdx */
)
@@ -261,9 +305,8 @@ func newExternalHashJoiner(
maxNumberActivePartitions = numDiskQueuesThatFit
}
}
if maxNumberActivePartitions < 4 {
// We need at least two buckets per side to make progress.
maxNumberActivePartitions = 4
if maxNumberActivePartitions < externalHJMinPartitions {
maxNumberActivePartitions = externalHJMinPartitions
}
makeOrderingCols := func(eqCols []uint32) []execinfrapb.Ordering_Column {
res := make([]execinfrapb.Ordering_Column, len(eqCols))
@@ -272,10 +315,17 @@ func newExternalHashJoiner(
}
return res
}
// We need to allocate 2 FDs for the merge joiner plus 2 FDs for reading
// the partitions that we need to join using sort + merge join strategy, and
// all others are divided between the two inputs.
externalSorterMaxNumberPartitions := (maxNumberActivePartitions - 4) / 2
// We need to allocate 2 FDs for reading the partitions (reused by the merge
// joiner) that we need to join using sort + merge join strategy, and all
// others are divided between the two inputs.
externalSorterMaxNumberPartitions := (maxNumberActivePartitions - sortMergeNonSortMinFDsOpen) / 2
if externalSorterMaxNumberPartitions < externalSorterMinPartitions {
// This code gets a maximum number of partitions based on the semaphore
// limit. In tests, this limit is set artificially low to catch any
// violations of the limit, resulting in possibly computing a low number of
// partitions for the sorter, which we overwrite here.
externalSorterMaxNumberPartitions = externalSorterMinPartitions
}
leftOrdering := makeOrderingCols(spec.left.eqCols)
leftPartitionSorter, err := createReusableDiskBackedSorter(
leftJoinerInput, spec.left.sourceTypes, leftOrdering, externalSorterMaxNumberPartitions,
@@ -292,7 +342,7 @@ func newExternalHashJoiner(
}
diskBackedSortMerge, err := newMergeJoinOp(
unlimitedAllocator, memoryLimit, diskQueueCfg,
fdSemaphore, spec.joinType, leftPartitionSorter, rightPartitionSorter,
partitionedDiskQueueSemaphore, spec.joinType, leftPartitionSorter, rightPartitionSorter,
spec.left.sourceTypes, spec.right.sourceTypes, leftOrdering, rightOrdering,
)
if err != nil {
@@ -321,6 +371,7 @@ func newExternalHashJoiner(
).(*hashJoiner),
diskBackedSortMerge: diskBackedSortMerge,
}
ehj.fdState.fdSemaphore = fdSemaphore
// To simplify the accounting, we will assume that the in-memory hash
// joiner's memory usage is equal to the size of the right partition to be
// joined (which will be fully buffered). This is an underestimate because a
@@ -346,6 +397,7 @@ func newExternalHashJoiner(
ehj.recursiveScratch.rightBatch = unlimitedAllocator.NewMemBatchNoCols(spec.right.sourceTypes, 0 /* size */)
}
ehj.testingKnobs.numForcedRepartitions = numForcedRepartitions
ehj.testingKnobs.delegateFDAcquisitions = delegateFDAcquisitions
return ehj
}

@@ -453,6 +505,13 @@ StateChanged:
hj.state = externalHJJoinNewPartition
continue
}
if !hj.testingKnobs.delegateFDAcquisitions && hj.fdState.acquiredFDs == 0 {
toAcquire := hj.maxNumberActivePartitions
if err := hj.fdState.fdSemaphore.Acquire(ctx, toAcquire); err != nil {
execerror.VectorizedInternalPanic(err)
}
hj.fdState.acquiredFDs = toAcquire
}
hj.partitionBatch(ctx, leftBatch, leftSide, math.MaxInt64)
hj.partitionBatch(ctx, rightBatch, rightSide, math.MaxInt64)

@@ -654,6 +713,10 @@ func (hj *externalHashJoiner) Close() error {
retErr = err
}
}
if !hj.testingKnobs.delegateFDAcquisitions && hj.fdState.acquiredFDs > 0 {
hj.fdState.fdSemaphore.Release(hj.fdState.acquiredFDs)
hj.fdState.acquiredFDs = 0
}
hj.closed = true
return retErr
}
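
The fdState changes above boil down to "acquire the whole budget once in Next, release it once in Close". The sketch below demonstrates that lifecycle in isolation; fdSem, chanSem, and joiner are simplified stand-ins for the colexec types, and the real operator panics through execerror rather than returning an error.

package main

import (
	"context"
	"fmt"
)

// fdSem is a stand-in for the semaphore interface used by the hash joiner.
type fdSem interface {
	Acquire(ctx context.Context, n int) error
	Release(n int)
}

// chanSem is a toy semaphore backed by a buffered channel.
type chanSem struct{ slots chan struct{} }

func newChanSem(limit int) *chanSem { return &chanSem{slots: make(chan struct{}, limit)} }

func (s *chanSem) Acquire(ctx context.Context, n int) error {
	for i := 0; i < n; i++ {
		select {
		case s.slots <- struct{}{}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func (s *chanSem) Release(n int) {
	for i := 0; i < n; i++ {
		<-s.slots
	}
}

// joiner mirrors only the fdState bookkeeping added in this commit, not the
// actual hash join logic.
type joiner struct {
	maxNumberActivePartitions int
	delegateFDAcquisitions    bool
	fdState                   struct {
		fdSemaphore fdSem
		acquiredFDs int
	}
}

// next acquires the whole FD budget exactly once, the first time the operator
// is about to start partitioning.
func (j *joiner) next(ctx context.Context) error {
	if !j.delegateFDAcquisitions && j.fdState.acquiredFDs == 0 {
		toAcquire := j.maxNumberActivePartitions
		if err := j.fdState.fdSemaphore.Acquire(ctx, toAcquire); err != nil {
			return err
		}
		j.fdState.acquiredFDs = toAcquire
	}
	// ... partitioning and joining would happen here ...
	return nil
}

// close returns everything that was acquired up front.
func (j *joiner) close() {
	if !j.delegateFDAcquisitions && j.fdState.acquiredFDs > 0 {
		j.fdState.fdSemaphore.Release(j.fdState.acquiredFDs)
		j.fdState.acquiredFDs = 0
	}
}

func main() {
	j := &joiner{maxNumberActivePartitions: 8}
	j.fdState.fdSemaphore = newChanSem(16)
	ctx := context.Background()
	if err := j.next(ctx); err != nil {
		panic(err)
	}
	j.close()
	fmt.Println("acquired and released", j.maxNumberActivePartitions, "FDs")
}

Acquiring the maximum up front is what avoids deadlocking against other disk queues that would otherwise grab descriptors incrementally from the same semaphore.
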
28 changes: 11 additions & 17 deletions pkg/sql/colexec/external_hash_joiner_test.go
@@ -26,21 +26,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/marusama/semaphore"
"github.com/stretchr/testify/require"
)

// externalHJTestMaxNumberPartitions specifies the minimum number of partitions
// that the maximum limit on the number of partitions for the external hash
// joiner can be set to. Here is the reasoning (note that we only describe a
// point in time when the operator will use the largest number of file
// descriptors simultaneously - the operator is supposed to release FDs once it
// is done with the corresponding partitions): external hash joiner itself
// needs at least two partitions per side (i.e. we need at least 4), but during
// the fallback to sort + merge join we will be using 6 (external sorter on
// each side will use 2 to read its own partitions and 1 more to write out).
const externalHJTestMaxNumberPartitions = 6

func TestExternalHashJoiner(t *testing.T) {
defer leaktest.AfterTest(t)()

@@ -60,13 +50,15 @@ func TestExternalHashJoiner(t *testing.T) {
memAccounts []*mon.BoundAccount
memMonitors []*mon.BytesMonitor
)
rng, _ := randutil.NewPseudoRand()
// Test the case in which the default memory is used as well as the case in
// which the joiner spills to disk.
for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
for _, tcs := range [][]joinTestCase{hjTestCases, mjTestCases} {
for _, tc := range tcs {
t.Run(fmt.Sprintf("spillForced=%t/%s", spillForced, tc.description), func(t *testing.T) {
delegateFDAcquisitions := rng.Float64() < 0.5
t.Run(fmt.Sprintf("spillForced=%t/%s/delegateFDAcquisitions=%t", spillForced, tc.description, delegateFDAcquisitions), func(t *testing.T) {
// Unfortunately, there is currently no better way to check that the
// external hash joiner does not have leftover file descriptors other
// than appending each semaphore used to this slice on construction.
@@ -92,12 +84,12 @@ func TestExternalHashJoiner(t *testing.T) {
tc.skipAllNullsInjection = true
}
runHashJoinTestCase(t, tc, func(sources []Operator) (Operator, error) {
sem := NewTestingSemaphore(externalHJTestMaxNumberPartitions)
sem := NewTestingSemaphore(externalHJMinPartitions)
semsToCheck = append(semsToCheck, sem)
spec := createSpecForHashJoiner(tc)
hjOp, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, sources, func() {}, queueCfg,
2 /* numForcedPartitions */, sem,
2 /* numForcedRepartitions */, delegateFDAcquisitions, sem,
)
memAccounts = append(memAccounts, accounts...)
memMonitors = append(memMonitors, monitors...)
@@ -159,8 +151,8 @@ func TestExternalHashJoinerFallbackToSortMergeJoin(t *testing.T) {
defer cleanup()
hj, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource},
func() { spilled = true }, queueCfg, 0, /* numForcedRepartitions */
NewTestingSemaphore(externalHJTestMaxNumberPartitions),
func() { spilled = true }, queueCfg, 0 /* numForcedRepartitions */, true, /* delegateFDAcquisitions */
NewTestingSemaphore(externalHJMinPartitions),
)
defer func() {
for _, memAccount := range accounts {
@@ -258,7 +250,7 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
rightSource.reset(nBatches)
hj, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource},
func() {}, queueCfg, 0, /* numForcedRepartitions */
func() {}, queueCfg, 0 /* numForcedRepartitions */, false, /* delegateFDAcquisitions */
NewTestingSemaphore(VecMaxOpenFDsLimit),
)
memAccounts = append(memAccounts, accounts...)
@@ -294,6 +286,7 @@ func createDiskBackedHashJoiner(
spillingCallbackFn func(),
diskQueueCfg colcontainer.DiskQueueCfg,
numForcedRepartitions int,
delegateFDAcquisitions bool,
testingSemaphore semaphore.Semaphore,
) (Operator, []*mon.BoundAccount, []*mon.BytesMonitor, error) {
args := NewColOperatorArgs{
Expand All @@ -308,6 +301,7 @@ func createDiskBackedHashJoiner(
// flowCtx.
args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn
args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions
args.TestingKnobs.DelegateFDAcquisitions = delegateFDAcquisitions
result, err := NewColOperator(ctx, flowCtx, args)
return result.Op, result.BufferingOpMemAccounts, result.BufferingOpMemMonitors, err
}
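
For readers unfamiliar with the semsToCheck pattern used in these tests, the sketch below shows the idea in isolation: every semaphore handed to an operator is kept around so the test can assert that its outstanding count drops back to zero after Close. The testingSemaphore here is a hypothetical miniature written for this example, not the real NewTestingSemaphore.

package main

import "fmt"

// testingSemaphore is a stand-in for the semaphore the colexec tests append to
// semsToCheck: it simply counts outstanding acquisitions so that leaks can be
// asserted on once the operator is closed.
type testingSemaphore struct{ limit, count int }

func (s *testingSemaphore) Acquire(n int) error {
	if s.count+n > s.limit {
		return fmt.Errorf("testing semaphore limit %d exceeded", s.limit)
	}
	s.count += n
	return nil
}

func (s *testingSemaphore) Release(n int) { s.count -= n }

// GetCount reports outstanding FDs; the tests above require this to be zero
// once the hash joiner has been closed.
func (s *testingSemaphore) GetCount() int { return s.count }

func main() {
	sem := &testingSemaphore{limit: 8} // e.g. the externalHJMinPartitions floor
	if err := sem.Acquire(8); err != nil { // the joiner takes its budget up front
		panic(err)
	}
	sem.Release(8) // Close hands it back
	if sem.GetCount() != 0 {
		panic("leaked file descriptors")
	}
	fmt.Println("no leftover file descriptors")
}
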