Merge #45892
45892: colexec: request file descriptors up front in external sorts/joins r=yuzefovich a=asubiotto

Release justification: fix for a high-severity bug in existing functionality.
Requesting file descriptors one at a time could deadlock when many queries
spill to disk at the same time.

These operators previously called semaphore.Acquire as needed. Concurrent
sorts/joins could therefore deadlock: each could block in Acquire waiting for
a file descriptor while never releasing the descriptors it already held.

Now, sorts and joins acquire all of the file descriptors they will need up
front, before spilling to disk.

Release note: None (no release with this potential issue)

Fixes #45602

Co-authored-by: Alfonso Subiotto Marques <[email protected]>
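
As an illustration of the reasoning above (an editor's sketch, not code from this commit): an operator that knows its maximum file-descriptor requirement reserves the whole budget in a single Acquire call, so two spilling operators can never each hold a partial allocation while waiting on the other. The sketch uses golang.org/x/sync/semaphore as a stand-in for the semaphore package used in the tree, and all names are illustrative.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// spill models an operator that needs maxFDs file descriptors to spill to
// disk. Instead of acquiring descriptors one at a time as files are opened
// (which lets two concurrent spills each grab part of the limit and then
// block forever), it reserves the full budget up front and releases it when
// the spill is done.
func spill(ctx context.Context, fdSemaphore *semaphore.Weighted, maxFDs int64) error {
	if err := fdSemaphore.Acquire(ctx, maxFDs); err != nil {
		return err
	}
	defer fdSemaphore.Release(maxFDs)
	// ... open at most maxFDs files and write out the partitions ...
	return nil
}

func main() {
	// A limit of 4 with two operators that each need 3 FDs: acquiring one FD
	// at a time could leave both stuck holding 2; acquiring up front
	// serializes them instead.
	fdSemaphore := semaphore.NewWeighted(4)
	for i := 0; i < 2; i++ {
		if err := spill(context.Background(), fdSemaphore, 3); err != nil {
			fmt.Println("spill aborted:", err)
		}
	}
}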
craig[bot] and asubiotto committed Mar 14, 2020
2 parents f5585e9 + 63c23b9 commit 79c470b
Showing 9 changed files with 217 additions and 63 deletions.
16 changes: 13 additions & 3 deletions pkg/sql/colcontainer/partitionedqueue.go
@@ -124,6 +124,9 @@ type PartitionedDiskQueue struct {
// enqueueing to a new partition. Each new partition will use
// cfg.BufferSizeBytes, so memory usage may increase in an unbounded fashion if
// used unmethodically. The file descriptors are acquired through fdSemaphore.
// If fdSemaphore is nil, the partitioned disk queue will not Acquire or Release
// file descriptors. Do this if the caller knows that it will use a constant
// maximum number of file descriptors and wishes to acquire these up front.
// Note that the number of file descriptors actually open may be less than,
// but never more than, the number acquired through the semaphore.
func NewPartitionedDiskQueue(
@@ -171,7 +174,7 @@ func (p *PartitionedDiskQueue) closeWritePartition(
if err := p.partitions[idx].Enqueue(coldata.ZeroBatch); err != nil {
return err
}
if releaseFDOption == releaseFD {
if releaseFDOption == releaseFD && p.fdSemaphore != nil {
p.fdSemaphore.Release(1)
p.numOpenFDs--
}
@@ -186,13 +189,18 @@ func (p *PartitionedDiskQueue) closeReadPartition(idx int) error {
if err := p.partitions[idx].CloseRead(); err != nil {
return err
}
p.fdSemaphore.Release(1)
p.numOpenFDs--
if p.fdSemaphore != nil {
p.fdSemaphore.Release(1)
p.numOpenFDs--
}
p.partitions[idx].state = partitionStateClosedForReading
return nil
}

func (p *PartitionedDiskQueue) acquireNewFD(ctx context.Context) error {
if p.fdSemaphore == nil {
return nil
}
if err := p.fdSemaphore.Acquire(ctx, 1); err != nil {
return err
}
@@ -370,6 +378,8 @@ func (p *PartitionedDiskQueue) Close() error {
p.partitions[i].state = partitionStatePermanentlyClosed
}
if p.numOpenFDs != 0 {
// Note that if p.numOpenFDs is non-zero, it must be the case that
// fdSemaphore is non-nil.
p.fdSemaphore.Release(p.numOpenFDs)
p.numOpenFDs = 0
}
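A condensed, self-contained sketch of the contract documented above (a toy, not the real PartitionedDiskQueue): a nil semaphore means the caller has already reserved its file descriptors up front, so the queue neither acquires nor releases any itself. golang.org/x/sync/semaphore stands in for the tree's semaphore interface.

package main

import (
	"context"

	"golang.org/x/sync/semaphore"
)

// fdQueue is a toy stand-in for PartitionedDiskQueue's FD accounting. A nil
// semaphore means the caller owns the FD budget, so neither method touches
// the semaphore or the counter.
type fdQueue struct {
	fdSemaphore *semaphore.Weighted
	numOpenFDs  int64
}

func (q *fdQueue) acquireNewFD(ctx context.Context) error {
	if q.fdSemaphore == nil {
		return nil
	}
	if err := q.fdSemaphore.Acquire(ctx, 1); err != nil {
		return err
	}
	q.numOpenFDs++
	return nil
}

func (q *fdQueue) releaseFD() {
	if q.fdSemaphore == nil {
		return
	}
	q.fdSemaphore.Release(1)
	q.numOpenFDs--
}

func main() {
	// Caller-managed mode: with a nil semaphore the queue never blocks on FD
	// acquisition, matching the guards added in this file.
	q := &fdQueue{}
	_ = q.acquireNewFD(context.Background())
	q.releaseFD()
}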
19 changes: 18 additions & 1 deletion pkg/sql/colexec/execplan.go
@@ -119,6 +119,14 @@ type NewColOperatorArgs struct {
// sorter it is merging already created partitions into new one before
// proceeding to the next partition from the input).
NumForcedRepartitions int
// DelegateFDAcquisitions should be observed by users of a
// PartitionedDiskQueue. During normal operations, these should acquire the
// maximum number of file descriptors they will use from FDSemaphore up
// front. Setting this testing knob to true disables that behavior and
// lets the PartitionedDiskQueue interact with the semaphore as partitions
// are opened/closed, which ensures that the number of open files never
// exceeds what is expected.
DelegateFDAcquisitions bool
}
}

@@ -395,6 +403,7 @@ func (r *NewColOperatorResult) createDiskBackedSort(
input, inputTypes, ordering,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
maxNumberPartitions,
args.TestingKnobs.DelegateFDAcquisitions,
diskQueueCfg,
args.FDSemaphore,
)
@@ -786,13 +795,21 @@ func NewColOperator(
diskQueueCfg,
args.FDSemaphore,
func(input Operator, inputTypes []coltypes.T, orderingCols []execinfrapb.Ordering_Column, maxNumberPartitions int) (Operator, error) {
sortArgs := args
if !args.TestingKnobs.DelegateFDAcquisitions {
// Set the FDSemaphore to nil. This indicates that no FDs
// should be acquired. The external hash joiner will do this
// up front.
sortArgs.FDSemaphore = nil
}
return result.createDiskBackedSort(
ctx, flowCtx, args, input, inputTypes,
ctx, flowCtx, sortArgs, input, inputTypes,
execinfrapb.Ordering{Columns: orderingCols},
0 /* matchLen */, maxNumberPartitions, spec.ProcessorID,
&execinfrapb.PostProcessSpec{}, monitorNamePrefix+"-")
},
args.TestingKnobs.NumForcedRepartitions,
args.TestingKnobs.DelegateFDAcquisitions,
)
},
args.TestingKnobs.SpillingCallbackFn,
83 changes: 73 additions & 10 deletions pkg/sql/colexec/external_hash_joiner.go
@@ -97,6 +97,24 @@ const (
// externalHJDiskQueuesMemFraction determines the fraction of the available
// RAM that is allocated for the in-memory cache of disk queues.
externalHJDiskQueuesMemFraction = 0.5
// We need at least two buckets per side to make progress. However, the
// minimum number of partitions necessary is the number in use during a
// fallback to sort and merge join. We'll be using the minimum necessary per
// input + 2 (1 for each spilling queue that the merge joiner uses). For
// clarity this is what happens:
// - The 2 partitions that need to be sorted + merged will use an FD each: 2
// FDs. Meanwhile, each sorter will use up to externalSorterMinPartitions to
// sort and partition this input. At this stage 2 + 2 *
// externalSorterMinPartitions FDs are used.
// - Once the inputs (the hash joiner partitions) are finished, both FDs will
// be released. The merge joiner will now be in use, which uses two
// spillingQueues with 1 FD each for a total of 2. Since each sorter will
// use externalSorterMinPartitions, the FDs used at this stage are 2 +
// (2 * externalSorterMinPartitions) as well. Note that as soon as the
// sorter emits its first batch, it must be the case that the input to it
// has returned a zero batch, and thus the FD has been closed.
sortMergeNonSortMinFDsOpen = 2
externalHJMinPartitions = sortMergeNonSortMinFDsOpen + (externalSorterMinPartitions * 2)
)

// externalHashJoiner is an operator that performs Grace hash join algorithm
@@ -141,6 +159,12 @@ type externalHashJoiner struct {
spec hashJoinerSpec
diskQueueCfg colcontainer.DiskQueueCfg

// fdState is used to acquire file descriptors up front.
fdState struct {
fdSemaphore semaphore.Semaphore
acquiredFDs int
}

// Partitioning phase variables.
leftPartitioner colcontainer.PartitionedQueue
rightPartitioner colcontainer.PartitionedQueue
@@ -202,6 +226,11 @@ type externalHashJoiner struct {
// is forced to recursively repartition (even if it is otherwise not
// needed) before it proceeds to actual join partitions.
numForcedRepartitions int
// delegateFDAcquisitions, if true, means that a test wants to force the
// PartitionedDiskQueues to track the number of file descriptors the hash
// joiner will open/close. This disables the default behavior of acquiring
// all file descriptors up front in Next.
delegateFDAcquisitions bool
}
}

@@ -227,6 +256,9 @@ const (
// - numForcedRepartitions is a number of times that the external hash joiner
// is forced to recursively repartition (even if it is otherwise not needed).
// This should be non-zero only in tests.
// - delegateFDAcquisitions specifies whether the external hash joiner should
// let the partitioned disk queues acquire file descriptors instead of acquiring
// them up front in Next. Should be true only in tests.
func newExternalHashJoiner(
unlimitedAllocator *Allocator,
spec hashJoinerSpec,
@@ -236,15 +268,27 @@ func newExternalHashJoiner(
fdSemaphore semaphore.Semaphore,
createReusableDiskBackedSorter func(input Operator, inputTypes []coltypes.T, orderingCols []execinfrapb.Ordering_Column, maxNumberPartitions int) (Operator, error),
numForcedRepartitions int,
delegateFDAcquisitions bool,
) Operator {
if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeClearAndReuseCache {
execerror.VectorizedInternalPanic(errors.Errorf("external hash joiner instantiated with suboptimal disk queue cache mode: %d", diskQueueCfg.CacheMode))
}
leftPartitioner := colcontainer.NewPartitionedDiskQueue(spec.left.sourceTypes, diskQueueCfg, fdSemaphore, colcontainer.PartitionerStrategyDefault)
partitionedDiskQueueSemaphore := fdSemaphore
if !delegateFDAcquisitions {
// To avoid deadlocks with other disk queues, we manually attempt to acquire
// the maximum number of descriptors all at once in Next. Passing in a nil
// semaphore indicates that the caller will do the acquiring.
partitionedDiskQueueSemaphore = nil
}
leftPartitioner := colcontainer.NewPartitionedDiskQueue(
spec.left.sourceTypes, diskQueueCfg, partitionedDiskQueueSemaphore, colcontainer.PartitionerStrategyDefault,
)
leftJoinerInput := newPartitionerToOperator(
unlimitedAllocator, spec.left.sourceTypes, leftPartitioner, 0, /* partitionIdx */
)
rightPartitioner := colcontainer.NewPartitionedDiskQueue(spec.right.sourceTypes, diskQueueCfg, fdSemaphore, colcontainer.PartitionerStrategyDefault)
rightPartitioner := colcontainer.NewPartitionedDiskQueue(
spec.right.sourceTypes, diskQueueCfg, partitionedDiskQueueSemaphore, colcontainer.PartitionerStrategyDefault,
)
rightJoinerInput := newPartitionerToOperator(
unlimitedAllocator, spec.right.sourceTypes, rightPartitioner, 0, /* partitionIdx */
)
@@ -261,9 +305,8 @@ func newExternalHashJoiner(
maxNumberActivePartitions = numDiskQueuesThatFit
}
}
if maxNumberActivePartitions < 4 {
// We need at least two buckets per side to make progress.
maxNumberActivePartitions = 4
if maxNumberActivePartitions < externalHJMinPartitions {
maxNumberActivePartitions = externalHJMinPartitions
}
makeOrderingCols := func(eqCols []uint32) []execinfrapb.Ordering_Column {
res := make([]execinfrapb.Ordering_Column, len(eqCols))
@@ -272,10 +315,17 @@ func newExternalHashJoiner(
}
return res
}
// We need to allocate 2 FDs for the merge joiner plus 2 FDs for reading
// the partitions that we need to join using sort + merge join strategy, and
// all others are divided between the two inputs.
externalSorterMaxNumberPartitions := (maxNumberActivePartitions - 4) / 2
// We need to allocate 2 FDs for reading the partitions (reused by the merge
// joiner) that we need to join using sort + merge join strategy, and all
// others are divided between the two inputs.
externalSorterMaxNumberPartitions := (maxNumberActivePartitions - sortMergeNonSortMinFDsOpen) / 2
if externalSorterMaxNumberPartitions < externalSorterMinPartitions {
// This code gets a maximum number of partitions based on the semaphore
// limit. In tests, this limit is set artificially low to catch any
// violations of the limit, resulting in possibly computing a low number of
// partitions for the sorter, which we overwrite here.
externalSorterMaxNumberPartitions = externalSorterMinPartitions
}
leftOrdering := makeOrderingCols(spec.left.eqCols)
leftPartitionSorter, err := createReusableDiskBackedSorter(
leftJoinerInput, spec.left.sourceTypes, leftOrdering, externalSorterMaxNumberPartitions,
@@ -292,7 +342,7 @@ func newExternalHashJoiner(
}
diskBackedSortMerge, err := newMergeJoinOp(
unlimitedAllocator, memoryLimit, diskQueueCfg,
fdSemaphore, spec.joinType, leftPartitionSorter, rightPartitionSorter,
partitionedDiskQueueSemaphore, spec.joinType, leftPartitionSorter, rightPartitionSorter,
spec.left.sourceTypes, spec.right.sourceTypes, leftOrdering, rightOrdering,
)
if err != nil {
@@ -321,6 +371,7 @@ func newExternalHashJoiner(
).(*hashJoiner),
diskBackedSortMerge: diskBackedSortMerge,
}
ehj.fdState.fdSemaphore = fdSemaphore
// To simplify the accounting, we will assume that the in-memory hash
// joiner's memory usage is equal to the size of the right partition to be
// joined (which will be fully buffered). This is an underestimate because a
@@ -346,6 +397,7 @@ func newExternalHashJoiner(
ehj.recursiveScratch.rightBatch = unlimitedAllocator.NewMemBatchNoCols(spec.right.sourceTypes, 0 /* size */)
}
ehj.testingKnobs.numForcedRepartitions = numForcedRepartitions
ehj.testingKnobs.delegateFDAcquisitions = delegateFDAcquisitions
return ehj
}

@@ -453,6 +505,13 @@ StateChanged:
hj.state = externalHJJoinNewPartition
continue
}
if !hj.testingKnobs.delegateFDAcquisitions && hj.fdState.acquiredFDs == 0 {
toAcquire := hj.maxNumberActivePartitions
if err := hj.fdState.fdSemaphore.Acquire(ctx, toAcquire); err != nil {
execerror.VectorizedInternalPanic(err)
}
hj.fdState.acquiredFDs = toAcquire
}
hj.partitionBatch(ctx, leftBatch, leftSide, math.MaxInt64)
hj.partitionBatch(ctx, rightBatch, rightSide, math.MaxInt64)

@@ -654,6 +713,10 @@ func (hj *externalHashJoiner) Close() error {
retErr = err
}
}
if !hj.testingKnobs.delegateFDAcquisitions && hj.fdState.acquiredFDs > 0 {
hj.fdState.fdSemaphore.Release(hj.fdState.acquiredFDs)
hj.fdState.acquiredFDs = 0
}
hj.closed = true
return retErr
}
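
Working through the budget that Next acquires up front (an editor's sketch, not part of the diff): the minimum is sortMergeNonSortMinFDsOpen plus one sorter's worth of partitions per input. The value of externalSorterMinPartitions is not shown in this diff, so the 3 below is an assumption for illustration only.

package main

import "fmt"

// Assumed value for illustration only; the real constant is defined elsewhere
// in the colexec package and is not part of this diff.
const externalSorterMinPartitions = 3

const (
	// Two FDs for reading the pair of partitions that fall back to
	// sort + merge join (reused by the merge joiner's spilling queues).
	sortMergeNonSortMinFDsOpen = 2
	// One external sorter per input, each needing externalSorterMinPartitions.
	externalHJMinPartitions = sortMergeNonSortMinFDsOpen + externalSorterMinPartitions*2
)

func main() {
	// With the assumed value this prints 8 (2 + 2*3): the minimum budget the
	// joiner reserves up front in Next and the semaphore size used in tests.
	fmt.Println(externalHJMinPartitions)
}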
28 changes: 11 additions & 17 deletions pkg/sql/colexec/external_hash_joiner_test.go
@@ -26,21 +26,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/marusama/semaphore"
"github.com/stretchr/testify/require"
)

// externalHJTestMaxNumberPartitions specifies the minimum number of partitions
// that the maximum limit on the number of partitions for the external hash
// joiner can be set to. Here is the reasoning (note that we only describe a
// point in time when the operator will use the most number of files
// descriptors simultaneously - the operator is supposed to release FDs once it
// is done with the corresponding partitions): external hash joiner itself
// needs at least two partitions per side (i.e. we need at least 4), but during
// the fallback to sort + merge join we will be using 6 (external sorter on
// each side will use 2 to read its own partitions and 1 more to write out).
const externalHJTestMaxNumberPartitions = 6

func TestExternalHashJoiner(t *testing.T) {
defer leaktest.AfterTest(t)()

@@ -60,13 +50,15 @@ func TestExternalHashJoiner(t *testing.T) {
memAccounts []*mon.BoundAccount
memMonitors []*mon.BytesMonitor
)
rng, _ := randutil.NewPseudoRand()
// Test the case in which the default memory is used as well as the case in
// which the joiner spills to disk.
for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
for _, tcs := range [][]joinTestCase{hjTestCases, mjTestCases} {
for _, tc := range tcs {
t.Run(fmt.Sprintf("spillForced=%t/%s", spillForced, tc.description), func(t *testing.T) {
delegateFDAcquisitions := rng.Float64() < 0.5
t.Run(fmt.Sprintf("spillForced=%t/%s/delegateFDAcquisitions=%t", spillForced, tc.description, delegateFDAcquisitions), func(t *testing.T) {
// Unfortunately, there is currently no better way to check that the
// external hash joiner does not have leftover file descriptors other
// than appending each semaphore used to this slice on construction.
@@ -92,12 +84,12 @@ func TestExternalHashJoiner(t *testing.T) {
tc.skipAllNullsInjection = true
}
runHashJoinTestCase(t, tc, func(sources []Operator) (Operator, error) {
sem := NewTestingSemaphore(externalHJTestMaxNumberPartitions)
sem := NewTestingSemaphore(externalHJMinPartitions)
semsToCheck = append(semsToCheck, sem)
spec := createSpecForHashJoiner(tc)
hjOp, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, sources, func() {}, queueCfg,
2 /* numForcedPartitions */, sem,
2 /* numForcedPartitions */, delegateFDAcquisitions, sem,
)
memAccounts = append(memAccounts, accounts...)
memMonitors = append(memMonitors, monitors...)
@@ -159,8 +151,8 @@ func TestExternalHashJoinerFallbackToSortMergeJoin(t *testing.T) {
defer cleanup()
hj, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource},
func() { spilled = true }, queueCfg, 0, /* numForcedRepartitions */
NewTestingSemaphore(externalHJTestMaxNumberPartitions),
func() { spilled = true }, queueCfg, 0 /* numForcedRepartitions */, true, /* delegateFDAcquisitions */
NewTestingSemaphore(externalHJMinPartitions),
)
defer func() {
for _, memAccount := range accounts {
@@ -258,7 +250,7 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
rightSource.reset(nBatches)
hj, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource},
func() {}, queueCfg, 0, /* numForcedRepartitions */
func() {}, queueCfg, 0 /* numForcedRepartitions */, false, /* delegateFDAcquisitions */
NewTestingSemaphore(VecMaxOpenFDsLimit),
)
memAccounts = append(memAccounts, accounts...)
Expand Down Expand Up @@ -294,6 +286,7 @@ func createDiskBackedHashJoiner(
spillingCallbackFn func(),
diskQueueCfg colcontainer.DiskQueueCfg,
numForcedRepartitions int,
delegateFDAcquisitions bool,
testingSemaphore semaphore.Semaphore,
) (Operator, []*mon.BoundAccount, []*mon.BytesMonitor, error) {
args := NewColOperatorArgs{
Expand All @@ -308,6 +301,7 @@ func createDiskBackedHashJoiner(
// flowCtx.
args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn
args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions
args.TestingKnobs.DelegateFDAcquisitions = delegateFDAcquisitions
result, err := NewColOperator(ctx, flowCtx, args)
return result.Op, result.BufferingOpMemAccounts, result.BufferingOpMemMonitors, err
}
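
The leak check the test relies on, appending every semaphore to semsToCheck on construction and presumably verifying that each count returns to zero after the test (the assertion itself falls outside the hunks shown), can be reproduced with any counting semaphore. The toy below uses a plain counter rather than the tree's semaphore implementation; all names are illustrative.

package main

import "fmt"

// countingSem is a toy FD semaphore that only tracks how many descriptors are
// currently acquired, which is all the leak check needs.
type countingSem struct{ count int }

func (s *countingSem) Acquire(n int) { s.count += n }
func (s *countingSem) Release(n int) { s.count -= n }
func (s *countingSem) GetCount() int { return s.count }

func main() {
	var semsToCheck []*countingSem

	// Each operator under test registers its semaphore on construction...
	sem := &countingSem{}
	semsToCheck = append(semsToCheck, sem)
	sem.Acquire(8) // the FD budget acquired up front in Next
	sem.Release(8) // Close must give everything back

	// ...and after the test every semaphore must be back at zero, i.e. the
	// operator left no file descriptors acquired.
	for _, s := range semsToCheck {
		if s.GetCount() != 0 {
			fmt.Println("leaked FDs:", s.GetCount())
		}
	}
}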