colexec: request file descriptors up front in external sorts/joins #45892

Merged: 1 commit merged on Mar 14, 2020
Conversation

@asubiotto (Contributor) commented Mar 9, 2020

Release justification: fix for a high-severity bug in existing functionality. The
previous approach of requesting file descriptors one at a time could lead to
a deadlock when many queries spill to disk at the same time.

These operators would previously call semaphore.Acquire as needed. Concurrent
sorts/joins could therefore deadlock: each could block while acquiring a file
descriptor and never release the ones it already held.

Now, sorts and joins that spill to disk acquire all necessary file descriptors
up front, before spilling.

Release note: None (no release with this potential issue)

Fixes #45602
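
For illustration, here is a minimal Go sketch of the up-front acquisition pattern described above. The `fdSemaphore` interface, the `externalOp` type, and the field names are simplified assumptions, not the actual colexec code:

```go
package main

import "context"

// fdSemaphore stands in for the shared file-descriptor quota (bounded in
// CockroachDB by the COCKROACH_VEC_MAX_OPEN_FDS environment variable).
type fdSemaphore interface {
	Acquire(ctx context.Context, n int) error
	Release(n int)
}

// externalOp is a simplified stand-in for an external sort or hash join.
type externalOp struct {
	fdSemaphore               fdSemaphore
	acquiredFDs               int
	maxNumberActivePartitions int
}

// spillToDisk acquires the operator's entire FD budget once, before any
// partition is created, so that two concurrent operators can no longer
// deadlock by each holding a partial allocation while waiting for more.
func (op *externalOp) spillToDisk(ctx context.Context) error {
	if op.acquiredFDs == 0 {
		if err := op.fdSemaphore.Acquire(ctx, op.maxNumberActivePartitions); err != nil {
			return err
		}
		op.acquiredFDs = op.maxNumberActivePartitions
	}
	// ... create partitions; the disk queues themselves no longer touch the
	// semaphore (unless a test opts into delegating FD acquisition).
	return nil
}

// done releases everything that was acquired and zeroes the counter.
func (op *externalOp) done() {
	if op.acquiredFDs > 0 {
		op.fdSemaphore.Release(op.acquiredFDs)
		op.acquiredFDs = 0
	}
}

func main() {}
```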

@asubiotto asubiotto requested review from yuzefovich and a team March 9, 2020 16:53
@cockroach-teamcity (Member) commented: This change is Reviewable

@yuzefovich (Member) left a comment

Reviewed 7 of 7 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/external_hash_joiner.go, line 266 at r1 (raw file):

		}
	}
	if maxNumberActivePartitions < 4 {

We should probably now have a hard limit on maxNumberActivePartitions dependent on VecMaxOpenFDsLimit (maybe 1/8 of it?).


pkg/sql/colexec/external_sort.go, line 196 at r1 (raw file):

	// In order to make progress when merging we have to merge at least two
	// partitions.
	if maxNumberPartitions < 2 {

ditto for maxNumberPartitions.

@asubiotto (Contributor, Author):

I'll wait for #45825 to go in before doing some more work on this one.

@bobvawter (Contributor) commented Mar 10, 2020

CLA assistant check
All committers have signed the CLA.

@asubiotto (Contributor, Author):

Rebased and updated to limit the number of file descriptors a sorter requests. I'm slightly confused by our current external hash join partition limits and minimums, which I left as a TODO to discuss.

@yuzefovich (Member) left a comment

Reviewed 7 of 7 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/external_hash_joiner.go, line 266 at r1 (raw file):

Previously, yuzefovich wrote…

We should probably now have a hard limit on maxNumberActivePartitions dependent on VecMaxOpenFDsLimit (maybe 1/8 of it?).

Do you want to address this suggestion in this PR or in the follow-up?


pkg/sql/colexec/external_hash_joiner.go, line 104 at r2 (raw file):

	// fallback to sort and merge join. We'll be using the minimum necessary per
	// input + 4 (2 for each spilling queue that the merge joiner uses).
	// TODO(asubiotto): Is this right?

Yes, I believe the number is correct. I'd probably add a more verbose description of this computation, for example:

During the fallback, at minimum we can have the following:
- 2 FDs will be used to read from the partition of the external hash joiner that could not have been repartitioned, these are inputs to the externalSorters
- 2 * externalSorterMinPartitions FDs will be used by two externalSorters that are inputs into the merge joiner
- 2 FDs will be used by the merge joiner

pkg/sql/colexec/external_hash_joiner.go, line 217 at r2 (raw file):

		// needed) before it proceeds to actual join partitions.
		numForcedRepartitions int
		// delegateFDAcquisitions if true, means that a test wants to force the

nit: s/ if true,/, if true,/g.


pkg/sql/colexec/external_hash_joiner.go, line 249 at r2 (raw file):

// - delegateFDAcquisitions specifies whether the external hash joiner should
// let the partitioned disk queues acquire file descriptors instead of acquiring
// them up front in Next.

nit: I'd add that this should be true only in tests.


pkg/sql/colexec/external_hash_joiner.go, line 311 at r2 (raw file):

	externalSorterMaxNumberPartitions := (maxNumberActivePartitions - 4) / 2
	if externalSorterMaxNumberPartitions < externalSorterMinPartitions {
		externalSorterMaxNumberPartitions = externalSorterMinPartitions

I feel like we should panic in this case because we're upping maxNumberActivePartitions so that there is at least the minimum number of FDs needed for each of the external sorters.


pkg/sql/colexec/external_hash_joiner.go, line 335 at r2 (raw file):

		execerror.VectorizedInternalPanic(err)
	}
	fmt.Println("max number of active partitions is", maxNumberActivePartitions)

Leftover logging.


pkg/sql/colexec/external_hash_joiner.go, line 702 at r2 (raw file):

	}
	if !hj.testingKnobs.delegateFDAcquisitions && hj.fdState.acquiredFDs > 0 {
		hj.fdState.fdSemaphore.Release(hj.fdState.acquiredFDs)

nit: for completeness, we probably should do hj.fdState.acquiredFDs = 0.


pkg/sql/colexec/external_hash_joiner_test.go, line 43 at r2 (raw file):

// the fallback to sort + merge join we will be using 6 (external sorter on
// each side will use 2 to read its own partitions and 1 more to write out).
// TODO(asubiotto): It would probably be good to discuss this again.

I think this comment is incorrect. I now believe that, in theory, the minimum should be 10. I think we should remove this constant and use externalHJMinPartitions in the tests.


pkg/sql/colexec/external_sort.go, line 137 at r2 (raw file):

	testingKnobs struct {
		// delegateFDAcquisitions if true, means that a test wants to force the

nit: s/ if/, if/g.


pkg/sql/colexec/external_sort.go, line 163 at r2 (raw file):

// - delegateFDAcquisitions specifies whether the external sorter should let
// the partitioned disk queue acquire file descriptors instead of acquiring
// them up front in Next.

ditto for the "tests only".


pkg/sql/colexec/external_sort.go, line 198 at r2 (raw file):

		// external sorter
		// TODO(asubiotto): this number should be tuned.
		fmt.Println("maxNumberPartitions", maxNumberPartitions)

Leftover logging.


pkg/sql/colexec/external_sort.go, line 199 at r2 (raw file):

		// TODO(asubiotto): this number should be tuned.
		fmt.Println("maxNumberPartitions", maxNumberPartitions)
		maxPartitionsAllowed := fdSemaphore.GetLimit() / 16

nit: we probably should now extract this computation into a function used by both sorter and joiner.


pkg/sql/colexec/external_sort.go, line 365 at r2 (raw file):

	}
	if !s.testingKnobs.delegateFDAcquisitions && s.fdState.fdSemaphore != nil && s.fdState.acquiredFDs > 0 {
		s.fdState.fdSemaphore.Release(s.fdState.acquiredFDs)

ditto to zero out .acquiredFDs.


pkg/sql/colexec/external_sort_test.go, line 152 at r2 (raw file):

		memMonitors []*mon.BytesMonitor
	)
	const maxNumberPartitions = 3

nit: these constants should probably be removed in favor of externalSorterMinPartitions.

@asubiotto (Contributor, Author) left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/colexec/external_hash_joiner.go, line 266 at r1 (raw file):

Previously, yuzefovich wrote…

Do you want to address this suggestion in this PR or in the follow-up?

fdSemaphore.GetLimit()/16 above is what sets the maximum. I think this is fine, no?


pkg/sql/colexec/external_hash_joiner.go, line 104 at r2 (raw file):

Previously, yuzefovich wrote…

Yes, I believe the number is correct. I'd probably add a more verbose description of this computation, for example:

During the fallback, at minimum we can have the following:
- 2 FDs will be used to read from the partition of the external hash joiner that could not have been repartitioned, these are inputs to the externalSorters
- 2 * externalSorterMinPartitions FDs will be used by two externalSorters that are inputs into the merge joiner
- 2 FDs will be used by the merge joiner

Done. It seems like the correct number, but I think the merge joiner will use 4 FDs since it uses 2 spilling queues, right?


pkg/sql/colexec/external_hash_joiner.go, line 311 at r2 (raw file):

Previously, yuzefovich wrote…

I feel like we should panic in this case because we're upping maxNumberActivePartitions so that there is a least minimum number of FDs needed for each of the external sorters.

Hmm, this doesn't go hand in hand with using the semaphore limit to set a maximum on the number of active partitions, since tests will set this limit low to ensure the hash joiner doesn't exceed it. This results in passing a limit of 0 to the sorter along with a nil fdSemaphore. I don't do a nil check there because we should never pass a limit of 0 to a sorter if we account for its maximum number of partitions. In practice, this if statement is only hit in tests, so I'm fine leaving it as is if you are.


pkg/sql/colexec/external_hash_joiner.go, line 335 at r2 (raw file):

Previously, yuzefovich wrote…

Leftover logging.

Done.


pkg/sql/colexec/external_hash_joiner_test.go, line 43 at r2 (raw file):

Previously, yuzefovich wrote…

I think this comment is incorrect. I now believe that we need, in theory, the minimum should be 10. I think we should remove this constant and use externalHJMinPartitions in the tests.

Done.


pkg/sql/colexec/external_sort.go, line 196 at r1 (raw file):

Previously, yuzefovich wrote…

ditto for maxNumberPartitions.

Replied there.


pkg/sql/colexec/external_sort.go, line 198 at r2 (raw file):

Previously, yuzefovich wrote…

Leftover logging.

Done.


pkg/sql/colexec/external_sort.go, line 199 at r2 (raw file):

Previously, yuzefovich wrote…

nit: we probably should now extract this computation into a function used by both sorter and joiner.

I don't think it's worth it. If we tune this, the numbers will probably be different so we'll have to change this anyway.


pkg/sql/colexec/external_sort.go, line 365 at r2 (raw file):

Previously, yuzefovich wrote…

ditto to zero out .acquiredFDs.

Done.

@yuzefovich (Member) left a comment

Reviewed 4 of 4 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/external_hash_joiner.go, line 266 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

fdSemaphore.GetLimit()/16 above is what sets the maximum. I think this is fine, no?

Oh yeah, I missed that.


pkg/sql/colexec/external_hash_joiner.go, line 104 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Done. It seems like the correct number, but I think the merge joiner will use 4 FDs since it uses 2 spilling queues, right?

Merge joiner follows the pattern of "write all data, then read all data", so each queue should be using 1 FD at any point. (Probably merge joiner should use "close on new partition" strategy.) However, as I said in the comment above, we should not be forgetting about the FD to read from the partition that we are joining (but it is outside of merge joiner's knowledge that we use a FD there).


pkg/sql/colexec/external_hash_joiner.go, line 311 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Hmm, this doesn't go hand in hand with using the semaphore limit to set a maximum on the number of active partitions, since tests will set this limit low to ensure the hash joiner doesn't exceed the limit. This results in passing in a limit of 0 to the sorter with a nil fdSemaphore. I don't do a nil check there because we should never pass in a limit of 0 to a sorter if we account for its maximum number of partitions. In practice, this if statement only happens in tests, so I'm fine leaving as is if you are.

SGTM. I'd then add a comment here that this if is in place to handle artificially low FD limits set by tests.


pkg/sql/colexec/external_hash_joiner.go, line 335 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Done.

I still see it :)


pkg/sql/colexec/external_hash_joiner.go, line 109 at r3 (raw file):

	//   sort and partition this input. At this stage 2 + 2 *
	//   externalMinPartitions FDs are used.
	// - Once the inputs (the hash joiner partitions) are finished, both FDs will

I think it is possible that the hash joiner partitions' FDs are not released when the merge joiner kicks in. Consider the following scenario (looking only at one side):

  • merge joiner requests the first batch from the external sorter (when it gets the batch, it will be written into the spilling queue) + 1 FD
  • external sorter requests batches from the hash joiner partition and does its job. At some point, however, the external sorter needed to do recursive merging, and at that point it was using all of its available FDs. + externalSorterMinPartitions FDs
  • the hash joiner partition is being read + 1 FD.

However, while typing out this comment, I realized that the merge joiner will start writing into its spilling queue only when it receives the first batch from the external sorter, and at that point, the hash joiner partition has been fully consumed, and the corresponding FD should have been released. I.e. we actually never use more than 1 + externalSorterMinPartitions number of FDs per side, so externalHJMinPartitions should be 2 + 2 * externalSorterMinPartitions.


pkg/sql/colexec/external_sort.go, line 163 at r2 (raw file):

Previously, yuzefovich wrote…

ditto for the "tests only".

Ping.


pkg/sql/colexec/external_sort.go, line 198 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Done.

Still see it.


pkg/sql/colexec/external_sort.go, line 199 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I don't think it's worth it. If we tune this, the numbers will probably be different so we'll have to change this anyway.

Ok.


pkg/sql/colexec/external_sort.go, line 216 at r3 (raw file):

		OneInputNode:       NewOneInputNode(inMemSorter),
		unlimitedAllocator: unlimitedAllocator,
		memoryLimit:        memoryLimit,

nit: external sorter doesn't need to store memoryLimit.

@asubiotto (Contributor, Author) left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/colexec/external_hash_joiner.go, line 104 at r2 (raw file):

Previously, yuzefovich wrote…

Merge joiner follows the pattern of "write all data, then read all data", so each queue should be using 1 FD at any point. (Probably merge joiner should use "close on new partition" strategy.) However, as I said in the comment above, we should not be forgetting about the FD to read from the partition that we are joining (but it is outside of merge joiner's knowledge that we use a FD there).

Just checking: is this comment void now, given the "However, while typing out this comment" part? I responded re the 1 FD above; it uses only 1, but it actually acquires 2 right now (since there is no enforcement that all writes happen before all reads in the spilling queue; we should probably fix that).


pkg/sql/colexec/external_hash_joiner.go, line 311 at r2 (raw file):

Previously, yuzefovich wrote…

SGTM. I'd add a comment here then that this if is in-place to handle artificially low FDs limits set by tests.

Done.


pkg/sql/colexec/external_hash_joiner.go, line 335 at r2 (raw file):

Previously, yuzefovich wrote…

I still see it :)

Oops, thanks for catching


pkg/sql/colexec/external_hash_joiner.go, line 109 at r3 (raw file):

Previously, yuzefovich wrote…

I think it is possible that the hash joiner partitions' FDs are not released when the merge joiner kicks in. Consider the following scenario (looking only at one side):

  • merge joiner requests the first batch from the external sorter (when it gets the batch, it will be written into the spilling queue) + 1 FD
  • external sorter requests batches from the hash joiner partition and does its job. At some point, however, the external sorter needed to do recursive merging, and at that point it was using all of its available FDs. + externalSorterMinPartitions FDs
  • the hash joiner partition is being read + 1 FD.

However, while typing out this comment, I realized that the merge joiner will start writing into its spilling queue only when it receives the first batch from the external sorter, and at that point, the hash joiner partition has been fully consumed, and the corresponding FD should have been released. I.e. we actually never use more than 1 + externalSorterMinPartitions number of FDs per side, so externalHJMinPartitions should be 2 + 2 * externalSorterMinPartitions.

Right, the FD is released. But note that each spilling queue, even though it only opens 1 FD, acquires 2 of them (which is what needs to be reflected here), which is why it's 4 + 2 * externalSorterMinPartitions.
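
At this point in the thread the accounting pencils out as follows (a sketch; note that it is revisited below once the spilling queue learns to acquire only the FD it actually keeps open):

```go
const (
	// An external sorter needs at least 2 FDs to read two of its partitions
	// plus 1 FD to write out the merged result.
	externalSorterMinPartitions = 3

	// The merge joiner's two spilling queues each acquire 2 FDs here (even
	// though each keeps only 1 open at a time), hence the 4; each of the two
	// external sorters feeding the merge joiner needs
	// externalSorterMinPartitions FDs on top of that.
	externalHJMinPartitions = 4 + 2*externalSorterMinPartitions // = 10
)
```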


pkg/sql/colexec/external_sort.go, line 163 at r2 (raw file):

Previously, yuzefovich wrote…

Ping.

Done.


pkg/sql/colexec/external_sort.go, line 198 at r2 (raw file):

Previously, yuzefovich wrote…

Still see it.

I don't think it's there anymore.

@yuzefovich (Member) left a comment

Reviewed 2 of 2 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/external_hash_joiner.go, line 104 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Just checking: is this comment void now, given the "However, while typing out this comment" part? I responded re the 1 FD above; it uses only 1, but it actually acquires 2 right now (since there is no enforcement that all writes happen before all reads in the spilling queue; we should probably fix that).

Yeah, resolved.


pkg/sql/colexec/external_hash_joiner.go, line 109 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Right, the FD is released. But note that each spilling queue, even though it only opens 1 FD, acquires 2 of them (which is what needs to be reflected here), which is why it's 4 + 2 * externalSorterMinPartitions.

Oh, I see, I forgot about that. I guess we could introduce non-default mode for spilling queue so that it would acquire only 1 FD. I'll leave it up to you to decide whether it is important to add a TODO for this or we can just ignore this.


pkg/sql/colexec/external_sort.go, line 198 at r4 (raw file):

	}
	// Each disk queue will use up to BufferSizeBytes of RAM, so we reduce the
	// memoryLimit of the partitions to sort in memory by those cache sizes.will give

nit: "sizes.will"


pkg/sql/colexec/external_sort.go, line 203 at r4 (raw file):

	batchMemSize := estimateBatchSizeBytes(inputTypes, coldata.BatchSize())
	// Reserve a certain amount of memory for the partition caches.
	memoryLimit -= int64((maxNumberPartitions * diskQueueCfg.BufferSizeBytes) + batchMemSize)

At this point this update no longer matters. We need to move the creation of inputPartitioningOperator after this update.


pkg/sql/colexec/external_sort.go, line 205 at r4 (raw file):

	memoryLimit -= int64((maxNumberPartitions * diskQueueCfg.BufferSizeBytes) + batchMemSize)
	if memoryLimit < 0 {
		memoryLimit = 0

I think that we need to have memory limit of at least 1 byte (otherwise inputPartitioningOperator will only return zero-length batches, always).
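
A sketch of the adjusted computation with that 1-byte floor, reusing the identifiers from the snippet quoted above (this shows the shape of the fix rather than a compilable excerpt):

```go
batchMemSize := estimateBatchSizeBytes(inputTypes, coldata.BatchSize())
// Reserve memory for the disk queues' write buffers and one batch, but keep
// at least 1 byte so that the inputPartitioningOperator can still emit
// non-empty batches.
memoryLimit -= int64((maxNumberPartitions * diskQueueCfg.BufferSizeBytes) + batchMemSize)
if memoryLimit < 1 {
	memoryLimit = 1
}
```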

@asubiotto (Contributor, Author) left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/external_hash_joiner.go, line 109 at r3 (raw file):

Previously, yuzefovich wrote…

Oh, I see, I forgot about that. I guess we could introduce non-default mode for spilling queue so that it would acquire only 1 FD. I'll leave it up to you to decide whether it is important to add a TODO for this or we can just ignore this.

It was actually very easy to add (just check whether the queue is rewindable or not), so I added it.


pkg/sql/colexec/external_sort.go, line 203 at r4 (raw file):

Previously, yuzefovich wrote…

At this point this update no longer matters. We need to move the creation of inputPartitioningOperator after this update.

Done.


pkg/sql/colexec/external_sort.go, line 205 at r4 (raw file):

Previously, yuzefovich wrote…

I think that we need to have memory limit of at least 1 byte (otherwise inputPartitioningOperator will only return zero-length batches, always).

Done. This makes us hit what #46052 fixes a lot more easily.

@yuzefovich (Member) left a comment

Reviewed 3 of 3 files at r5.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/spilling_queue.go, line 210 at r5 (raw file):

func (q *spillingQueue) numFDsOpenAtAnyGivenTime() int {
	if q.rewindable {

Merge joiner uses rewindable queue only on the right side, yet it follows the pattern "write all, then read all" on both sides. We probably need a separate knob for this pattern after all. (Or, alternatively, we could be instantiating a rewindable queue in the merge joiner on both sides, but using rewind() method only on the right.)

@asubiotto (Contributor, Author) left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/spilling_queue.go, line 210 at r5 (raw file):

Previously, yuzefovich wrote…

Merge joiner uses rewindable queue only on the right side, yet it follows the pattern "write all, then read all" on both sides. We probably need a separate knob for this pattern after all. (Or, alternatively, we could be instantiating a rewindable queue in the merge joiner on both sides, but using rewind() method only on the right.)

The check is easy enough. If the underlying disk queue is not using a default cache mode then this pattern is enforced, so we can use that.
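
A sketch of that check (the cache-mode field and constant names are assumptions for illustration and may not match the exact colcontainer identifiers):

```go
func (q *spillingQueue) numFDsOpenAtAnyGivenTime() int {
	if q.diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeDefault {
		// A non-default cache mode enforces the "write everything, then read
		// everything" pattern, so the write FD is closed before the read FD
		// is opened: at most 1 FD is open at any given time.
		return 1
	}
	// The default cache mode allows interleaving reads and writes, so account
	// for both FDs.
	return 2
}
```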

@yuzefovich (Member) left a comment

:lgtm:

Reviewed 1 of 1 files at r6.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/external_hash_joiner.go, line 320 at r6 (raw file):

	// the partitions that we need to join using sort + merge join strategy, and
	// all others are divided between the two inputs.
	externalSorterMaxNumberPartitions := (maxNumberActivePartitions - 4) / 2

s/4/2/g.

Maybe we should define a separate constant for this 2 so that we don't forget to update one of the places where this constant is used if the calculation changes for some reason?


pkg/sql/colexec/external_sort_test.go, line 89 at r6 (raw file):

						orderedVerifier,
						func(input []Operator) (Operator, error) {
							// A sorter should never exceed maxNumberPartitions+1, even during

nit: the comment needs updating.


pkg/sql/colexec/spilling_queue.go, line 210 at r5 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

The check is easy enough. If the underlying disk queue is not using a default cache mode then this pattern is enforced, so we can use that.

SGTM.

@asubiotto (Contributor, Author) left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto and @yuzefovich)


pkg/sql/colexec/external_hash_joiner.go, line 320 at r6 (raw file):

Previously, yuzefovich wrote…

s/4/2/g.

Maybe we should define a separate constant for this 2 so that we don't forget to update one of the places where this constant is used if the calculation changes for some reason?

Done.
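
For illustration, the suggested constant might look roughly like this (the name is hypothetical; only the arithmetic follows the s/4/2 suggestion above, now that each of the merge joiner's spilling queues acquires a single FD):

```go
// mergeJoinerFDs is the number of FDs used by the merge joiner's two spilling
// queues (1 each); the remaining FDs are split between the two external
// sorters. The name is hypothetical.
const mergeJoinerFDs = 2

externalSorterMaxNumberPartitions := (maxNumberActivePartitions - mergeJoinerFDs) / 2
```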

@asubiotto (Contributor, Author):

bors r=yuzefovich

@craig (bot) commented Mar 14, 2020

Build failed

@asubiotto (Contributor, Author):

A release justification was needed in the PR description as well. Updated.

bors r=yuzefovich

@craig (bot) commented Mar 14, 2020

Build succeeded

@craig craig bot merged commit 79c470b into cockroachdb:master Mar 14, 2020
@asubiotto asubiotto deleted the ddlk branch March 16, 2020 09:14
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 13, 2022
This commit fixes a long-standing issue which could cause
memory-intensive queries to deadlock on acquiring the file descriptors
quota when vectorized execution spills to disk. This bug has been
present since the introduction of disk-spilling (over two and a half
years ago, introduced in cockroachdb#45318 and partially mitigated in cockroachdb#45892), but
we haven't seen this in any user reports, only in `tpch_concurrency`
roachtest runs, so the severity seems pretty minor.

Consider the following query plan:
```
   Node 1                   Node 2

TableReader              TableReader
    |                         |
HashRouter                HashRouter
    |     \  ___________ /    |
    |      \/__________       |
    |      /           \      |
HashAggregator         HashAggregator
```
and let's imagine that each hash aggregator has to spill to disk. This
would require acquiring the file descriptors quota. Now, imagine that
because of that hash aggregators' spilling, each of the hash routers has
slow outputs causing them to spill too. As a result, this query plan can
require `A + 2 * R` FDs on a single node to succeed, where `A` is the quota
for a single hash aggregator (equal to 16 with the default value of the
`COCKROACH_VEC_MAX_OPEN_FDS` environment variable, which is 256) and `R` is
the quota for a single router output (2). This means that we can estimate
that 20 FDs from each node are needed for the query to finish execution, with
16 FDs being acquired first.

Now imagine that this query is run with concurrency of 16. We can end up
in such a situation that all hash aggregators have spilled, fully
exhausting the global node limit on each node, so whenever the hash
router outputs need to spill, they block forever since no FDs will ever
be released, until a query is canceled or a node is shut down. In other
words, we have a deadlock.

This commit fixes this situation by introducing a retry mechanism that backs
off exponentially when trying to acquire the FD quota, until a timeout.
sufficient so that some of the queries succeed while others result in
an error.

Unfortunately, I don't see a way to prevent this deadlock from occurring
in the first place without a possible increase in latency in some cases.
The difficult thing is that we currently acquire FDs only once we need
them, meaning once a particular component spills to disk. We could
acquire the maximum number of FDs that a query might need up-front,
before the query execution starts, but that could lead to starvation of
the queries that ultimately won't spill to disk. This seems like a much
worse impact than receiving timeout errors on some analytical queries
when run with high concurrency. We're not an OLAP database, so this
behavior seems ok.

Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead.
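
A minimal sketch of the retry-with-backoff acquisition described above, using only the Go standard library rather than CockroachDB's retry package (the function names, backoff parameters, and the toy quota in main are illustrative assumptions):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// acquireWithRetry tries to grab n file descriptors from the quota, backing
// off exponentially (with jitter) between attempts and giving up after
// maxRetries, so a deadlocked acquisition turns into an error instead of
// blocking forever. maxRetries == 0 means retry indefinitely.
func acquireWithRetry(ctx context.Context, tryAcquire func(n int) bool, n, maxRetries int) error {
	backoff := 100 * time.Millisecond
	for attempt := 0; maxRetries == 0 || attempt <= maxRetries; attempt++ {
		if tryAcquire(n) {
			return nil
		}
		// Jittered exponential backoff so that concurrent queries don't all
		// retry in lockstep.
		sleep := backoff + time.Duration(rand.Int63n(int64(backoff)))
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
		if backoff < 10*time.Second {
			backoff *= 2
		}
	}
	return errors.New("timed out waiting for file descriptors; consider lowering query concurrency")
}

func main() {
	// Toy usage: a quota of 4 FDs, requesting 2 at a time.
	available := 4
	tryAcquire := func(n int) bool {
		if available >= n {
			available -= n
			return true
		}
		return false
	}
	fmt.Println(acquireWithRetry(context.Background(), tryAcquire, 2, 5))
}
```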
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 14, 2022
(Same commit message as the Jul 13, 2022 commit above.)
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 14, 2022
(Same commit message as the Jul 13, 2022 commit above.)
craig bot pushed a commit that referenced this pull request Jul 14, 2022
84324: sqlsmith: make order-dependent aggregation functions deterministic r=msirek,mgartner a=michae2

**cmd: add smith command**

Add a new top-level command `smith` which dumps randomly-generated
sqlsmith queries. This is useful for testing modifications to sqlsmith.

Assists: #83024

Release note: None

**sqlsmith: make order-dependent aggregation functions deterministic**

Some aggregation functions (e.g. string_agg) have results that depend
on the order of input rows. To make sqlsmith more deterministic, add
ORDER BY clauses to these aggregation functions whenever their argument
is a column reference. (When their argument is a constant, ordering
doesn't matter.)

Fixes: #83024

Release note: None

84398: colflow: prevent deadlocks when many queries spill to disk at same time r=yuzefovich a=yuzefovich

**colflow: prevent deadlocks when many queries spill to disk at same time**

(Same commit message as the Jul 13, 2022 commit above.)

Fixes: #80290.

Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead.

**roachtest: remove some debugging printouts in tpch_concurrency**

This was added to track down the deadlock fixed in the previous commit,
so we no longer need it.

Release note: None

84430: sql/schemachanger/scplan: allow plan to move to NonRevertible earlier r=ajwerner a=ajwerner

This is critical for DROP COLUMN. In `DROP COLUMN` we cannot perform the
primary index swap until the dropping column reaches `DELETE_ONLY`, which is
not revertible. The primary index swap itself is revertible. Given this fact,
we need a mechanism to be able to "promote" revertible operations (operations
which don't destroy information or cause irrevocable visible side effects) to
be grouped with or after non-revertible operations. This commit makes that
happen naturally.

Release note: None

84433: rowexec: increase the batch size for join reader ordering strategy r=yuzefovich a=yuzefovich

This commit increases the default value of
`sql.distsql.join_reader_ordering_strategy.batch_size` cluster setting
(which determines the input row batch size for the lookup joins when
ordering needs to be maintained) from 10KiB to 100KiB since the previous
number is likely to have been too conservative. We have seen support
cases (https://github.com/cockroachlabs/support/issues/1627) where
bumping up this setting was needed to get reasonable performance, we
also change this setting to 100KiB in our TPC-E setup
(https://github.com/cockroachlabs/tpc-e/blob/d47d3ea5ce71ecb1be5e0e466a8aa7c94af63d17/tier-a/src/schema.rs#L404).

I did some historical digging, and I believe that the number 10KiB was
chosen somewhat arbitrarily with no real justification in #48058. That
PR changed how we measure the input row batches from the number of rows
to the memory footprint of the input rows. Prior to that change we had
100 rows limit, so my guess is that 10KiB was somewhat dependent on that
number.

The reason we don't want this batch size to be too large is that we
buffer all looked up rows in a disk-backed container, so if too many
responses come back (because we included too many input rows in the
batch), that container has to spill to disk. To make sure we don't
regress in such scenarios this commit teaches the join reader to lower
the batch size bytes limit if the container does spill to disk, until
10 KiB which is treated as the minimum.

Release note: None

Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 19, 2022
(Same commit message as the Jul 13, 2022 commit above.)
yuzefovich added a commit to yuzefovich/cockroach that referenced this pull request Jul 20, 2022
(Same commit message as the Jul 13, 2022 commit above, with an extended release note:)
Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead. If a user knows that there is no deadlock and that some
analytical queries that have spilled are just taking too long, blocking
other queries from spilling, and is ok with waiting longer, the user can
adjust the newly introduced `sql.distsql.acquire_vec_fds.max_retries`
cluster setting (using 0 to get the previous behavior of waiting
indefinitely until spilling resources open up).

Successfully merging this pull request may close these issues.

colexec: possible deadlock in ExternalHashJoiner
4 participants