
colexec: add default aggregate function #52174

Merged: 2 commits merged into cockroachdb:master from the agg-fns branch on Aug 12, 2020
Conversation

@yuzefovich (Member) commented Jul 31, 2020

Depends on #51337.
Depends on #52315.

colexec: add default aggregate function

This commit introduces a "default" aggregate function, which is an adapter
from tree.AggregateFunc to colexec.aggregateFunc. It works as
follows:

  • the aggregator (either hash or ordered) is responsible for converting
    all necessary vectors to tree.Datum columns before calling Compute
    (this allows us to share the conversion between multiple functions if
    they happen to take in the same columns, and between multiple groups
    in the case of the hash aggregator)
  • the default aggregate function populates the "arguments" to be passed
    into the wrapped tree.AggregateFunc and adds them
  • when a new group is encountered, the result so far is flushed and
    the wrapped tree.AggregateFunc is reset.

One detail is that these wrapped tree.AggregateFuncs need to be
closed, and currently that responsibility lies with the alloc object
that creates them. In the future, we might want to shift that
responsibility to the aggregators.

Addresses: #43561.

Release note: None
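
To make the shape of this adapter concrete, here is a minimal, self-contained sketch of the idea (all of the types and method names below are hypothetical simplifications for illustration, not the actual colexec or tree interfaces):

```go
package main

import (
	"fmt"
	"strings"
)

// rowAgg mirrors the row-at-a-time tree.AggregateFunc contract (simplified):
// it accepts one datum at a time and produces a final result.
type rowAgg interface {
	Add(datum string)
	Result() string
	Reset()
}

// concatAgg is a toy row-based aggregate, roughly string_agg without a separator.
type concatAgg struct{ parts []string }

func (c *concatAgg) Add(d string)   { c.parts = append(c.parts, d) }
func (c *concatAgg) Result() string { return strings.Join(c.parts, "") }
func (c *concatAgg) Reset()         { c.parts = c.parts[:0] }

// defaultAgg adapts a rowAgg to a vectorized-style interface: Compute receives
// an already-converted datum column plus group boundaries, flushes the wrapped
// function's result whenever a new group starts, and resets the wrapped function.
type defaultAgg struct {
	wrapped rowAgg
	results []string
}

func (a *defaultAgg) Compute(col []string, groupStart []bool) {
	for i, d := range col {
		if groupStart[i] && i > 0 {
			a.results = append(a.results, a.wrapped.Result())
			a.wrapped.Reset()
		}
		a.wrapped.Add(d)
	}
}

// Flush emits the result for the last (still open) group.
func (a *defaultAgg) Flush() {
	a.results = append(a.results, a.wrapped.Result())
	a.wrapped.Reset()
}

func main() {
	agg := &defaultAgg{wrapped: &concatAgg{}}
	agg.Compute([]string{"a", "b", "c", "d"}, []bool{true, false, true, false})
	agg.Flush()
	fmt.Println(agg.results) // [ab cd]
}
```

The real implementation operates on coldata vectors and tree.Datums rather than strings, but the flush-on-new-group control flow is the same idea.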

colexec: clean up hash aggregate functions

Hash aggregate functions always have a non-nil sel, and this commit
removes the code generation for the nil sel case (i.e. it removes the
dead code). It also templates out the nulls vs no-nulls cases in the
bool_and and bool_or aggregates.

Release note: None
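
For illustration, a rough sketch of the two code paths this cleanup is about (hypothetical, simplified types, not the generated colexec code); the hash aggregator always supplies a selection vector, so only the first path is ever needed there:

```go
package main

import "fmt"

// addWithSel is the only code path the hash aggregate functions need: the
// aggregator always supplies a non-nil selection vector identifying which
// rows of the batch belong to the current bucket.
func addWithSel(col []int64, sel []int) int64 {
	var sum int64
	for _, rowIdx := range sel {
		sum += col[rowIdx]
	}
	return sum
}

// addNoSel is the kind of branch the commit removes for hash aggregation:
// iterating the full batch without a selection vector.
func addNoSel(col []int64) int64 {
	var sum int64
	for _, v := range col {
		sum += v
	}
	return sum
}

func main() {
	col := []int64{10, 20, 30, 40}
	fmt.Println(addWithSel(col, []int{0, 2})) // 40
	fmt.Println(addNoSel(col))                // 100
}
```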

@cockroach-teamcity (Member) commented

This change is Reviewable

@yuzefovich force-pushed the agg-fns branch 3 times, most recently from 641ed72 to d37e40a on August 1, 2020 00:24
@yuzefovich changed the title from "colexec: WIP on default aggregate function" to "colexec: add default aggregate function" on Aug 1, 2020
@yuzefovich (Member, Author) commented

The comparison against the wrapped rowexec processor is rather positive:

BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=1/nulls=false/wrapped-16         	      16	  68629312 ns/op	  15.28 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=1/nulls=false/vectorized-16      	      25	  40485154 ns/op	  25.90 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=4/nulls=false/wrapped-16         	      40	  26597027 ns/op	  39.42 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=4/nulls=false/vectorized-16      	      51	  22222534 ns/op	  47.19 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=16/nulls=false/wrapped-16        	      52	  21222130 ns/op	  49.41 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=16/nulls=false/vectorized-16     	      78	  15627650 ns/op	  67.10 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=64/nulls=false/wrapped-16        	      58	  19286233 ns/op	  54.37 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=64/nulls=false/vectorized-16     	      88	  13062478 ns/op	  80.27 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=1024/nulls=false/wrapped-16      	      70	  17265061 ns/op	  60.73 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/hash/STRING_AGG/groupSize=1024/nulls=false/vectorized-16   	     132	   9052391 ns/op	 115.83 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=1/nulls=false/wrapped-16      	      39	  31631538 ns/op	  33.15 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=1/nulls=false/vectorized-16   	      90	  12740350 ns/op	  82.30 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=4/nulls=false/wrapped-16      	      66	  17845765 ns/op	  58.76 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=4/nulls=false/vectorized-16   	     142	   8276803 ns/op	 126.69 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=16/nulls=false/wrapped-16     	      82	  15065607 ns/op	  69.60 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=16/nulls=false/vectorized-16  	     163	   7316560 ns/op	 143.32 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=64/nulls=false/wrapped-16     	      86	  14487031 ns/op	  72.38 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=64/nulls=false/vectorized-16  	     157	   7523532 ns/op	 139.37 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=1024/nulls=false/wrapped-16   	      86	  14206792 ns/op	  73.81 MB/s
BenchmarkAggregatorsAgainstWrappedProcessors/ordered/STRING_AGG/groupSize=1024/nulls=false/vectorized-16         	     160	   7407518 ns/op	 141.56 MB/s

And the absolute speeds are pretty good as well: string_agg. Note that the numbers are somewhat inflated when comparing against other optimized aggregate functions because string_agg takes in two columns as arguments.

@yuzefovich requested review from asubiotto and a team on August 1, 2020 00:31
@yuzefovich force-pushed the agg-fns branch 4 times, most recently from 4ce6881 to 7067281 on August 1, 2020 21:11
@yuzefovich (Member, Author) commented

The benchmarks of the last commit are here and here.

@yuzefovich (Member, Author) commented

Only the last two commits belong in this PR.

@asubiotto (Contributor) left a comment

Reviewed 17 of 17 files at r3, 15 of 26 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)


pkg/sql/colexec/aggregate_funcs.go, line 25 at r3 (raw file):

)

// isAggOptimized returns whether aggFn has optimized implementation.

nit: s/has/has an/


pkg/sql/colexec/aggregate_funcs.go, line 195 at r3 (raw file):

	funcAllocs := make([]aggregateFuncAlloc, len(spec.Aggregations))
	var toClose Closers
	var idxsToConvert util.FastIntSet

Is this over-optimization? If so, I would just use []int. If not, see if newVecToDatumConverter can use FastIntSet to avoid having to convert?


pkg/sql/colexec/aggregate_funcs.go, line 299 at r3 (raw file):

		if err != nil {
			return nil, nil, toClose, err

Are closers closed on error?


pkg/sql/colexec/aggregate_funcs.go, line 345 at r3 (raw file):

	inputTypes []*types.T,
) (
	constructors []execinfrapb.AggregateConstructor,

I don't think we need to use named return variables here (you're initializing pretty much each one except error anyway).


pkg/sql/colexec/count_agg_tmpl.go, line 78 at r4 (raw file):

) {
	var i int
	// Remove unused warning.

🤔 what's the cause of this unused warning?


pkg/sql/colexec/default_agg_tmpl.go, line 118 at r3 (raw file):

// 'convertedTupleIdx'. These indices are the same when there is no selection
// vector but could be different if there is one.
func _ADD_TUPLE(

Any chance we can use @jordanlewis' new templating framework for this and other things in this file?


pkg/sql/colexec/hash_aggregator.go, line 366 at r3 (raw file):

func (op *hashAggregator) Close(ctx context.Context) error {
	op.toClose.CloseAndLogOnErr(ctx, "hash-aggregator")

I think that CloseAndLogOnErr should be used when we cannot or don't want to return an error, which I think is not the case here. We could maybe decorate the error with the fact that we're closing from the hash aggregator, but I don't think we should swallow the error.


pkg/sql/colexec/utils_test.go, line 1439 at r3 (raw file):

}

func (c *chunkingBatchSource) reset(context.Context) {

Add a var _ resetter assertion above?
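
For reference, the idiom being suggested is the standard Go compile-time interface check, roughly like the following (the resetter definition here is assumed for illustration; the actual colexec interface may be shaped differently):

```go
package main

import "context"

// resetter is assumed here for illustration.
type resetter interface {
	reset(context.Context)
}

type chunkingBatchSource struct{}

func (c *chunkingBatchSource) reset(context.Context) {}

// Compile-time assertion that *chunkingBatchSource implements resetter; the
// blank identifier discards the value, so this adds no runtime cost.
var _ resetter = &chunkingBatchSource{}

func main() {}
```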


pkg/sql/colexec/colbuilder/execplan.go, line 698 at r3 (raw file):

				)
			} else {
				evalCtx.SingleDatumAggMemAccount = streamingMemAccount

Some aggregate functions like ARRAY_AGG are not streaming so it is no longer true that hash = buffering, ordered = streaming. We might need to do something different here.


pkg/sql/distsql/columnar_operators_test.go, line 60 at r3 (raw file):

	var da sqlbase.DatumAlloc

	// We need +1 because an entry for index=6 was omitted by mistake.

🤔


pkg/sql/distsql/columnar_operators_test.go, line 202 at r3 (raw file):

						// on.
						continue
						// TODO(yuzefovich): here is a more tight condition,

Don't think so. Do we check the case where one returns an error but the other doesn't?


pkg/sql/distsql/columnar_utils_test.go, line 79 at r3 (raw file):

	if rng.Float64() < 0.5 {
		randomBatchSize := 1 + rng.Intn(3)
		fmt.Printf("coldata.BatchSize() is set to %d\n", randomBatchSize)

Was this not useful?


pkg/sql/execinfrapb/processors.go, line 143 at r3 (raw file):

// AggregateFuncToNumArguments maps aggregate functions to the number of
// arguments they take.
var AggregateFuncToNumArguments = map[AggregatorSpec_Func]int{

How do we keep this up to date? If this is used by only testing code, I would rather keep it close to the tests (i.e. not in execinfrapb)

@yuzefovich (Member, Author) left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)


pkg/sql/colexec/aggregate_funcs.go, line 195 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Is this over-optimization? If so, I would just use []int. If not, see if newVecToDatumConverter can use FastIntSet to avoid having to convert?

Done.


pkg/sql/colexec/aggregate_funcs.go, line 299 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Are closers closed on error?

It's the same behavior as for all other Closers. I've just checked vectorizedFlowCreator.setupFlow and no, they won't get closed. But this function will return an error only when we're trying to create an aggregate function on an unsupported type, and AFAIK we now fully support all optimized functions and have this default unoptimized one, so if an error occurs, then something has gone really wrong.


pkg/sql/colexec/aggregate_funcs.go, line 345 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I don't think we need to use named return variables here (you're initializing pretty each one minus error anyway)

I added named return variables in order to document the code better and make it easier to use this function. Unless you have a strong objection, I'll keep it this way.


pkg/sql/colexec/count_agg_tmpl.go, line 78 at r4 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

🤔 what's the cause of this unused warning?

It was the case of hash aggregation where we don't need to pay attention to nulls. I updated the template to generate more efficient code for that case.
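
Roughly speaking (a hypothetical, simplified sketch, not the actual generated code), the no-nulls hash path can skip per-row null checks entirely:

```go
package main

import "fmt"

// Two sketched code paths for a count-like hash aggregate: when the input is
// known to contain no nulls, the count is simply the number of selected rows,
// while the null-aware path must inspect every selected row.

func countNoNulls(sel []int) int {
	// No per-row work is needed: every selected row contributes to the count.
	return len(sel)
}

func countWithNulls(sel []int, isNull []bool) int {
	var count int
	for _, rowIdx := range sel {
		if !isNull[rowIdx] {
			count++
		}
	}
	return count
}

func main() {
	sel := []int{0, 1, 3}
	isNull := []bool{false, true, false, false}
	fmt.Println(countNoNulls(sel))           // 3
	fmt.Println(countWithNulls(sel, isNull)) // 2
}
```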


pkg/sql/colexec/default_agg_tmpl.go, line 118 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Any chance we can use @jordanlewis' new templating framework for this and other things in this file?

I think the updated execgen doesn't support all the things needed for all aggregate functions, so I don't think it's worth spending time figuring out whether we could implement this particular template in that framework (one complication is the if eq "_AGGKIND" "Ordered" condition that all aggregate function templates have, which is handled on the "meta" level). I'm pretty sure it'll be easier to update all of the files at once.


pkg/sql/colexec/hash_aggregator.go, line 366 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I think that CloseAndLogOnErr should be used when we cannot/don't want to return an error which I think is not the case here. We could maybe decorate the error with the fact that we're closing from the hash aggregator but I don't think we should swallow the error.

Actually, the aggregators will never return an error here because defaultHashAggAlloc implements the Closer interface, and its Close method calls tree.AggregateFunc.Close, which doesn't return an error.

Left a clarifying comment.


pkg/sql/colexec/utils_test.go, line 1439 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Add a var _ resetter assertion above?

Done.


pkg/sql/colexec/colbuilder/execplan.go, line 698 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Some aggregate functions like ARRAY_AGG are not streaming so it is no longer true that hash = buffering, ordered = streaming. We might need to do something different here.

Such functions create and manage their own memory accounts.

SingleDatumAggMemAccount is shared by all aggregate functions that need to store something like a single datum.


pkg/sql/distsql/columnar_operators_test.go, line 60 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

🤔

Yeah, I know. Apparently, that's an artifact that has been present since 1.0.


pkg/sql/distsql/columnar_operators_test.go, line 202 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Don't think so. Do we check the case where one returns an error but the other doesn't?

Removed. Yeah, that case is checked separately and it'll have a different error message ("different number of metas returned").


pkg/sql/distsql/columnar_utils_test.go, line 79 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Was this not useful?

Yeah, I think it wasn't useful, just clogging up the output.


pkg/sql/execinfrapb/processors.go, line 143 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

How do we keep this up to date? If this is used by only testing code, I would rather keep it close to the tests (i.e. not in execinfrapb)

I added a note to processors_sql.proto which should help with keeping the map up to date.

I agree, though, that the map probably doesn't have to live in execinfrapb, so I moved it next to the tests. I think originally I was thinking of using the map somewhere else as well, but I don't remember what that was for.

@asubiotto (Contributor) left a comment

Reviewed 10 of 15 files at r1, 52 of 52 files at r5, 8 of 8 files at r6, 18 of 18 files at r7, 26 of 26 files at r8.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/aggregate_funcs.go, line 299 at r3 (raw file):

Previously, yuzefovich wrote…

It's the same behavior as for all other Closers. I've just checked vectorizedFlowCreator.setupFlow and no, they won't get closed. But this function will return an error only when we're trying to create an aggregate function on an unsupported type, and AFAIK we now fully support all optimized functions and have this default unoptimized one, so if an error occurs, then something has gone really wrong.

Fair enough, but maybe we shouldn't return toClose if we encounter an error.


pkg/sql/colexec/hash_aggregator.go, line 366 at r3 (raw file):

Previously, yuzefovich wrote…

Actually, the aggregators will never return an error here because defaultHashAggAlloc implements Closer interface and in Close method it calls tree.AggregateFunc.Close method which doesn't return an error.

Left a clarifying comment.

Even if the current implementations don't return an error, future ones might, or the code might change to do so. I think it's more sane to propagate whatever error happens up if we can. This is partly why I didn't want to implement CloseAndLogOnErr, because it should be pretty rare that you want to swallow the error. I think I'm also guilty of overusing CloseAndLogOnErr.
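
For illustration, a minimal sketch of the two approaches being discussed, with hypothetical names rather than the actual Closers API: propagating the error from Close versus logging and swallowing it.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

type closer interface {
	Close() error
}

type closers []closer

// Close attempts to close every member and returns the last error seen,
// keeping the failure visible to the caller.
func (cs closers) Close() error {
	var retErr error
	for _, c := range cs {
		if err := c.Close(); err != nil {
			retErr = err
		}
	}
	return retErr
}

// CloseAndLogOnErr swallows errors and only logs them; reasonable only when
// returning an error makes no sense at the call site.
func (cs closers) CloseAndLogOnErr(prefix string) {
	if err := cs.Close(); err != nil {
		log.Printf("%s: error closing: %v", prefix, err)
	}
}

type failingCloser struct{}

func (failingCloser) Close() error { return errors.New("boom") }

func main() {
	cs := closers{failingCloser{}}
	fmt.Println(cs.Close()) // boom
	cs.CloseAndLogOnErr("hash-aggregator")
}
```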


pkg/sql/colexec/ordered_aggregator.go, line 339 at r7 (raw file):

	// tree.AggregateFunc.Close doesn't return an error, and that's why it's ok
	// to "swallow" errors here because they won't actually occur.
	a.toClose.CloseAndLogOnErr(ctx, "ordered-aggregator")

ditto. Please also add a comment on CloseAndLogOnErr that specifies that one should only use it if returning an error doesn't make sense.


pkg/sql/colexec/colbuilder/execplan.go, line 698 at r3 (raw file):

Previously, yuzefovich wrote…

Such functions create and manage their own memory accounts.

SingleDatumAggMemAccount is shared by all aggregate functions that need to store like a single datum.

OK. I guess my concern was re IsStreaming below but I defer to you

@yuzefovich (Member, Author) left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto)


pkg/sql/colexec/aggregate_funcs.go, line 299 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Fair enough, but maybe we shouldn't return toClose if we encounter an error.

Done. I don't think it's important though.


pkg/sql/colexec/hash_aggregator.go, line 366 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Even if the current implementations don't return an error, future ones might, or the code might change to do so. I think it's more sane to propagate whatever error happens up if we can. This is partly why I didn't want to implement CloseAndLogOnErr, because it should be pretty rare that you want to swallow the error. I think I'm also guilty of overusing CloseAndLogOnErr.

Ok, fair enough, updated.


pkg/sql/colexec/ordered_aggregator.go, line 339 at r7 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

ditto. Please also add a comment on CloseAndLogOnErr that specifies that one should only use it if returning an error doesn't make sense.

Done.


pkg/sql/colexec/colbuilder/execplan.go, line 698 at r3 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

OK. I guess my concern was re IsStreaming below but I defer to you

I think that even with functions like array_agg, the ordered aggregator should still be considered "streaming".

I agree that it becomes a little confusing, but I also think that it's not very important to be exactly correct here - aggregate functions like this perform their own memory accounting, so if we reach the memory limit, an error will be returned; however, this will also be the case for rowexec.orderedAggregator, so there is not much benefit in prohibiting such aggregate functions under vectorize=201auto (which is the only case where IsStreaming matters).

@dpulls (bot) commented Aug 11, 2020

🎉 All dependencies have been resolved!

@yuzefovich force-pushed the agg-fns branch 3 times, most recently from 3f3e660 to 3708582 on August 11, 2020 17:36
@asubiotto (Contributor) left a comment

:lgtm:

Reviewed 34 of 34 files at r9, 26 of 26 files at r10.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained

@yuzefovich (Member, Author) commented

TFTR!

bors r+

@craig (bot) commented Aug 12, 2020

Build succeeded:

@craig (bot) merged commit 28c0337 into cockroachdb:master on Aug 12, 2020
@yuzefovich deleted the agg-fns branch on August 12, 2020 19:21