Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, expression: support multi-distinct agg under MPP mode #39973

Merged
merged 25 commits into from
Feb 20, 2023

Conversation

AilinKid
Copy link
Contributor

@AilinKid AilinKid commented Dec 15, 2022

Signed-off-by: AilinKid [email protected]

What problem does this PR solve?

Issue Number: close #16581 #34704

What is changed and how it works?

1: implement grouping sets basic expression logic
2: implement multi-distinct agg's admission logic and 3-phase plan adjustment
3: implement a mock repeat source executor to help understand what's going on and even for test convenience.
4: implement the theoretically computed stats for partial agg, projection, and exchanger related. (details commented below)

tipb requirement: pingcap/tipb#283

tiflash optional: pingcap/tiflash#6545

For review friendly, some isolated parts can be extracted as alternative small ones. You can pick friendly here first.

Demo

image

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Side effects

  • Performance regression: Consumes more CPU
  • Performance regression: Consumes more Memory
  • Breaking backward compatibility

Documentation

  • Affects user behaviors
  • Contains syntax changes
  • Contains variable changes
  • Contains experimental features
  • Changes MySQL compatibility

Release note

Please refer to Release Notes Language Style Guide to write a quality release note.

support multi-distinct agg under MPP mode

@ti-chi-bot
Copy link
Member

ti-chi-bot commented Dec 15, 2022

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • fixdb
  • winoros

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot ti-chi-bot added do-not-merge/invalid-title release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Dec 15, 2022
@AilinKid AilinKid changed the title multi distinct agg initial commit planner, mock-copr: multi distinct agg initial commit Dec 15, 2022
@AilinKid AilinKid changed the title planner, mock-copr: multi distinct agg initial commit planner, expression: support multi distinct agg under mpp mode Dec 15, 2022
@ti-chi-bot ti-chi-bot added release-note Denotes a PR that will be considered when it comes time to generate release notes. and removed release-note-none Denotes a PR that doesn't merit a release note. labels Dec 15, 2022
@AilinKid AilinKid changed the title planner, expression: support multi distinct agg under mpp mode planner, expression: support multi-distinct agg under MPP mode Dec 15, 2022
@AilinKid AilinKid requested review from fixdb and winoros December 15, 2022 10:12
expression/grouping_sets_test.go Show resolved Hide resolved
expression/grouping_sets.go Outdated Show resolved Hide resolved
expression/grouping_sets.go Outdated Show resolved Hide resolved
expression/grouping_sets.go Show resolved Hide resolved
planner/core/explain.go Outdated Show resolved Hide resolved
@AilinKid
Copy link
Contributor Author

AilinKid commented Dec 18, 2022

