
colexec: add support for DISTINCT and FILTER hash aggregation #50721

Merged
merged 1 commit on Aug 20, 2020

Conversation

yuzefovich
Member

@yuzefovich yuzefovich commented Jun 26, 2020

This commit adds support for DISTINCT and FILTER hash aggregation.
The approach is as follows:

  • to handle FILTER we run a selection operator on the input state
  • to handle DISTINCT we encode aggregation columns, one tuple at a time,
    and update the selection vector to include tuples we haven't yet seen
  • then we run the aggregation on the remaining selected tuples
  • and then restore the state with the original length and selection
    vector.

Such handling of the FILTER clause sounds good to me, but the handling of
DISTINCT is somewhat unfortunate: we perform the encoding one tuple at
a time.
Other approaches have been prototyped but showed worse performance:

  • using the vectorized hash table - the benefit of such an approach is that
    we don't restrict ourselves to one tuple at a time (because we would be
    hashing the full columns at once), but the big disadvantage is that the
    full tuples are stored in the hash table (instead of an encoded
    representation)
  • using a single global map for a particular aggregate function that is
    shared among all aggregation groups - the benefit of such an approach is
    that we only have a handful of maps, but it turned out that such a global
    map grows much larger and has worse performance.

Addresses: #39241.
Addresses: #39242.

Release note (sql change): The vectorized execution engine now natively
supports DISTINCT and FILTER hash aggregation.

@yuzefovich yuzefovich requested review from asubiotto and a team June 26, 2020 23:02
@cockroach-teamcity
Member

This change is Reviewable

@yuzefovich
Member Author

TPCH query 16 actually becomes a bit slower with this PR on a single node local cluster - roughly 0.50s before and 0.57s after, so I'm still working on optimizing things, but I thought I would push it out for some feedback.

@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch 3 times, most recently from 68adb4a to ca2df67 Compare June 28, 2020 01:08
@yuzefovich yuzefovich added the do-not-merge bors won't merge a PR with this label. label Jun 28, 2020
@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch from ca2df67 to 8b9c8b4 Compare June 29, 2020 14:59
@yuzefovich
Member Author

Comparison of the first commit against the second is here (i.e. wrapped rowexec processor against unoptimized vectorized operator).

Comparison of the second commit against the third is here (i.e. unoptimized vs optimized vectorized operator).

Comparison of the third against the fourth is here (optimized shared map vs map-per-bucket approach).

And, finally, the comparison of the first commit against the fourth is here. The results are now such that for small group sizes it is faster to use the wrapped processor (which is mostly explained by the fact that even for non-distinct hash aggregation the wrapped processor is faster), and for bigger group sizes the vectorized hash aggregator wins. So I'm satisfied with the numbers now.

I think that I want to squash commits 2, 3, and 4 into one, but for now maybe I'll keep them separate if it makes the code easier to review.

@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch 2 times, most recently from 6429763 to 19b68a7 Compare July 8, 2020 18:55
@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch from 19b68a7 to 6577184 Compare July 8, 2020 23:43
@asubiotto
Contributor

@yuzefovich sorry for the delay, reviewing this just hasn't been high priority. I'm planning on reviewing this soon though, could you rebase and ping?

@yuzefovich
Member Author

Yeah, I've actually planned to rebase this PR today and ping you anyway :)

@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch 3 times, most recently from 9030ab0 to fe4cc51 Compare August 14, 2020 20:51
@yuzefovich yuzefovich removed the do-not-merge bors won't merge a PR with this label. label Aug 14, 2020
@yuzefovich
Member Author

yuzefovich commented Aug 14, 2020

I've squashed commits into one and rerun the benchmark, and now the results are quite good, for example:

DistinctAggregation/COUNT/groupSize=1/distinctProb=1.00/nulls=false-24       5.02MB/s ± 2%  10.66MB/s ± 2%  +112.27%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=1/distinctProb=1.00/nulls=true-24        4.83MB/s ± 5%  10.49MB/s ± 1%  +117.09%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=2/distinctProb=0.10/nulls=false-24       7.15MB/s ± 5%   8.85MB/s ± 3%   +23.71%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=2/distinctProb=0.10/nulls=true-24        7.10MB/s ± 4%   8.76MB/s ± 2%   +23.25%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=2/distinctProb=1.00/nulls=false-24       8.80MB/s ± 2%  10.31MB/s ± 1%   +17.17%  (p=0.000 n=9+9)
DistinctAggregation/COUNT/groupSize=2/distinctProb=1.00/nulls=true-24        8.28MB/s ±12%   9.64MB/s ± 3%   +16.41%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=16/distinctProb=0.01/nulls=false-24      12.9MB/s ± 1%   15.0MB/s ± 4%   +16.32%  (p=0.000 n=9+10)
DistinctAggregation/COUNT/groupSize=16/distinctProb=0.01/nulls=true-24       13.2MB/s ± 3%   14.9MB/s ± 3%   +13.01%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=16/distinctProb=0.10/nulls=false-24      17.1MB/s ± 1%   20.7MB/s ± 3%   +20.95%  (p=0.000 n=9+10)
DistinctAggregation/COUNT/groupSize=16/distinctProb=0.10/nulls=true-24       16.9MB/s ± 1%   20.0MB/s ± 1%   +18.74%  (p=0.000 n=10+9)
DistinctAggregation/COUNT/groupSize=16/distinctProb=1.00/nulls=false-24      23.9MB/s ± 0%   28.1MB/s ± 2%   +17.62%  (p=0.000 n=9+10)
DistinctAggregation/COUNT/groupSize=16/distinctProb=1.00/nulls=true-24       22.0MB/s ± 1%   26.4MB/s ± 2%   +20.02%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=64/distinctProb=0.01/nulls=false-24      17.4MB/s ± 0%   21.4MB/s ± 1%   +22.57%  (p=0.000 n=8+10)
DistinctAggregation/COUNT/groupSize=64/distinctProb=0.01/nulls=true-24       17.4MB/s ± 1%   21.4MB/s ± 1%   +22.93%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=64/distinctProb=0.10/nulls=false-24      24.0MB/s ± 1%   31.2MB/s ± 2%   +29.90%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=64/distinctProb=0.10/nulls=true-24       23.5MB/s ± 1%   30.6MB/s ± 1%   +29.97%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=64/distinctProb=1.00/nulls=false-24      27.4MB/s ± 1%   41.5MB/s ± 1%   +51.44%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=64/distinctProb=1.00/nulls=true-24       26.7MB/s ± 1%   39.9MB/s ± 1%   +49.60%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=256/distinctProb=0.01/nulls=false-24     21.3MB/s ± 1%   33.7MB/s ± 1%   +57.93%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=256/distinctProb=0.01/nulls=true-24      21.2MB/s ± 1%   33.0MB/s ± 0%   +55.34%  (p=0.000 n=10+9)
DistinctAggregation/COUNT/groupSize=256/distinctProb=0.10/nulls=false-24     26.5MB/s ± 0%   45.6MB/s ± 1%   +72.10%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=256/distinctProb=0.10/nulls=true-24      26.1MB/s ± 1%   44.1MB/s ± 1%   +68.55%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=256/distinctProb=1.00/nulls=false-24     29.2MB/s ± 0%   58.6MB/s ± 2%  +100.58%  (p=0.000 n=9+10)
DistinctAggregation/COUNT/groupSize=256/distinctProb=1.00/nulls=true-24      28.9MB/s ± 1%   56.2MB/s ± 1%   +94.95%  (p=0.000 n=10+9)
DistinctAggregation/COUNT/groupSize=1024/distinctProb=0.01/nulls=false-24    28.4MB/s ± 0%   49.7MB/s ± 1%   +75.00%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=1024/distinctProb=0.01/nulls=true-24     28.3MB/s ± 1%   48.8MB/s ± 1%   +72.53%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=1024/distinctProb=0.10/nulls=false-24    30.4MB/s ± 0%   55.8MB/s ± 1%   +83.89%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=1024/distinctProb=0.10/nulls=true-24     29.9MB/s ± 1%   53.7MB/s ± 1%   +79.84%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=1024/distinctProb=1.00/nulls=false-24    31.3MB/s ± 2%   74.8MB/s ± 2%  +138.70%  (p=0.000 n=10+10)
DistinctAggregation/COUNT/groupSize=1024/distinctProb=1.00/nulls=true-24     31.4MB/s ± 1%   71.6MB/s ± 1%  +127.96%  (p=0.000 n=9+10)

Note that I made sure not to fool myself (instead of resetting the vectorized aggregator it was fully recreated with TestNewColOperator, similar to how the wrapped processor was benchmarked). I've updated the benchmark to be shorter and removed that "operator recreation" behavior.

@asubiotto RFAL.

@yuzefovich
Member Author

Huh, distinct aggregation is no longer required for TPCH query 16 (Drew merged a commit to push "distinct" into GROUP BY about a month ago, so we now have unordered distinct + hash aggregator).

Contributor

@asubiotto asubiotto left a comment


What are the big blockers to writing something that can be reused by both the ordered and hash aggregator?

Reviewed 10 of 10 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/aggregate_funcs.go, line 313 at r1 (raw file):

// countHashAgg was chosen arbitrarily, but it's important that we use a
// pointer to the aggregate function struct.
const sizeOfAggregateFunc = int64(unsafe.Sizeof(&countHashAgg{}))

Not related to this PR but: this will be the size of a pointer to the aggregate function. Is that the intention or are we looking for the size of the struct as well?


pkg/sql/colexec/hash_aggregator.go, line 311 at r1 (raw file):

				eqChain := op.scratch.eqChains[eqChainsSlot]
				bucket := op.buckets[headID-1]
				op.aggHelper.performAggregation(ctx, inputVecs, len(eqChain), eqChain, bucket.fns, bucket.seen)

Is len(eqChain) a redundant argument? If it makes sense for performAggregation to just get the len(eqChain) rather than pass it in as a parameter, I would prefer that.


pkg/sql/colexec/hash_aggregator_util.go, line 42 at r1 (raw file):

	// tuples that have already been seen by the corresponding group (it can be
	// nil when no aggregate function performs distinct aggregation).
	performAggregation(ctx context.Context, vecs []coldata.Vec, inputLen int, sel []int, fns []aggregateFunc, seen []map[string]struct{})

Is it an issue that we're passing in seen here? What if it grows in performAggregation? Does the caller hold on to the old pointer? I know this is a known gotcha for slices, not sure if it is for maps.


pkg/sql/colexec/hash_aggregator_util.go, line 45 at r1 (raw file):

}

// newHashAggregatorHelper creates a new hashAggregatorHelper based on provided

nit: s/on provided/on the provided


pkg/sql/colexec/hash_aggregator_util.go, line 186 at r1 (raw file):

	h.filterInput.reset(vecs, inputLen, sel)
	newBatch := h.filter.Next(ctx)
	return newBatch.ColVecs(), newBatch.Length(), newBatch.Selection(), true

Is it easier to work with these separate fields rather than newBatch?


pkg/sql/colexec/hash_aggregator_util.go, line 192 at r1 (raw file):

// aggregate functions which have at least one FILTER clause but no DISTINCT
// clauses.
type filteringHashAggregatorHelper struct {

This name is very confusing because you have a different struct named filteringHashAggHelper above. Could you reconsider the naming? Maybe something with singleFunction in it?


pkg/sql/colexec/hash_aggregator_util.go, line 248 at r1 (raw file):

// is separate for each aggregation group and for each aggregate function with
// DISTINCT clause.
// Another approaches have been prototyped but showed worse performance:

nit: s/another/other


pkg/sql/colexec/hash_aggregator_util.go, line 299 at r1 (raw file):

	for aggIdx := range spec.Aggregations {
		isDistinct := false
		for _, distinctAggIdx := range distinctAggIdxs {

Is there a good reason for distinctAggIdxs to be dense instead of using -1 to signify non-distinct?


pkg/sql/colexec/hash_aggregator_util.go, line 389 at r1 (raw file):

//    aggregation
// 3. for every function:
//    1) apply the filter to the selection vector of the input

nit: I think these are full sentences so need capitalization/period.


pkg/sql/colexec/ordered_aggregator.go, line 112 at r1 (raw file):

	for _, aggFn := range spec.Aggregations {
		if aggFn.Distinct {
			return nil, errors.AssertionFailedf("distinct ordered aggregation is not supported")

Does this fit in with our


pkg/sql/colexec/vec_to_datum_tmpl.go, line 113 at r1 (raw file):

// convertVecs converts the selected vectors from vecs *without* performing a
// deselection step.
func (c *vecToDatumConverter) convertVecs(vecs []coldata.Vec, inputLen int, sel []int) {

I hope we can work with batches in the aggregator helper and keep this converter working with only batches. I think adding multiple different entry points into an API causes confusion because callers don't know what to use and why (although it might be clear to you). If you feel like we need to keep this, please add a comment explaining the difference.


pkg/sql/colmem/allocator.go, line 241 at r1 (raw file):

}

// GetAccount returns the memory account that this allocator is working with.

I don't like that this gives an entry point to bypass all of the allocator methods. Can we at least add a warning to not use this function unless you need to pass an account to some other component?

@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch from fe4cc51 to c9ecb51 Compare August 19, 2020 19:47
Copy link
Member Author

@yuzefovich yuzefovich left a comment


As I mentioned in the standup, the fundamental difference is that in the ordered aggregation we can have multiple groups within the same batch of tuples, so the logic to handle FILTERing (and possibly DISTINCT) needs to be plumbed into the heart of the aggregate function. I don't know an easy way to get an intuition for it other than taking a look at the unit tests I added in aggregators_test.go and trying to work through them by hand for the ordered aggregation.

I think the edge case for which I couldn't figure out how to not plumb the FILTERing logic into the aggregate function is as follows:

input:
{1, 1, true}
{1, 2, nil}
{1, 2, true}

grouping on column 0, distinct aggregating on column 1 while filtering on column 2. If we have any_not_null(0th), sum_int(distinct 1st) filter 2nd, then the output is (1, 3). The order of the operations should be

  1. apply the filter
  2. on the remaining tuples, apply the distinctness check
  3. aggregate the remaining tuples

This seems easy enough, but once we add the fact that different groups could be present in the input, it becomes very hard to get right (another edge case - what if the whole group is filtered out?).

After thinking a bit more, I think it might be possible to add support for DISTINCT ordered aggregation when there is no FILTER, I'll think more about it.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/aggregate_funcs.go, line 313 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Not related to this PR but: this will be the size of a pointer to the aggregate function. Is that the intention or are we looking for the size of the struct as well?

Yes, that's the intention - we want to estimate the size of aggregateFunc (which is an interface) because we're creating []aggregateFunc.


pkg/sql/colexec/hash_aggregator.go, line 311 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Is len(eqChain) a redundant argument? If it makes sense for performAggregation to just get the len(eqChain) rather than pass it in as a parameter, I would prefer that.

Yes, it is redundant, but I prefer it this way because aggregateFunc.Compute has this signature
Compute(vecs []coldata.Vec, inputIdxs []uint32, inputLen int, sel []int)
which has an explicit inputLen argument and I like keeping both in sync.


pkg/sql/colexec/hash_aggregator_util.go, line 42 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Is it an issue that we're passing in seen here? What if it grows in performAggregation? Does the caller hold on to the old pointer? I know this is a known gotcha for slices, not sure if it is for maps.

I extended the comment, but the idea is that seen has already been created by makeSeenMaps, and the slice itself will not be modified at all.

I refactored this code.


pkg/sql/colexec/hash_aggregator_util.go, line 186 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Is it easier to work with these separate fields rather than newBatch?

It's a matter of consistency - we use the disassembled batch everywhere throughout this file, and applyFilter follows that pattern as well.


pkg/sql/colexec/hash_aggregator_util.go, line 192 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

This name is very confusing because you have a different struct named filteringHashAggHelper above. Could you reconsider the naming? Maybe something with singleFunction in it?

Good point, done.

Originally I had slightly different naming when I was trying to support both ordered and hash aggregations, and I think it was less confusing then, but now I agree, it is a very poor name.


pkg/sql/colexec/hash_aggregator_util.go, line 299 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Is there a good reason for distinctAggIdxs to be dense instead of using -1 to signify non-distinct?

Refactored this code.

I also removed the allocator object and confirmed that it doesn't have an impact on performance (only the number of allocations has increased, but the speed remained pretty much the same).


pkg/sql/colexec/ordered_aggregator.go, line 112 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Does this fit in with our

Unfinished comment.

But I think you wanted to say something like "does this fit in with our error propagation story"? If so, yeah, why not? This constructor already has an error as the return parameter, so I think it's ok not to panic and return an assertion failure here. Or did you mean something else?


pkg/sql/colexec/vec_to_datum_tmpl.go, line 113 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I hope we can work with batches in the aggregator helper and keep this converter to work with only batches. I think adding multiple different entry points into an API causes confusion because caller's don't know what to use and why (although it might be clear to you). If you feel like we need to keep this, please add a comment explaining the difference.

We have changed aggregateFunc.Compute to operate on "disassembled" batches because that allows us to not copy the selection vector (eqChain) into the selection vector of the batch, for performance reasons.

Why does it create confusion for the user of this API? The methods are identical in their purpose and behavior, with the only difference being how the batches are represented - if you have a coldata.Batch, use convertBatch; if you have a "disassembled" batch, use convertVecs. Basically, use the method that is more convenient given the structs that you have; I added such a comment.


pkg/sql/colmem/allocator.go, line 241 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I don't like that this gives an entry point to bypass all of the allocator methods. Can we at least add a warning to not use this function unless you need to pass an account to some other component?

Good point, removed.

@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch from c9ecb51 to f027864 Compare August 20, 2020 00:34
Contributor

@asubiotto asubiotto left a comment


As I mentioned in the standup, the fundamental difference is that in the ordered aggregation we can have multiple groups within the same batch of tuples, so the logic to handle FILTERing (and possibly DISTINCT) needs to be plumbed into the heart of the aggregate function.

I'm probably missing something, but why can't we plan a filter/distinct chain before the ordered aggregation?

I really don't like the idea of having to plumb this behavior into each aggregation function used by the ordered aggregator. If it was somehow shared that would at least be better, but that would require an overhaul. If we forget this, distinct behavior should be pretty easy since we can just ignore all tuples after the first tuple of each group until we encounter the next group.

Reviewed 9 of 9 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/aggregate_funcs.go, line 313 at r1 (raw file):

Previously, yuzefovich wrote…

Yes, that's the intention - we want to estimate the size of aggregateFunc (which is an interface) because we're creating []aggregateFunc.

Thanks, I remember you also did struct accounting, where does that happen?


pkg/sql/colexec/hash_aggregator.go, line 311 at r1 (raw file):

Previously, yuzefovich wrote…

Yes, it is redundant, but I prefer it this way because aggregateFunc.Compute has this signature
Compute(vecs []coldata.Vec, inputIdxs []uint32, inputLen int, sel []int)
which has an explicit inputLen argument and I like keeping both in sync.

Hmm, the question then becomes why Compute has an inputLen parameter?


pkg/sql/colexec/hash_aggregator_util.go, line 186 at r1 (raw file):
But why? I thought we determined that unwrapping batches didn't lead to many performance benefits since it mostly happens on a per-batch level. We're making it really confusing for future programmers. I think it's ok to unwrap batches and work with the separate components if there's a demonstrated performance benefit, but the default should be working with batches.

we use disassembled batch everywhere throughout this file

Isn't this a new file? I think we should be using batches throughout this file, not vecs, unless I'm missing some performance benefit.


pkg/sql/colexec/hash_aggregator_util.go, line 303 at r2 (raw file):

func (h *filteringDistinctHashAggregatorHelper) makeSeenMaps() []map[string]struct{} {
	// Note that we consciously don't account for the memory used under seen
	// maps because that memory is likely noticeably smaller than the memory

Couldn't we have used the same argument to justify not accounting for aggregate functions? Maybe it's worth adding accounting.


pkg/sql/colexec/ordered_aggregator.go, line 112 at r1 (raw file):

Previously, yuzefovich wrote…

Unfinished comment.

But I think you wanted to say something like "does this fit in with our error propagation story"? If so, yeah, why not? This constructor already has an error as the return parameter, so I think it's ok not to panic and return an assertion failure here. Or did you mean something else?

Yeah that's what I meant. I was wondering if it was worth checking anything here if we already do it at plan time, but it's good to be defensive.


pkg/sql/colexec/vec_to_datum_tmpl.go, line 113 at r1 (raw file):

We have changed aggregateFunc.Compute to operate on "disassembled" batches because that allows us to not copy the selection vector (eqChain) into the selection vector of the batch - because of the performance reasons.

I think that's OK. As per my other comment, I think working with disassembled batches is OK if it's for performance reasons. The reason it creates confusion is because there are now two ways to do the same thing (the phrase in your comment "methods are identical in its purpose and behavior with the only difference being "how the batches are represented"" is I think the confusing part) so as a user of the API you're left wondering which you should use and why. I think the why of disassembled batches is performance reasons and something we should seldom use (because you introduce the need to keep track of three things rather than just one) and I have the concern that introducing general-purpose methods will lead to unnecessary uses of unwrapped batches.

If the hash_aggregator_util has a good reason to use unwrapped batches that I'm not aware of (which is also likely), then this is fine, but I'd like you to think about it.

@yuzefovich yuzefovich force-pushed the distinct-filter-agg branch from f027864 to 08b282a Compare August 20, 2020 15:05
Member Author

@yuzefovich yuzefovich left a comment


why can't we plan a filter/distinct chain before the ordered aggregation?

One edge case is: what happens when the whole group is filtered out? We still need to let the aggregate function populate the output for such a group, otherwise there will be a misalignment between any_not_null and all other functions. I think that if we don't have any FILTER clauses, handling DISTINCT is relatively easy, but the FILTER clause makes it extremely hard not to plumb the logic into each aggregate function.

It's been a while since I struggled with that (but I did spend about a day trying to get it working in all cases), and here is a snippet of what I had in the end. I can't easily describe why I came to the realization that there is no way around plumbing the FILTERing logic into the aggregate function because I forgot and lost all the context, but at the time I was certain of it.

In the code snippet I was trying to modify the groups vector so that FILTERing would work in all cases (another case that comes to mind: how do you handle the scenario when the "head" of the group is filtered out? (the "head" is the first tuple in the group for which groups[i]==true)).

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/aggregate_funcs.go, line 313 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Thanks, I remember you also did struct accounting, where does that happen?

That happens in implementations of aggregateFuncAlloc which are function-specific.


pkg/sql/colexec/hash_aggregator.go, line 311 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Hmm, the question then becomes why Compute has an inputLen parameter?

Because in the ordered aggregation we can have a case when sel == nil.


pkg/sql/colexec/hash_aggregator_util.go, line 186 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

But why? I thought we determined that unwrapping batches didn't lead to many performance benefits since it mostly happens on a per-batch level. We're making it really confusing for future programmers. I think it's ok to unwrap batches and work with the separate components if there's a demonstrated performance benefit, but the default should be working with batches.

we use disassembled batch everywhere throughout this file

Isn't this is a new file? I think we should be using batches throughout this file, not vecs, unless I'm missing some performance benefit.

That unwrapping was done in order to avoid copies of selection vector (eqChain) in the hash aggregator (third commit in #51337).

I think you're missing the point here: hash aggregator is the user of these hashAggregatorHelpers, in the hash aggregator we have an input batch and multiple "selection vectors" (different eqChains), and in order to get the performance benefit we avoid copying those selection vectors onto the batch and, instead, pass unwrapped batch and each selection vector separately to the helpers to perform the aggregation. So the decision of whether to use disassembled batch is not made in this file - it is made in hash_aggregator.go, and that's why performAggregation has the signature it has. Handling FILTER clause requires us to reassemble the batch to run it through the chain of filtering operators, but everywhere else in this file we operate with disassembled batch, most importantly when calling aggregateFunc.Compute, so I really don't see a reason why this function should return coldata.Batch when all other functions operate on the disassembled representation.


pkg/sql/colexec/hash_aggregator_util.go, line 303 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Couldn't we have used the same argument to justify not accounting for aggregate functions? Maybe it's worth adding accounting.

The difficulty of trying to account for seen maps is that a map in Go grows dynamically, and we can't easily account for the memory it uses. We could account for the initial size, but we don't account for the memory under seen maps in the row-by-row engine at all, so I think the current approach is reasonable (note that we do account for the memory under the keys of the map; we're only missing accounting for the map's internal structures).


pkg/sql/colexec/ordered_aggregator.go, line 112 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Yeah that's what I meant. I was wondering if it was worth checking anything here if we already do it at plan time, but it's good to be defensive.

I'm not sure what else you would do here. In execplan.go we do check whether the ordered aggregation spec is supported, and here we return an assertion failure.


pkg/sql/colexec/vec_to_datum_tmpl.go, line 113 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

We have changed aggregateFunc.Compute to operate on "disassembled" batches because that allows us to not copy the selection vector (eqChain) into the selection vector of the batch - because of the performance reasons.

I think that's OK. As per my other comment, I think working with disassembled batches is OK if it's for performance reasons. The reason it creates confusion is because there are now two ways to do the same thing (the phrase in your comment "methods are identical in its purpose and behavior with the only difference being "how the batches are represented"" is I think the confusing part) so as a user of the API you're left wondering which you should use and why. I think the why of disassembled batches is performance reasons and something we should seldom use (because you introduce the need to keep track of three things rather than just one) and I have the concern that introducing general-purpose methods will lead to unnecessary uses of unwrapped batches.

If the hash_aggregator_util has a good reason to use unwrapped batches that I'm not aware of (which is also likely), then this is fine, but I'd like you to think about it.

I responded to the other comment, but I do think that there is no way around introducing convertVecs method because the hash aggregator operates on disassembled batches which causes hashAggregatorHelpers to operate on them as well. However, we don't have a user of convertVecsAndDeselect, so that method is now removed (I initially added it just for symmetry).
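The performance argument for disassembled batches can be sketched with a toy example: one input column, many eqChains (per-group selection vectors), and an aggregate that takes the column and a selection vector as separate arguments instead of requiring the selection to be copied onto a batch first. The names below (`sumInt64`, `eqChains`) are illustrative, not CockroachDB's actual API.

```go
package main

import "fmt"

// sumInt64 aggregates only the rows named by sel, without mutating the
// column or needing a batch-level selection vector. Passing sel
// separately is what lets the hash aggregator reuse one input batch
// across many eqChains with no copying.
func sumInt64(vec []int64, sel []int) int64 {
	var sum int64
	for _, rowIdx := range sel {
		sum += vec[rowIdx]
	}
	return sum
}

func main() {
	col := []int64{10, 20, 30, 40}
	// Two aggregation groups, each with its own selection vector.
	eqChains := [][]int{{0, 2}, {1, 3}}
	for i, chain := range eqChains {
		fmt.Printf("group %d sum = %d\n", i, sumInt64(col, chain))
	}
}
```

With a batch-level selection vector, each group would first require copying its eqChain onto the batch; with the disassembled signature, the same column is read in place for every group.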

Contributor

@asubiotto asubiotto left a comment

:lgtm:

I don't think this PR will concern itself with the ordered aggregator so I think it's fine to merge. However, I think it'd be great if you could post to the filter/distinct issues about the ordered aggregator.

Reviewed 2 of 2 files at r3.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/hash_aggregator_util.go, line 186 at r1 (raw file):

Previously, yuzefovich wrote…

That unwrapping was done in order to avoid copies of selection vector (eqChain) in the hash aggregator (third commit in #51337).

I think you're missing the point here: the hash aggregator is the user of these hashAggregatorHelpers. In the hash aggregator we have an input batch and multiple "selection vectors" (different eqChains), and to get the performance benefit we avoid copying those selection vectors onto the batch; instead, we pass the unwrapped batch and each selection vector separately to the helpers to perform the aggregation. So the decision of whether to use a disassembled batch is not made in this file; it is made in hash_aggregator.go, and that's why performAggregation has the signature it has. Handling the FILTER clause requires us to reassemble the batch to run it through the chain of filtering operators, but everywhere else in this file we operate on the disassembled batch, most importantly when calling aggregateFunc.Compute, so I really don't see a reason why this function should return coldata.Batch when all other functions operate on the disassembled representation.

Ah, got it, thanks for the explanation. That's fine then.
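The FILTER handling described above (narrow the selection, aggregate, then restore the original state) can be sketched as follows. This is a hedged, self-contained illustration; `applyFilter` and its predicate are hypothetical names, not the actual colexec filtering operators.

```go
package main

import "fmt"

// applyFilter narrows sel to the rows for which keep returns true. It
// builds a fresh slice so the original selection vector stays intact and
// can be restored after the aggregation step, as described above.
func applyFilter(sel []int, keep func(rowIdx int) bool) []int {
	filtered := sel[:0:0] // zero-cap slice: append allocates, sel is untouched
	for _, rowIdx := range sel {
		if keep(rowIdx) {
			filtered = append(filtered, rowIdx)
		}
	}
	return filtered
}

func main() {
	vals := []int64{5, -3, 7, -1}
	sel := []int{0, 1, 2, 3}
	// Aggregate only rows passing the FILTER predicate (vals > 0).
	filtered := applyFilter(sel, func(i int) bool { return vals[i] > 0 })
	fmt.Println(filtered, sel) // original sel preserved for restoring state
}
```

In the real operator the filtering step is a chain of selection operators run on a reassembled batch, but the invariant is the same: the original length and selection vector are restored afterwards.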


pkg/sql/colexec/hash_aggregator_util.go, line 303 at r2 (raw file):

Previously, yuzefovich wrote…

The difficulty of trying to account for seen maps is that map in Go will grow dynamically, and we can't easily account for the memory they are using. We could account for the initial size, but we don't account for the memory under seen maps in the row-by-row engine at all, so I think the current approach is reasonable (note that we do account for the memory under the keys of the map, we're only missing accounting of map internal things).

Agreed 👍


pkg/sql/colexec/ordered_aggregator.go, line 112 at r1 (raw file):

Previously, yuzefovich wrote…

I'm not sure what else you would do here. In execplan.go we do check whether the ordered aggregation spec is supported, and here we return an assertion failure.

I meant worth doing what you're already doing, I'm not suggesting adding anything else.

Member Author

@yuzefovich yuzefovich left a comment

I'll take a stab at distinct ordered aggregation now, but I'll leave the FILTER clause until better times.

TFTR!

bors r+

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @yuzefovich)

craig bot commented Aug 20, 2020

Build succeeded:

@craig craig bot merged commit 6b147c7 into cockroachdb:master Aug 20, 2020
@yuzefovich yuzefovich deleted the distinct-filter-agg branch August 20, 2020 18:33