need to support pulling aggregate up upon join #6895

Open
zz-jason opened this issue Jun 24, 2018 · 6 comments
Labels
sig/planner SIG: Planner type/enhancement The issue or PR belongs to an enhancement.

Comments

@zz-jason
Member

The original query 17 in the TPC-H benchmark is:

select
	sum(l_extendedprice) / 7.0 as avg_yearly
from
	lineitem,
	part
where
	p_partkey = l_partkey
	and p_brand = 'Brand#44'
	and p_container = 'WRAP PKG'
	and l_quantity < (
		select
			0.2 * avg(l_quantity)
		from
			lineitem
		where
			l_partkey = p_partkey
	);

The physical plan generated by TiDB optimizer is:

+------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| id                           | task | operator info                                                                                                                                                      | count       |
+------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| Projection_15                | root | div(11_col_0, 7.0)                                                                                                                                                 | 1.00        |
|   StreamAgg_20               | root | funcs:sum(tpch.lineitem.l_extendedprice)                                                                                                                           | 1.00        |
|     Projection_43            | root | tpch.lineitem.l_partkey, tpch.lineitem.l_quantity, tpch.lineitem.l_extendedprice, tpch.part.p_partkey, tpch.part.p_brand, tpch.part.p_container, mul(0.2, 7_col_0) | 47187.51    |
|       Selection_44           | root | lt(tpch.lineitem.l_quantity, mul(0.2, 7_col_0))                                                                                                                    | 47187.51    |
|         HashLeftJoin_45      | root | left outer join, inner:HashAgg_39, equal:[eq(tpch.part.p_partkey, tpch.lineitem.l_partkey)]                                                                        | 58984.39    |
|           HashRightJoin_51   | root | inner join, inner:TableReader_34, equal:[eq(tpch.part.p_partkey, tpch.lineitem.l_partkey)]                                                                         | 58984.39    |
|             TableReader_34   | root | data:Selection_33                                                                                                                                                  | 1958.42     |
|               Selection_33   | cop  | eq(tpch.part.p_brand, Brand#44), eq(tpch.part.p_container, WRAP PKG)                                                                                               | 1958.42     |
|                 TableScan_32 | cop  | table:part, range:[-inf,+inf], keep order:false                                                                                                                    | 2000000.00  |
|             TableReader_53   | root | data:TableScan_52                                                                                                                                                  | 59986052.00 |
|               TableScan_52   | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                | 59986052.00 |
|           HashAgg_39         | root | group by:col_3, funcs:avg(col_0, col_1), firstrow(col_2)                                                                                                           | 1991680.00  |
|             TableReader_40   | root | data:HashAgg_35                                                                                                                                                    | 1991680.00  |
|               HashAgg_35     | cop  | group by:tpch.lineitem.l_partkey, funcs:avg(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.l_partkey)                                                           | 1991680.00  |
|                 TableScan_38 | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                | 59986052.00 |
+------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
15 rows in set (0.00 sec)

TiDB handles the subquery by "aggregate (HashAgg_39), then join (HashLeftJoin_45)". This query takes about 3 minutes on my computer at scale factor 10.
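To make the "aggregate, then join" shape concrete, here is a minimal sketch that writes it out as SQL and compares it with the correlated form on a tiny hand-made dataset. It runs on SQLite through Python's standard `sqlite3` module, not on TiDB, and it only checks results, not performance. The toy rows, the derived-table aliases `agg`, `agg_partkey`, `avg_quantity`, and the function name `compare_aggregate_then_join` are all assumptions made for illustration.

```python
import sqlite3

# Correlated form, as in TPC-H query 17.
CORRELATED_SQL = """
select sum(l_extendedprice) / 7.0 as avg_yearly
from lineitem, part
where p_partkey = l_partkey
  and p_brand = 'Brand#44'
  and p_container = 'WRAP PKG'
  and l_quantity < (
      select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey
  )
"""

# "Aggregate, then join": compute avg(l_quantity) per l_partkey over the whole
# lineitem table first, then left-outer-join it and filter. This mirrors the
# HashAgg_39 -> HashLeftJoin_45 -> Selection_44 part of the plan above.
AGG_THEN_JOIN_SQL = """
select sum(l_extendedprice) / 7.0 as avg_yearly
from lineitem
inner join part
  on p_partkey = l_partkey and p_brand = 'Brand#44' and p_container = 'WRAP PKG'
left outer join (
    select l_partkey as agg_partkey, avg(l_quantity) as avg_quantity
    from lineitem
    group by l_partkey
) agg
  on agg.agg_partkey = p_partkey
where l_quantity < 0.2 * avg_quantity
"""


def compare_aggregate_then_join():
    """Run both forms on toy data (hypothetical rows) and return both results."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table part (p_partkey integer primary key, p_brand text, p_container text);
        create table lineitem (l_partkey integer, l_quantity integer, l_extendedprice integer);
    """)
    conn.executemany("insert into part values (?, ?, ?)", [
        (1, "Brand#44", "WRAP PKG"),
        (2, "Brand#44", "WRAP PKG"),
        (3, "Brand#11", "WRAP PKG"),  # filtered out by p_brand
    ])
    conn.executemany("insert into lineitem values (?, ?, ?)", [
        (1, 1, 100), (1, 1, 100), (1, 10, 500), (1, 20, 900), (1, 30, 1200),
        (2, 2, 70), (2, 50, 800), (2, 50, 800),
        (3, 1, 40), (3, 100, 999),
    ])
    correlated = conn.execute(CORRELATED_SQL).fetchone()[0]
    agg_then_join = conn.execute(AGG_THEN_JOIN_SQL).fetchone()[0]
    conn.close()
    return correlated, agg_then_join


if __name__ == "__main__":
    correlated, agg_then_join = compare_aggregate_then_join()
    print("correlated:   ", correlated)
    print("agg then join:", agg_then_join)
```

Note that the derived table aggregates every `l_partkey` in `lineitem`, including parts that the `p_brand` and `p_container` filters later discard, which is the work the issue is concerned about.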

In fact, we can pull the aggregate HashAgg_39 up and handle this subquery by "join, then aggregate", which means the query can be rewritten into the following form:

select sum(sumb)/7.0 as avg_yearly
from (
    select a,
           sumb,
           0.2*avg(l_quantity) as filter
    from (
        select l_quantity as a,
               l_extendedprice as b,
               p_partkey as c,
               sum(l_extendedprice) as sumb
        from lineitem
        inner join part
        on p_partkey = l_partkey and p_brand = 'Brand#44' and p_container = 'WRAP PKG'
        group by a, b, c
    ) tmp
    left outer join lineitem
    on l_partkey = c
    group by a, b, c, sumb
    having a < filter
) xx;

The corresponding execution plan for this modified query is:

+----------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| id                               | task | operator info                                                                                                                                                                                                                               | count       |
+----------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| Projection_15                    | root | div(12_col_0, 7.0)                                                                                                                                                                                                                          | 1.00        |
|   StreamAgg_20                   | root | funcs:sum(tmp.sumb)                                                                                                                                                                                                                         | 1.00        |
|     Projection_43                | root | sumb, mul(0.2, 8_col_0), tpch.lineitem.a                                                                                                                                                                                                    | 47187.51    |
|       Selection_44               | root | lt(tpch.lineitem.a, mul(0.2, 8_col_0))                                                                                                                                                                                                      | 47187.51    |
|         HashAgg_47               | root | group by:tpch.lineitem.a, tpch.lineitem.b, tpch.part.c, sumb, funcs:avg(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.a), firstrow(sumb)                                                                                                | 58984.39    |
|           HashLeftJoin_26        | root | left outer join, inner:TableReader_42, equal:[eq(tpch.part.c, tpch.lineitem.l_partkey)]                                                                                                                                                     | 1776510.56  |
|             HashAgg_29           | root | group by:tpch.lineitem.l_quantity, tpch.lineitem.l_extendedprice, tpch.part.p_partkey, funcs:sum(tpch.lineitem.l_extendedprice), firstrow(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.l_extendedprice), firstrow(tpch.part.p_partkey) | 58984.39    |
|               HashLeftJoin_34    | root | inner join, inner:TableReader_40, equal:[eq(tpch.lineitem.l_partkey, tpch.part.p_partkey)]                                                                                                                                                  | 58984.39    |
|                 TableReader_37   | root | data:TableScan_36                                                                                                                                                                                                                           | 59986052.00 |
|                   TableScan_36   | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                                                                                         | 59986052.00 |
|                 TableReader_40   | root | data:Selection_39                                                                                                                                                                                                                           | 1958.42     |
|                   Selection_39   | cop  | eq(tpch.part.p_brand, Brand#44), eq(tpch.part.p_container, WRAP PKG)                                                                                                                                                                        | 1958.42     |
|                     TableScan_38 | cop  | table:part, range:[-inf,+inf], keep order:false                                                                                                                                                                                             | 2000000.00  |
|             TableReader_42       | root | data:TableScan_41                                                                                                                                                                                                                           | 59986052.00 |
|               TableScan_41       | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                                                                                         | 59986052.00 |
+----------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
15 rows in set (0.00 sec)

This modified version runs in only 45 seconds and produces the same result as the original.
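The claim that both forms return the same result can be spot-checked on a small dataset. The following is a minimal sketch on SQLite via Python's standard `sqlite3` module, not on TiDB, so it says nothing about running time. The toy rows and the function name `compare_join_then_aggregate` are assumptions for illustration. The rewritten query is kept as close to the one above as possible, except that the `filter` alias is inlined into the `having` clause and the inner `group by` names the underlying columns, purely to avoid dialect differences.

```python
import sqlite3

# Original TPC-H query 17 (correlated subquery).
ORIGINAL_SQL = """
select sum(l_extendedprice) / 7.0 as avg_yearly
from lineitem, part
where p_partkey = l_partkey
  and p_brand = 'Brand#44'
  and p_container = 'WRAP PKG'
  and l_quantity < (
      select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey
  )
"""

# "Join, then aggregate" rewrite from this issue, lightly adapted for SQLite.
REWRITTEN_SQL = """
select sum(sumb) / 7.0 as avg_yearly
from (
    select a, sumb
    from (
        select l_quantity as a,
               l_extendedprice as b,
               p_partkey as c,
               sum(l_extendedprice) as sumb
        from lineitem
        inner join part
          on p_partkey = l_partkey and p_brand = 'Brand#44' and p_container = 'WRAP PKG'
        group by l_quantity, l_extendedprice, p_partkey
    ) tmp
    left outer join lineitem
      on l_partkey = c
    group by a, b, c, sumb
    having a < 0.2 * avg(l_quantity)
) xx
"""


def compare_join_then_aggregate():
    """Run both queries on toy data (hypothetical rows) and return both results."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table part (p_partkey integer primary key, p_brand text, p_container text);
        create table lineitem (l_partkey integer, l_quantity integer, l_extendedprice integer);
    """)
    conn.executemany("insert into part values (?, ?, ?)", [
        (1, "Brand#44", "WRAP PKG"),
        (2, "Brand#44", "WRAP PKG"),
        (3, "Brand#11", "WRAP PKG"),  # filtered out by p_brand
    ])
    # Part 1 has two identical rows on purpose, so the inner group by has
    # something to merge.
    conn.executemany("insert into lineitem values (?, ?, ?)", [
        (1, 1, 100), (1, 1, 100), (1, 10, 500), (1, 20, 900), (1, 30, 1200),
        (2, 2, 70), (2, 50, 800), (2, 50, 800),
        (3, 1, 40), (3, 100, 999),
    ])
    original = conn.execute(ORIGINAL_SQL).fetchone()[0]
    rewritten = conn.execute(REWRITTEN_SQL).fetchone()[0]
    conn.close()
    return original, rewritten


if __name__ == "__main__":
    original, rewritten = compare_join_then_aggregate()
    print("original: ", original)
    print("rewritten:", rewritten)
```

Agreement on one toy dataset is only a sanity check. It does not show that the rewrite is equivalent for every schema or data distribution.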

However, this rule is not guaranteed to always generate a better plan, so we should take cost into consideration. In the exhaustive search phase of physical planning, we should consider not only the implementation rules (which physical operator to use, for example hash join or merge join) but also the transformation rules, such as aggregate push down and aggregate pull up.

@zz-jason zz-jason added type/enhancement The issue or PR belongs to an enhancement. sig/planner SIG: Planner labels Jun 24, 2018
@zz-jason zz-jason self-assigned this Jun 24, 2018
@winoros
Member

winoros commented Jun 25, 2018

Is the result still correct?
Maybe you should try a merge join hint.

@zz-jason
Member Author

@winoros The result is correct, and a merge join would not improve the performance. The main overhead here is that we aggregate a table with about 60 million rows, and the aggregate produces about 2 million rows, which takes a lot of time. Instead, we should do an outer join first, which can produce a smaller result set.

@winoros
Member

winoros commented Jun 25, 2018

This rewrite is not reasonable.
sum(l_extendedprice) should be an aggregate over the outer lineitem table rather than inside the subquery.
Also, this rewrite is equivalent to the original SQL only because p_partkey is the primary key of the part table, so the join does not expand the rows of the inner lineitem. Pulling the aggregate up and adding group-by columns is not reasonable in general, because it may destroy the data distribution.
So it just happens to be right here. It is not a general rewriting method.

In most cases, a rule should operate on a small scope, usually just a plan node and its children. It should consider neither the node's parent nor the children of its children.

@zz-jason
Member Author

@winoros You are right, a rule only needs to consider a piece of the operator tree. But here I only meant to show that we need to support the "join then aggregate" operation, so maybe focus on that point. The plan I showed in this case can be generated through a combination of other rules.

@zz-jason
Member Author

And yes, before applying a rule, we need to consider the equivalence between the operator trees before and after applying it. And as you said, we may need to take data distribution, column indexes, etc. into consideration. For these subquery rewriting methods, I think we can learn from "Orthogonal Optimization of Subqueries and Aggregation".

@winoros
Member

winoros commented Jun 25, 2018

@zz-jason
The speedup of the query actually comes from three parts:

  • pushing the filter on p down to the table (currently, TiDB cannot do this because the left join blocks it);
  • changing the join order;
  • pulling up the aggregate function.

@zz-jason zz-jason removed their assignment Mar 17, 2020