add another commit to adjust the stats estimated
cases diff

 [    └─HashAgg_28 1.00 mpp[tiflash]  funcs:sum(Column#20)->Column#6, funcs:sum(Column#22)->Column#7, funcs:sum(Column#24)->Column#8]
-[      └─ExchangeReceiver_36 1.00 mpp[tiflash]  ]
-[        └─ExchangeSender_35 1.00 mpp[tiflash]  ExchangeType: PassThrough]
-[          └─HashAgg_30 1.00 mpp[tiflash]  funcs:count(distinct Column#21)->Column#20, funcs:count(distinct Column#23)->Column#22, funcs:sum(Column#17)->Column#24]
-[            └─Projection_32 1.00 mpp[tiflash]  Column#17, test.t.a, test.t.b, Column#18, case(eq(Column#18, 1), test.t.a, <nil>)->Column#21, case(eq(Column#18, 2), test.t.b, <nil>)->Column#23]
-[              └─ExchangeReceiver_34 1.00 mpp[tiflash]  ]
-[                └─ExchangeSender_33 1.00 mpp[tiflash]  ExchangeType: HashPartition, Hash Cols: [name: test.t.a, collate: binary], [name: test.t.b, collate: binary], [name: Column#18, collate: binary]]
-[                  └─HashAgg_26 1.00 mpp[tiflash]  group by:Column#18, test.t.a, test.t.b, funcs:count(Column#19)->Column#17]
+[      └─ExchangeReceiver_36 1.00 mpp[tiflash]  ]
+[        └─ExchangeSender_35 1.00 mpp[tiflash]  ExchangeType: PassThrough]
+[          └─HashAgg_30 1.00 mpp[tiflash]  funcs:count(distinct Column#21)->Column#20, funcs:count(distinct Column#23)->Column#22, funcs:sum(Column#17)->Column#24]
+[            └─Projection_32 16000.00 mpp[tiflash]  Column#17, test.t.a, test.t.b, Column#18, case(eq(Column#18, 1), test.t.a, <nil>)->Column#21, case(eq(Column#18, 2), test.t.b, <nil>)->Column#23]
+[              └─ExchangeReceiver_34 16000.00 mpp[tiflash]  ]
+[                └─ExchangeSender_33 16000.00 mpp[tiflash]  ExchangeType: HashPartition, Hash Cols: [name: test.t.a, collate: binary], [name: test.t.b, collate: binary], [name: Column#18, collate: binary]]
+[                  └─HashAgg_26 16000.00 mpp[tiflash]  group by:Column#18, test.t.a, test.t.b, funcs:count(Column#19)->Column#17]
 [                    └─Projection_31 20000.00 mpp[tiflash]  test.t.a, test.t.b, test.t.c, Column#18, case(eq(Column#18, 1), test.t.c, <nil>)->Column#19]

when split mppScalar agg into as follows:
output operator <= 3 stages agg + repeat source <= input operator
the stats before input operator and after output operator (themself included) is unchanged.

what we need to do is to adjust the partial agg, middle proj, partial proj, and the exchanger between them. some notes here:

  • repeat source doesn't change the old input operator's NDV, it just expanded the old rows.
  • use the unified and theoretical stats after group items are applied over repeated data for all operators between repeat op and middle-agg.
  • for middle-agg and final-agg, they just group all the data in its node into one row, so use their old stats, the case here is 1

expression/aggregation/descriptor.go Outdated Show resolved Hide resolved
expression/constant.go Outdated Show resolved Hide resolved
go.mod Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
return false, nil
}
for _, arg := range fun.Args {
// bail out when args are not simple column, see GitHub issue #35417
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does that issue affect this branch?

Copy link
Contributor Author

@AilinKid AilinKid Dec 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied from hasheng's pr in single distinct-agg. @fixdb have some memory about this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In single distinct agg's case, if the distinct key is not a simple column, planner will generate a shuffle with empty key. But In this PR, @AilinKid will project a column for complex expression, so I think the comment doesn't apply here if the projection is done before optimization.

planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Show resolved Hide resolved
@ti-chi-bot ti-chi-bot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Dec 27, 2022
@ti-chi-bot ti-chi-bot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Dec 27, 2022
@AilinKid
Copy link
Contributor Author

/run-unit-test

expression/constant.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
// for every group set,try to find its position to insert if possible,otherwise create a new grouping set.
for j := len(oneNewGroupingSet) - 1; j >= 0; j-- {
cur := oneNewGroupingSet[j]
if targetOne.SubSetOf(cur) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the targetOne is super set of cur and j=0, then it will create the new GroupingSet at the line:126.
For example:

targetOne is [a,b]
gss is [
   [[a],]
]

The result will be

gss [
   [[a]]
   [[a,b]]
]

The result is not expected by your comment. Are you sure this logic is correctly ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch, fixed~

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, LGTM

expression/grouping_sets_test.go Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved
planner/core/task.go Outdated Show resolved Hide resolved

// canUse3Stage4MultiDistinctAgg returns true if this agg can use 3 stage for multi distinct aggregation
func (p *basePhysicalAgg) canUse3Stage4MultiDistinctAgg() (can bool, gss expression.GroupingSets) {
if !p.ctx.GetSessionVars().Enable3StageDistinctAgg || len(p.GroupByItems) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we don't support multi distinct with group by ?

Copy link
Contributor Author

@AilinKid AilinKid Dec 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the next step if needed

return partialAgg, finalAgg
}

func (p *basePhysicalAgg) scale3StageForDistinctAgg() (bool, expression.GroupingSets) {
if p.canUse3Stage4SingleDistinctAgg() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this logic is more clearly ?

if (!Enable3StageDistinctAgg) {
    return false
}
distinct_num = 0;
if (distinct_num == 1) {
    canUse3Stage4SingleDistinctAgg();
} else {
    canUse3Stage4MultiDistinctAgg();
}

(BTW, this is just my personal opinion)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, switch judgment can be taken out here, while the distinct_num can only be scratched inside

expression/aggregation/descriptor.go Outdated Show resolved Hide resolved
return false, nil
}
for _, arg := range fun.Args {
// bail out when args are not simple column, see GitHub issue #35417
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In single distinct agg's case, if the distinct key is not a simple column, planner will generate a shuffle with empty key. But In this PR, @AilinKid will project a column for complex expression, so I think the comment doesn't apply here if the projection is done before optimization.

// groupingID now is the offset of target grouping in GroupingSets.
// todo: it may be changed after grouping set merge in the future.
fun.GroupingID = int64(len(GroupingSets))
} else if len(fun.Args) > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really want to bail out here? I see there are a lot of code dealing with group with multiple columns in GroupingSet.

Copy link
Contributor Author

@AilinKid AilinKid Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this bailout cases like normal agg with multi args:
sum(a+b) has only one arg. --- scalarFunc(a,b). admitted
group_concat(x order by y) has two arg here. banned now
And some JSON agg will have complicated multi args too. banned now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, make sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can some comments here to make it clearer

planner/core/task.go Outdated Show resolved Hide resolved
// even for distinct agg multi args like count(distinct a,b), either null of a,b will cause count func to skip this row.
// so we only need to set the either one arg of them to be a case when expression to target for its self-group:
// Expr = (case when groupingID = ? then arg else null end)
// count(distinct a,b) => count(distinct (case when groupingID = targetID then a else null end), b)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I remembered we don't need the case when here, at least for built-in agg functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a standard way, actually, we can remove this projection to let distinct-agg and normal-agg aggregate on whole scope data. (null rows included but not counted for these null-strict built aggs), what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. count(*) is an exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count(*) has already been rewritten as count(1) when entering here

]
},
{
"SQL": "select count(distinct a, b), count(distinct b), count(c), sum(d) from t",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't support it at the moment, is it because TiFlash doesn't support?

Copy link
Contributor Author

@AilinKid AilinKid Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it needs some changes: Expand op produces non-fixed schema length(currently just append one more LongLong column --- groupingID), I will do it next step. Otherwise, this TiFlash pr will become huge.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood.


// step2: adjust middle agg
// proj4Middle output all the base col from lower op + caseWhen proj cols.(reuse partial agg stats)
proj4Middle := new(PhysicalProjection).Init(p.ctx, partialHashAgg.stats, partialHashAgg.SelectBlockOffset())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no non-distinct agg function (all of them are distinct functions), do we still need the projection operator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

proj4Partial: project for normal-agg arg with case-when
proj4Middle: project for distinct-agg arg with case-when

as mentioned in the comment above, if do it in a more efficient/non-standard way, we can remove case-when to let both null-strict agg aggregate on whole scope data

@AilinKid
Copy link
Contributor Author

AilinKid commented Jan 4, 2023

new refactor, simplify the computation since all current TiDB aggregation is null-stricted, so we can just let all distinct aggregation aggregate on whole-scope data (including their expanded rows, which means no case-when expression to select targeted data), because those expanded rows are all filled with null values for your current targeted agg, these rows all naturally ignored in it's agg computation logic.

for normal agg, cases like: select count(distinct a), count(distinct b), count(c) from t; since c is not one of the grouping column, count(c) will get a doubled-result when it aggregates on whole-scope data since column c is not filled with null in other grouping sets. So normal agg still needs case-when expression to targeted for their cared group data.

// select count(distinct a), count(distinct b), count(c) from t
//  final agg   : sum(#1), sum(#2), sum(#3)
//                |         |        +--------------------------------------------+
//                |         +--------------------------------+                    |
//                +-------------+                            |                    |
//                              v                            v                    v
//  middle agg  : count(distinct caseWhen4a) as #1, count(distinct caseWhen4b) as #2, sum(#4) as #3    # all partial result
//                                   |                              |
//                                   +---------------------+        |
//                                                         v        v
//  proj4Middle : a, b, groupingID, #4(partial res), caseWhen4a, caseWhen4b               # caseWhen4a & caseWhen4a depend on groupingID and a, b
//                                      |
//                                      |
//  exchange hash partition: <a, b, groupingID>                                # shuffle data by <a,b,groupingID>
//                                      |
//                                      v
//  partial agg : a, b, groupingID, count(caseWhen4c) as #4           # since group by a, b, groupingID we can directly output all this three.
//                                         |
//                                         v
//  proj4Partial: a, b, c, groupingID, caseWhen4c            # caseWhen4c depend on groupingID and c
//
//  expand: a, b, c, groupingID                        # appended groupingID to diff group set
//
//  lower source: a, b, c                       # don't care what the lower source is

// since TiDB currently doesn't have supported non-null strict aggregation, we can remove the case-when projection
// in the cases above and let distinct-agg directly to aggregate on the whole-scope data. As a result,
// the new refactored plan will be like below.

// select count(distinct a), count(distinct b), count(c) from t
//  final agg   : sum(#1), sum(#2), sum(#3)
//                |         |        +------------------------+
//                |         +-----------------+               |
//                +-----------+               |               |
//                            v               v               v
//  middle agg  : count(a) as #1, count(b) as #2, sum(#4) as #3          # all partial result, since current agg is all null-stricted,distinct agg is targeting whole-scope data                                       
//  
//
//  exchange hash partition: <a, b, groupingID>                     # shuffle data by <a,b,groupingID>
//                                 
//
//  partial agg : a, b, groupingID, count(caseWhen4c) as #4    # since current agg is all null-stricted,normal agg is targeting whole-scope data
//
//  proj4Partial: a, b, c, groupingID, caseWhen4c         # caseWhen4c depend on groupingID and c
//
//  expand: a, b, c, groupingID                      # appended groupingID to diff group set
//
//  lower source: a, b, c                       # don't care what the lower source is

Copy link
Member

@winoros winoros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't taken a careful look at how we merge the grouping sets. Others parts LGTM

// groupingID now is the offset of target grouping in GroupingSets.
// todo: it may be changed after grouping set merge in the future.
fun.GroupingID = int64(len(GroupingSets))
} else if len(fun.Args) > 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can some comments here to make it clearer

Copy link
Contributor

@elsa0520 elsa0520 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

// for every group set,try to find its position to insert if possible,otherwise create a new grouping set.
for j := len(oneNewGroupingSet) - 1; j >= 0; j-- {
cur := oneNewGroupingSet[j]
if targetOne.SubSetOf(cur) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, LGTM

@AilinKid AilinKid requested a review from a team as a code owner February 20, 2023 09:27
@AilinKid
Copy link
Contributor Author

/merge

@ti-chi-bot
Copy link
Member

This pull request has been accepted and is ready to merge.

Commit hash: b353681

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Feb 20, 2023
Signed-off-by: AilinKid <[email protected]>
@ti-chi-bot ti-chi-bot removed the status/can-merge Indicates a PR has been approved by a committer. label Feb 20, 2023
@AilinKid
Copy link
Contributor Author

/merge

@ti-chi-bot
Copy link
Member

This pull request has been accepted and is ready to merge.

Commit hash: d9f693b

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Feb 20, 2023
@AilinKid
Copy link
Contributor Author

/retest

1 similar comment
@AilinKid
Copy link
Contributor Author

/retest

@AilinKid AilinKid removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Feb 20, 2023
@AilinKid AilinKid merged commit 4ccce9c into pingcap:master Feb 20, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. status/can-merge Indicates a PR has been approved by a committer. status/LGT2 Indicates that a PR has LGTM 2.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

planner: eliminate aggregation with distinct
6 participants