distsql: refactor distributed aggregation to support arbitrary local aggregate inputs in final stage #18386

Merged: 1 commit, Sep 15, 2017

Conversation

richardwu
Contributor

@richardwu richardwu commented Sep 9, 2017

To implement a numerically stable distributed version of STDDEV and VARIANCE (see #14351), the final-stage aggregators need access to multiple local (intermediary) aggregate values. For VARIANCE, these are "SQDIFF", "SUM", and "COUNT"; for background, see https://www.johndcook.com/blog/skewness_kurtosis/ and https://github.com/cockroachdb/cockroach/pull/17728/files.
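For readers unfamiliar with why three intermediate values are needed, here is a minimal Go sketch of a pairwise merge in the style of the parallel variance formulas the linked article describes. It is an illustration only, not the code from this PR or #17728: the `partial` struct, its field names, and the helper functions are all assumptions.

```go
package main

import "fmt"

// partial holds the three intermediate values a local stage would emit for
// VARIANCE. The struct and its field names are hypothetical.
type partial struct {
	sqrDiff float64 // sum of squared differences from the local mean
	sum     float64 // sum of the local values
	count   float64 // number of local values
}

// localAgg computes a partial over one node's rows using Welford's update.
func localAgg(xs []float64) partial {
	var p partial
	mean := 0.0
	for _, x := range xs {
		p.count++
		delta := x - mean
		mean += delta / p.count
		p.sqrDiff += delta * (x - mean)
		p.sum += x
	}
	return p
}

// merge combines two partials without revisiting the underlying rows.
func merge(a, b partial) partial {
	if a.count == 0 {
		return b
	}
	if b.count == 0 {
		return a
	}
	delta := b.sum/b.count - a.sum/a.count
	total := a.count + b.count
	return partial{
		sqrDiff: a.sqrDiff + b.sqrDiff + delta*delta*a.count*b.count/total,
		sum:     a.sum + b.sum,
		count:   total,
	}
}

// sampleVariance finalizes a partial; it assumes at least two rows.
func sampleVariance(p partial) float64 {
	return p.sqrDiff / (p.count - 1)
}

func main() {
	a := localAgg([]float64{1, 2, 3, 4})
	b := localAgg([]float64{5, 6, 7, 8})
	fmt.Println(sampleVariance(merge(a, b)))
}
```

The merge step needs all three values from each side, which is why a final-stage function that reads a single local aggregate is not enough.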

This PR lets each FinalStage aggregator function in DistAggregationTable specify the indices of the LocalStage outputs that it takes as inputs.
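As a rough picture of the shape this gives a table entry, here is a self-contained Go sketch. The `Fn` and `LocalIdxs` field names follow the review comments below; everything else (the `aggFunc` string type, `distAggInfo`, the `FINAL_VARIANCE` name, and the `validate` helper) is hypothetical and stands in for the real distsqlrun types.

```go
package main

import "fmt"

// aggFunc stands in for distsqlrun.AggregatorSpec_Func; the string values
// used below are placeholders, not real enum members.
type aggFunc string

// finalStageInfo mirrors the Fn/LocalIdxs shape discussed in the review.
type finalStageInfo struct {
	Fn        aggFunc
	LocalIdxs []uint32 // indices into LocalStage of the same entry
}

// distAggInfo is a hypothetical table entry for one aggregate function.
type distAggInfo struct {
	LocalStage []aggFunc
	FinalStage []finalStageInfo
}

// validate reports an error if a final-stage input index is out of range.
func validate(info distAggInfo) error {
	for _, f := range info.FinalStage {
		for _, idx := range f.LocalIdxs {
			if int(idx) >= len(info.LocalStage) {
				return fmt.Errorf("%s: local index %d out of range", f.Fn, idx)
			}
		}
	}
	return nil
}

func main() {
	variance := distAggInfo{
		LocalStage: []aggFunc{"SQDIFF", "SUM", "COUNT"},
		FinalStage: []finalStageInfo{
			{Fn: "FINAL_VARIANCE", LocalIdxs: []uint32{0, 1, 2}},
		},
	}
	fmt.Println(validate(variance))
}
```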

There is no logical change to current aggregate functions.

  • I plan to verify (in a separate branch) that multiple intermediary values can indeed propagate e.g. in the case of distributed VARIANCE before merging this refactor.

cc: @vivekmenezes

@richardwu richardwu requested review from rjnn, RaduBerinde and a team September 9, 2017 00:11
@cockroach-teamcity
Member

This change is Reviewable

@richardwu richardwu force-pushed the stddev-var-local-final branch from 0f16dfc to 7b4a226 Compare September 9, 2017 00:24
@RaduBerinde
Member

Looks great to me!

@knz, is the new AggregateFunc.Add API ok with you? (we want to allow multiple inputs, but avoid an extra allocation for the common case of 1 input; maybe I'm prematurely optimizing?).


Review status: 0 of 5 files reviewed at latest revision, 6 unresolved discussions, all commit checks successful.


pkg/sql/distsql_physical_planner.go, line 1400 at r1 (raw file):

		// inside localAgg and finalIdx is an index inside finalAgg.
		localIdx := 0
		finalIdx := 0

finalIdx := 0 should be moved below, before the loop that uses it


pkg/sql/distsqlplan/aggregator_funcs.go, line 93 at r1 (raw file):

		LocalStage: []distsqlrun.AggregatorSpec_Func{distsqlrun.AggregatorSpec_IDENT},
		FinalStage: []FinalStageInfo{
			{distsqlrun.AggregatorSpec_IDENT, passThroughLocalIdxs},

[nit] use Fn: .., LocalIdxs: .. (helps when reading code, and if the structure changes in the future)


pkg/sql/distsqlrun/aggregator.go, line 302 at r1 (raw file):

			}
			// Extract the corresponding values from the row to feed into the aggregate function.
			values := make(parser.Datums, len(a.ColIdx))

would be nice to avoid the allocation for the common case of one value. Maybe add can also have firstArg Datum, otherArgs ...Datum


pkg/sql/distsqlrun/aggregator.go, line 303 at r1 (raw file):

			// Extract the corresponding values from the row to feed into the aggregate function.
			values := make(parser.Datums, len(a.ColIdx))
			for i, c := range a.ColIdx {

[nit] use j instead of i (the outer loop uses i and makes this harder to read)


pkg/sql/parser/aggregate_builtins.go, line 64 at r1 (raw file):

// AggregateFunc accumulates the result of a function of a Datum.
type AggregateFunc interface {
	// Add accumulates the passed datums into the AggregateFunc.

This should document the arguments (maybe name them firstArg, otherArgs) and explain why not just ...Datum.
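A small Go sketch of the signature under discussion, assuming a toy `Datum` type in place of `parser.Datum`. The `sumAgg`, `weightedSumAgg`, and `feed` names are hypothetical. It illustrates the rationale: with a separate `firstArg`, a single-argument call passes a nil variadic slice, so no per-row slice has to be built.

```go
package main

import "fmt"

// Datum is a stand-in for parser.Datum; float64 keeps the sketch small.
type Datum float64

// AggregateFunc sketches the interface shape discussed in the review.
type AggregateFunc interface {
	// Add accumulates one row's arguments. firstArg is kept separate from
	// otherArgs so single-argument aggregates need no slice per row.
	Add(firstArg Datum, otherArgs ...Datum)
	Result() Datum
}

// sumAgg is a single-argument aggregate.
type sumAgg struct{ total Datum }

func (s *sumAgg) Add(firstArg Datum, _ ...Datum) { s.total += firstArg }
func (s *sumAgg) Result() Datum                  { return s.total }

// weightedSumAgg is a hypothetical two-argument aggregate: sum(value * weight).
type weightedSumAgg struct{ total Datum }

func (w *weightedSumAgg) Add(firstArg Datum, otherArgs ...Datum) {
	w.total += firstArg * otherArgs[0]
}
func (w *weightedSumAgg) Result() Datum { return w.total }

// feed extracts the columns named by colIdx from row and passes them to agg.
// It assumes colIdx has at least one entry.
func feed(agg AggregateFunc, row []Datum, colIdx []uint32) {
	first := row[colIdx[0]]
	if len(colIdx) == 1 {
		agg.Add(first) // otherArgs is nil here; nothing is allocated
		return
	}
	others := make([]Datum, len(colIdx)-1)
	for j, c := range colIdx[1:] {
		others[j] = row[c]
	}
	agg.Add(first, others...)
}

func main() {
	rows := [][]Datum{{1, 10}, {2, 20}, {3, 30}}
	s, w := &sumAgg{}, &weightedSumAgg{}
	for _, row := range rows {
		feed(s, row, []uint32{0})
		feed(w, row, []uint32{0, 1})
	}
	fmt.Println(s.Result(), w.Result())
}
```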


pkg/sql/parser/aggregate_builtins.go, line 216 at r1 (raw file):

	argTypes := make(ArgTypes, len(in))
	for i, typ := range in {
		argTypes[i] = struct {

[nit] can just be argTypes[i].Name = .. and argTypes[i].Typ = typ


Comments from Reviewable

@richardwu richardwu requested a review from knz September 10, 2017 01:28
@richardwu richardwu force-pushed the stddev-var-local-final branch from 7b4a226 to 610ddc4 Compare September 10, 2017 02:15
@richardwu
Contributor Author

Review status: 0 of 6 files reviewed at latest revision, 6 unresolved discussions.


pkg/sql/distsql_physical_planner.go, line 1400 at r1 (raw file):

Previously, RaduBerinde wrote…

finalIdx := 0 should be moved below, before the loop that uses it

The loop that uses this is actually nested within this outer loop (loops over all aggregations in the plan). This counter persists across each aggregation, so it's necessary to initialize this before the loop of aggregations.


pkg/sql/distsqlplan/aggregator_funcs.go, line 93 at r1 (raw file):

Previously, RaduBerinde wrote…

[nit] use Fn: .., LocalIdxs: .. (helps when reading code, and if the structure changes in the future)

Done.


pkg/sql/distsqlrun/aggregator.go, line 302 at r1 (raw file):

Previously, RaduBerinde wrote…

would be nice to avoid the allocation for the common case of one value. Maybe add can also have firstArg Datum, otherArgs ...Datum

Done.


pkg/sql/distsqlrun/aggregator.go, line 303 at r1 (raw file):

Previously, RaduBerinde wrote…

[nit] use j instead of i (the outer loop uses i and makes this harder to read)

Done.


pkg/sql/parser/aggregate_builtins.go, line 64 at r1 (raw file):

Previously, RaduBerinde wrote…

This should document the arguments (maybe name them firstArg, otherArgs) and explain why not just ...Datum.

Done.


pkg/sql/parser/aggregate_builtins.go, line 216 at r1 (raw file):

Previously, RaduBerinde wrote…

[nit] can just be argTypes[i].Name = .. and argTypes[i].Typ = typ

Done.



@RaduBerinde
Member

Review status: 0 of 6 files reviewed at latest revision, 7 unresolved discussions.


pkg/sql/distsql_physical_planner.go, line 1400 at r1 (raw file):

Previously, richardwu (Richard Wu) wrote…

The loop that uses this is actually nested within this outer loop (loops over all aggregations in the plan). This counter persists across each aggregation, so it's necessary to initialize this before the loop of aggregations.

Ah sorry didn't see the nesting properly.


pkg/sql/distsql_physical_planner.go, line 1436 at r1 (raw file):

				// by the # of local values, or len(info.LocalStage) = 2 to get the adjusted
				// localIdxCurrentAgg = 2 - 2 = 0.
				localIdxCurrentAgg := uint32(localIdx - len(info.LocalStage))

Instead of subtracting len(info.LocalStage) here, can't we initialize localIdxCurrentAgg before the previous loop?



@richardwu richardwu force-pushed the stddev-var-local-final branch from 610ddc4 to e43919a Compare September 10, 2017 02:42
@richardwu
Contributor Author

Review status: 0 of 6 files reviewed at latest revision, 2 unresolved discussions.


pkg/sql/distsql_physical_planner.go, line 1436 at r1 (raw file):

Previously, RaduBerinde wrote…

Instead of subtracting len(info.LocalStage) here, can't we initialize localIdxCurrentAgg before the previous loop?

Good point! Done.
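The index bookkeeping discussed in this thread can be sketched roughly as follows. This is a simplified illustration, not the planner code: the `aggInfo` type and `plan` function are hypothetical, and only the names `localIdx` and `localIdxCurrentAgg` come from the snippets quoted above. The idea is that `localIdx` persists across aggregations, while `localIdxCurrentAgg` is captured before the counter advances, so no subtraction is needed afterwards.

```go
package main

import "fmt"

// aggInfo is a hypothetical, trimmed-down table entry: the local-stage
// functions and, for each final-stage function, its relative local indices.
type aggInfo struct {
	localStage []string
	finalStage [][]uint32
}

// plan returns, for every final-stage function in order, the absolute
// column indices of its local-stage inputs.
func plan(aggs []aggInfo) [][]uint32 {
	var out [][]uint32
	localIdx := 0 // persists across all aggregations in the plan
	for _, info := range aggs {
		// Capture where this aggregation's local outputs start before the
		// counter advances past them.
		localIdxCurrentAgg := uint32(localIdx)
		localIdx += len(info.localStage)

		for _, rel := range info.finalStage {
			abs := make([]uint32, len(rel))
			for j, r := range rel {
				abs[j] = localIdxCurrentAgg + r
			}
			out = append(out, abs)
		}
	}
	return out
}

func main() {
	aggs := []aggInfo{
		{localStage: []string{"SUM", "COUNT"}, finalStage: [][]uint32{{0}, {1}}},
		{localStage: []string{"SQDIFF", "SUM", "COUNT"}, finalStage: [][]uint32{{0, 1, 2}}},
	}
	fmt.Println(plan(aggs))
}
```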



@RaduBerinde
Member

:lgtm:



@richardwu richardwu changed the title from "distsql: (WIP) refactor distributed aggregation to support arbitrary local aggregate inputs in final stage" to "distsql: refactor distributed aggregation to support arbitrary local aggregate inputs in final stage" Sep 10, 2017
@knz
Contributor

knz commented Sep 10, 2017

LGTM with nit. Does this patch also enable fixing #10495?


Reviewed 1 of 5 files at r1, 5 of 5 files at r2.
Review status: all files reviewed at latest revision, 3 unresolved discussions, some commit checks failed.


pkg/sql/parser/aggregate_builtins.go, line 222 at r2 (raw file):

	argTypes := make(ArgTypes, len(in))
	for i, typ := range in {
		argTypes[i].Name = fmt.Sprintf("arg%d", i)

nit: , i+1) - this is for humans.
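Put together with the earlier field-assignment nit, the loop would look roughly like this. A self-contained sketch: `argType` and `makeArgTypes` are hypothetical stand-ins for the parser package's `ArgTypes` handling, and `Typ` is a plain string here for brevity.

```go
package main

import "fmt"

// argType stands in for one entry of parser.ArgTypes.
type argType struct {
	Name string
	Typ  string
}

// makeArgTypes names arguments arg1, arg2, ... (1-based, for human readers)
// and assigns fields directly instead of building a struct literal.
func makeArgTypes(in []string) []argType {
	argTypes := make([]argType, len(in))
	for i, typ := range in {
		argTypes[i].Name = fmt.Sprintf("arg%d", i+1)
		argTypes[i].Typ = typ
	}
	return argTypes
}

func main() {
	fmt.Println(makeArgTypes([]string{"int", "float"}))
}
```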



@richardwu richardwu force-pushed the stddev-var-local-final branch from e43919a to b42cb3e Compare September 12, 2017 16:40
@richardwu
Contributor Author

This should be possible now as long as we can parse and propagate the other arguments.


Review status: 4 of 6 files reviewed at latest revision, 3 unresolved discussions, some commit checks pending.


pkg/sql/parser/aggregate_builtins.go, line 222 at r2 (raw file):

Previously, knz (kena) wrote…

nit: , i+1) - this is for humans.

Done.



@knz
Contributor

knz commented Sep 12, 2017

👍 thanks


Reviewed 2 of 2 files at r3.
Review status: all files reviewed at latest revision, 2 unresolved discussions, some commit checks failed.



@richardwu richardwu force-pushed the stddev-var-local-final branch 3 times, most recently from 85c1a89 to 626027b Compare September 15, 2017 15:14
@richardwu richardwu force-pushed the stddev-var-local-final branch from 626027b to 95cff88 Compare September 15, 2017 15:48
@richardwu richardwu merged commit 1c6b721 into cockroachdb:master Sep 15, 2017
@richardwu richardwu deleted the stddev-var-local-final branch September 15, 2017 18:15
4 participants