fix: sql planner creates cross join instead of inner join from select predicates #1566

Merged
houqp merged 1 commit into apache:master from xudong963:fix_cross_join on Jan 21, 2022

xudong963
Member

Which issue does this PR close?

Closes #1293

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@xudong963
Member Author

❯ create table part as select 1 as p_partkey;                                                                                                                                                
0 rows in set. Query took 0.003 seconds.                                                      
❯ create table lineitem as select 1 as l_partkey, 2 as l_suppkey;                             
0 rows in set. Query took 0.005 seconds.                                                      
❯ create table supplier as select 1 as s_suppkey;                                             
0 rows in set. Query took 0.002 seconds.                                                                                                
❯ explain select * from part, supplier, lineitem where p_partkey = l_partkey and s_suppkey = l_suppkey; 

+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                               |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #part.p_partkey, #lineitem.l_partkey, #lineitem.l_suppkey, #supplier.s_suppkey                                                         |
|               |   Join: #lineitem.l_suppkey = #supplier.s_suppkey                                                                                                  |
|               |     Join: #part.p_partkey = #lineitem.l_partkey                                                                                                    |
|               |       TableScan: part projection=Some([0])                                                                                                         |
|               |       TableScan: lineitem projection=Some([0, 1])                                                                                                  |
|               |     TableScan: supplier projection=Some([0])                                                                                                       |

@xudong963
Member Author

xudong963 commented Jan 15, 2022

cc @houqp @alamb

@xudong963
Member Author

xudong963 commented Jan 15, 2022

Currently, query 8 doesn't seem to run because of its case ... when ... then ... else expression, so I removed that expression to run the benchmark:

select
    o_year
from
    (
        select
            extract(year from o_orderdate) as o_year,
            l_extendedprice * (1 - l_discount) as volume,
            n2.n_name as nation
        from
            part,
            supplier,
            lineitem,
            orders,
            customer,
            nation n1,
            nation n2,
            region
        where
                p_partkey = l_partkey
          and s_suppkey = l_suppkey
          and l_orderkey = o_orderkey
          and o_custkey = c_custkey
          and c_nationkey = n1.n_nationkey
          and n1.n_regionkey = r_regionkey
          and r_name = 'AMERICA'
          and s_nationkey = n2.n_nationkey
          and o_orderdate between date '1995-01-01' and date '1996-12-31'
          and p_type = 'ECONOMY ANODIZED STEEL'
    ) as all_nations
group by
    o_year
order by
    o_year;

master

➜  benchmarks git:(master) cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 8 --batch-size 4096
  
Query 8 iteration 0 took 5047.0 ms
Query 8 iteration 1 took 4540.6 ms
Query 8 iteration 2 took 4552.0 ms
Query 8 avg time: 4713.19 ms

xudong963:fix_cross_join

➜  benchmarks git:(fix_cross_join) ✗ cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 8 --batch-size 4096
Query 8 iteration 0 took 2823.8 ms
Query 8 iteration 1 took 2543.9 ms
Query 8 iteration 2 took 2442.7 ms
Query 8 avg time: 2603.48 ms
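
For a quick read of the two runs above, here is a small sketch that recomputes the averages from the per-iteration timings and derives the speedup (roughly 1.8x). The `mean` helper is only illustrative and is not part of the benchmark binary.

```rust
/// Arithmetic mean of a slice of timings in milliseconds.
fn mean(xs: &[f64]) -> f64 {
    xs.iter().sum::<f64>() / xs.len() as f64
}

fn main() {
    // Per-iteration timings copied from the two benchmark runs above.
    let master = [5047.0, 4540.6, 4552.0];
    let branch = [2823.8, 2543.9, 2442.7];

    let speedup = mean(&master) / mean(&branch);
    println!(
        "master {:.1} ms, fix_cross_join {:.1} ms, speedup {:.2}x",
        mean(&master),
        mean(&branch),
        speedup
    );
}
```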

@alamb
Contributor

alamb commented Jan 15, 2022

Thank you @xudong963 -- this looks very cool. I'll try and review it carefully tomorrow 👍

Contributor

@alamb alamb left a comment

Could you possibly provide some tests @xudong963 ?

I don't fully understand how this works -- I was expecting to see code that basically applied an algebraic transformation on predicates like:

(A AND X) OR (A and Y) OR (A and Z)

And rewrote them to

A AND (X OR Y OR Z)

Update:

I was totally confused with q19: https://github.com/xudong963/arrow-datafusion/blob/fix_cross_join/benchmarks/queries/q19.sql

please ignore 🤦

@xudong963
Member Author

xudong963 commented Jan 16, 2022

Could you possibly provide some tests @xudong963 ?

Sure. Result correctness is already covered by the existing tests. I can add a test for the logical plan.

I was expecting to see code that basically applied an algebraic transformation on predicates like:

This PR doesn't do that transformation. It does the following.

First of all, let's see the example:

❯ create table part as select 1 as p_partkey;                                                                                                                                                
0 rows in set. Query took 0.003 seconds.                                                      
❯ create table lineitem as select 1 as l_partkey, 2 as l_suppkey;                             
0 rows in set. Query took 0.005 seconds.                                                      
❯ create table supplier as select 1 as s_suppkey;                                             
0 rows in set. Query took 0.002 seconds.                                                                                                
❯ explain select * from part, supplier, lineitem where p_partkey = l_partkey and s_suppkey = l_suppkey;                                                                                      
+---------------+--------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                       |
+---------------+--------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #part.p_partkey, #supplier.s_suppkey, #lineitem.l_partkey, #lineitem.l_suppkey |
|               |   Join: #part.p_partkey = #lineitem.l_partkey, #supplier.s_suppkey = #lineitem.l_suppkey   |
|               |     CrossJoin:                                                                             |
|               |       TableScan: part projection=Some([0])                                                 |
|               |       TableScan: supplier projection=Some([0])                                             |
|               |     TableScan: lineitem projection=Some([0, 1])                                            |

https://github.com/apache/arrow-datafusion/blob/6f7b2d25fb75c843efed67fbd72d09b2c2d6c2eb/datafusion/src/sql/planner.rs#L718
In the for loop, left is initially part and right is supplier. There is no join key between part and supplier, so the planner produces a cross join between them, which is expensive.

In this PR, we push supplier back onto mut_plans. After part and lineitem have been inner joined, supplier can be inner joined with the result.

+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                               |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #part.p_partkey, #lineitem.l_partkey, #lineitem.l_suppkey, #supplier.s_suppkey                                                         |
|               |   Join: #lineitem.l_suppkey = #supplier.s_suppkey                                                                                                  |
|               |     Join: #part.p_partkey = #lineitem.l_partkey                                                                                                    |
|               |       TableScan: part projection=Some([0])                                                                                                         |
|               |       TableScan: lineitem projection=Some([0, 1])                                                                                                  |
|               |     TableScan: supplier projection=Some([0])   
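
To make the difference between the two plans concrete, here is a minimal sketch in plain Rust of the join-key check. It is not the planner code: `Table`, `find_join_keys`, and the column lists are hypothetical stand-ins, and predicates are modelled as simple pairs of column names.

```rust
/// Hypothetical stand-in for a planned FROM item: a name plus its columns.
struct Table {
    name: &'static str,
    columns: Vec<&'static str>,
}

/// Return the `a = b` predicates that connect `left` and `right`,
/// oriented as (left column, right column).
fn find_join_keys(
    left: &Table,
    right: &Table,
    predicates: &[(&'static str, &'static str)],
) -> Vec<(&'static str, &'static str)> {
    let mut keys = Vec::new();
    for &(a, b) in predicates {
        if left.columns.contains(&a) && right.columns.contains(&b) {
            keys.push((a, b));
        } else if left.columns.contains(&b) && right.columns.contains(&a) {
            keys.push((b, a));
        }
    }
    keys
}

fn main() {
    let part = Table { name: "part", columns: vec!["p_partkey"] };
    let supplier = Table { name: "supplier", columns: vec!["s_suppkey"] };
    let lineitem = Table { name: "lineitem", columns: vec!["l_partkey", "l_suppkey"] };
    let predicates = [("p_partkey", "l_partkey"), ("s_suppkey", "l_suppkey")];

    // FROM order pairs part with supplier first: no predicate connects them,
    // so a planner that stops here has to emit a cross join.
    let keys = find_join_keys(&part, &supplier, &predicates);
    println!("{} x {}: {:?}", part.name, supplier.name, keys);

    // Pairing part with lineitem first finds an equi-join key.
    let keys = find_join_keys(&part, &lineitem, &predicates);
    println!("{} x {}: {:?}", part.name, lineitem.name, keys);
}
```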

@pjmore
Contributor

pjmore commented Jan 17, 2022

@xudong963 This might be a bit nitpicky but this will enter an infinite loop if multiple cross joins are required for a plan. As an example, the following explain select will never finish:


❯ create table part as select 1 as p_partkey, 2 as p_partprice, 3 as p_partprice_taxadded;
0 rows in set. Query took 0.036 seconds.
❯ create table lineitem as select 1 as l_itemkey;
0 rows in set. Query took 0.001 seconds.
❯ create table supplier as select 1 as s_suppkey;
0 rows in set. Query took 0.001 seconds.
❯ explain select * from part,supplier,lineitem where p_partprice=p_partprice_taxadded;

In practice I'm not sure how much of a deal this is as it requires multiple cross joins with a filter that operates solely on a single table.

@xudong963
Member Author

xudong963 commented Jan 17, 2022

@xudong963 This might be a bit nitpicky but this will enter an infinite loop if multiple cross joins are required for a plan. As an example, the following explain select will never finish:


❯ create table part as select 1 as p_partkey, 2 as p_partprice, 3 as p_partprice_taxadded;
0 rows in set. Query took 0.036 seconds.
❯ create table lineitem as select 1 as l_itemkey;
0 rows in set. Query took 0.001 seconds.
❯ create table supplier as select 1 as s_suppkey;
0 rows in set. Query took 0.001 seconds.
❯ explain select * from part,supplier,lineitem where p_partprice=p_partprice_taxadded;

In practice I'm not sure how much of a deal this is as it requires multiple cross joins with a filter that operates solely on a single table.

Good catch, thanks @pjmore. I think we should handle this case, because we can't predict what kind of queries users will write 😁

@xudong963 xudong963 force-pushed the fix_cross_join branch 2 times, most recently from fd62c56 to 7d46587 Compare January 17, 2022 14:30
@xudong963
Member Author

Added tests and fixed the edge case. cc @alamb @pjmore


#[test]
fn cross_join_not_to_inner_join() {
    let sql = "select person.id from person, orders, lineitem where person.id = person.age;";
Member Author

here @pjmore

@@ -214,8 +220,14 @@ pub struct Extension {
    pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
}

impl PartialEq for Extension {
    fn eq(&self, _other: &Self) -> bool {
        todo!()
Member Author

I'm not sure whether we need to do something here.

Member

What's the reason for implementing PartialEq for all the plan nodes?

Contributor

If you leave todo!() here, the code will panic for plans with user-defined plan nodes and cross joins (which is likely not possible, given that the inputs come from the FROM list and thus should be Select (for subqueries) or TableScan (for table references)).

For the future, I think you can change

#[derive(Clone)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
}

to the following (aka force the UserDefinedLogicalNode to implement PartialEq):

#[derive(Clone, PartialEq)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode + PartialEq + Send + Sync>,
}

Member Author

@xudong963 xudong963 Jan 19, 2022

For the future, I think you can change

PartialEq isn't an auto trait, so it can't be used as an additional trait in a trait object.
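
One common workaround for this limitation, sketched below under stated assumptions, is to give the trait its own object-safe equality method that downcasts through `std::any::Any`. The `Node` trait, `dyn_eq`, `as_any`, and `TopK` are hypothetical names invented for this illustration; they are not taken from this PR or from `UserDefinedLogicalNode`.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for a user-defined node trait.
trait Node: Send + Sync {
    /// Expose the concrete value so that `dyn_eq` can downcast it.
    fn as_any(&self) -> &dyn Any;
    /// Object-safe equality: true only if `other` has the same concrete
    /// type and compares equal.
    fn dyn_eq(&self, other: &dyn Node) -> bool;
}

/// Hypothetical concrete node with an ordinary derived `PartialEq`.
#[derive(PartialEq)]
struct TopK {
    k: usize,
}

impl Node for TopK {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn dyn_eq(&self, other: &dyn Node) -> bool {
        other
            .as_any()
            .downcast_ref::<TopK>()
            .map_or(false, |o| self == o)
    }
}

#[derive(Clone)]
struct Extension {
    node: Arc<dyn Node>,
}

impl PartialEq for Extension {
    fn eq(&self, other: &Self) -> bool {
        // Delegate to the object-safe method instead of requiring
        // `dyn Node + PartialEq`, which the compiler rejects.
        self.node.dyn_eq(&*other.node)
    }
}

fn main() {
    let a = Extension { node: Arc::new(TopK { k: 3 }) };
    let b = Extension { node: Arc::new(TopK { k: 3 }) };
    let c = Extension { node: Arc::new(TopK { k: 4 }) };
    println!("a == b: {}, a == c: {}", a == b, a == c);
}
```

The cost of this design is that every implementor has to write `as_any` and `dyn_eq` by hand, so it only pays off if plan equality is really needed.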

@houqp houqp requested a review from alamb January 18, 2022 06:53
Member

@houqp houqp left a comment

LGTM, nice performance gain @xudong963 👍

.cross_join(&mut_plans[idx])?
.build()?;
} else {
mut_plans.push(mut_plans[idx].clone());
Member

It could be helpful to add a simple comment here to explain why pushing the same plan to the end helps. My understanding is that we want to retry the join key matching after we have gone through all the plans once, in case this plan can be indirectly joined with left through other plans.

Member Author

Yes, you understand exactly right 👍

@houqp
Member

houqp commented Jan 18, 2022

As a future follow up task, I think there is value in handling this plan rewrite in our optimizer layer so dataframe API users can benefit from it as well.

@houqp houqp added the performance Make DataFusion faster label Jan 18, 2022
left =
LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
// check [1..idx] if contains current plan to avoid infinite loop
if mut_plans[1..idx].contains(&mut_plans[idx])
Member Author

Here we use contains(), which requires LogicalPlan to implement the PartialEq trait @houqp

Member

Got it, I am thinking that we might be able to avoid this extra PartialEq and contains check overhead by breaking this up into two loops:

  1. extract the loop logic into its own function
  2. first loop through plans once, for each plan that doesn't have join key matches, we push its index in plans into a retry vector
  3. second loop to go through the retry vector

This way, we can guarantee every plan in the original plans vector is evaluated at most twice without getting into an infinite loop, while avoiding PartialEq comparisons and clones. What do you think?
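
The two-loop idea above can be sketched on a toy model as follows. This is an illustration only, not code from the PR: `Table`, `has_join_key`, and `plan_joins` are hypothetical, predicates are plain pairs of column names, and the function assumes a non-empty FROM list.

```rust
/// Hypothetical FROM item: a name and the columns it exposes.
struct Table {
    name: &'static str,
    columns: Vec<&'static str>,
}

/// True if some `a = b` predicate links a column already in `left`
/// with a column of `right`.
fn has_join_key(
    left: &[&'static str],
    right: &Table,
    preds: &[(&'static str, &'static str)],
) -> bool {
    preds.iter().any(|&(a, b)| {
        (left.contains(&a) && right.columns.contains(&b))
            || (left.contains(&b) && right.columns.contains(&a))
    })
}

/// Two-pass join ordering; `tables` must not be empty.
fn plan_joins(tables: &[Table], preds: &[(&'static str, &'static str)]) -> Vec<String> {
    let mut steps = Vec::new();
    let mut left: Vec<&'static str> = tables[0].columns.clone();
    let mut retry: Vec<usize> = Vec::new();

    // First pass: join what can be joined now, remember the rest by index.
    for (idx, t) in tables.iter().enumerate().skip(1) {
        if has_join_key(&left, t, preds) {
            steps.push(format!("inner join {}", t.name));
            left.extend(t.columns.iter().copied());
        } else {
            retry.push(idx);
        }
    }

    // Second pass: each deferred table gets exactly one more chance,
    // so the loop always terminates.
    for idx in retry {
        let t = &tables[idx];
        let kind = if has_join_key(&left, t, preds) { "inner" } else { "cross" };
        steps.push(format!("{} join {}", kind, t.name));
        left.extend(t.columns.iter().copied());
    }
    steps
}

fn main() {
    let tables = vec![
        Table { name: "part", columns: vec!["p_partkey"] },
        Table { name: "supplier", columns: vec!["s_suppkey"] },
        Table { name: "lineitem", columns: vec!["l_partkey", "l_suppkey"] },
    ];
    let preds = [("p_partkey", "l_partkey"), ("s_suppkey", "l_suppkey")];
    println!("{:?}", plan_joins(&tables, &preds));
}
```

Because indices are stored instead of plans, no clone or equality check is needed. One trade-off of a fixed two-pass scheme is that a table whose only join partner is deferred behind it in the retry list can still end up cross joined.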

Member

Oh never mind, just noticed @alamb already implemented a similar approach at xudong963#2 :D nice!

Member Author

Yeah, I'll look at it carefully! 👍

@alamb
Contributor

alamb commented Jan 18, 2022

As a future follow up task, I think there is value in handling this plan rewrite in our optimizer layer so dataframe API users can benefit from it as well.

@houqp what do you have in mind?

With the dataframe API users can't create implicit joins the way you can with a sql query like SELECT .. FROM foo, bar;

The only thing I can think of would be to extend FilterPushdown in a way that would try and push down column = column type expressions from Filter nodes into Join nodes?

So like

Filter(a = b)
  Join (Cross)
    TableScan A
    TableScan B

To

  Join (Inner) exprs: {a = b}
    TableScan A
    TableScan B

And

Filter(a = b)
  Join (Inner) exprs: {a2 = b2}
    TableScan A
    TableScan B

To

  Join (Inner) exprs: {a2 = b2 AND a = b}
    TableScan A
    TableScan B
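
The rewrite described above could look roughly like this on a toy plan type. This is a sketch only: the `Plan` enum and `push_down` are hypothetical and much simpler than DataFusion's `LogicalPlan` and optimizer rules, and the sketch assumes without checking that the two columns in the filter come from opposite sides of the join.

```rust
/// Toy logical plan, loosely modelled on the shapes in the comment above.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    CrossJoin(Box<Plan>, Box<Plan>),
    InnerJoin {
        left: Box<Plan>,
        right: Box<Plan>,
        on: Vec<(&'static str, &'static str)>,
    },
    /// A filter holding a single `column = column` predicate.
    Filter {
        input: Box<Plan>,
        eq: (&'static str, &'static str),
    },
}

/// Push a `column = column` filter into the join directly below it.
/// Assumption: the two columns belong to opposite join inputs.
fn push_down(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { input, eq } => match *input {
            // Filter over a cross join becomes an inner join on that key.
            Plan::CrossJoin(left, right) => Plan::InnerJoin { left, right, on: vec![eq] },
            // Filter over an inner join adds one more join key.
            Plan::InnerJoin { left, right, mut on } => {
                on.push(eq);
                Plan::InnerJoin { left, right, on }
            }
            // Anything else is left unchanged.
            other => Plan::Filter { input: Box::new(other), eq },
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::Filter {
        input: Box::new(Plan::CrossJoin(
            Box::new(Plan::Scan("A")),
            Box::new(Plan::Scan("B")),
        )),
        eq: ("a", "b"),
    };
    println!("{:?}", push_down(plan));
}
```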

Contributor

@alamb alamb left a comment

Thank you @xudong963 -- this is looking much better.

I am a little worried about having to implement PartialEq for all nodes -- I think we might be able to avoid doing so with a worklist type algorithm.

Here is a sketch of doing so xudong963#2 as perhaps some inspiration

@alamb
Contributor

alamb commented Jan 18, 2022

Also, @xudong963 thank you for taking on this task. Really neat to see

@houqp
Member

houqp commented Jan 19, 2022

The only thing I can think of would be to extend FilterPushdown in a way that would try and push down column = column type expressions from Filter nodes into Join nodes?

Yes, this is exactly what I have in mind. With this optimizer rule, the sql planner can just naively plan these types of queries as cross joins and let the optimizer rewrite them into inner joins whenever applicable. And dataframe api users can write dumb cross join queries that run as fast as optimized inner join queries.

@alamb
Contributor

alamb commented Jan 19, 2022

Yes, this is exactly what I have in mind.

Filed #1612 to track the idea. It is a very good one @houqp 👍

@xudong963
Member Author

Thanks, @alamb ❤️. I looked at xudong963#2 carefully and I think the implementation is very elegant, so I used it directly with only minor fixes.

Member Author

@xudong963 xudong963 left a comment

Thanks again, @alamb, PTAL. BTW, the code comments are very nice; I'll take them as a model!

// Search all remaining plans for the next to
// join. Prefer the first one that has a join
// predicate in the predicate lists
let plan_with_idx = remaining_plans.iter().enumerate().find(|(_idx, plan)| {
Copy link
Member Author

Here, I use plan_with_idx to replace idx

if join_keys.is_empty() {
left =
LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
assert!(plan_with_idx.is_none());
Member Author

Very tight!
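
For readers following the excerpts above, here is a toy sketch of a worklist loop in the same spirit: pick the first remaining plan that shares a join predicate with what has been joined so far, and fall back to a cross join only when none does. It is an illustration under stated assumptions, not the merged code: `Table`, `has_join_key`, and `plan_joins` are hypothetical, and predicates are plain pairs of column names.

```rust
/// Hypothetical FROM item: a name and the columns it exposes.
struct Table {
    name: &'static str,
    columns: Vec<&'static str>,
}

/// True if some `a = b` predicate links a column already in `left`
/// with a column of `right`.
fn has_join_key(
    left: &[&'static str],
    right: &Table,
    preds: &[(&'static str, &'static str)],
) -> bool {
    preds.iter().any(|&(a, b)| {
        (left.contains(&a) && right.columns.contains(&b))
            || (left.contains(&b) && right.columns.contains(&a))
    })
}

/// Worklist join ordering over the FROM list.
fn plan_joins(mut remaining: Vec<Table>, preds: &[(&'static str, &'static str)]) -> Vec<String> {
    let mut steps = Vec::new();
    if remaining.is_empty() {
        return steps;
    }
    let mut left = remaining.remove(0).columns;

    while !remaining.is_empty() {
        // Prefer the first remaining table that has a join predicate.
        let found = remaining.iter().position(|t| has_join_key(&left, t, preds));
        let (kind, idx) = match found {
            Some(idx) => ("inner", idx),
            // Nothing can be joined on a key: cross join the next table.
            None => ("cross", 0),
        };
        // Exactly one table leaves the worklist per iteration,
        // so the loop cannot run forever.
        let next = remaining.remove(idx);
        steps.push(format!("{} join {}", kind, next.name));
        left.extend(next.columns);
    }
    steps
}

fn main() {
    let tables = vec![
        Table { name: "part", columns: vec!["p_partkey"] },
        Table { name: "supplier", columns: vec!["s_suppkey"] },
        Table { name: "lineitem", columns: vec!["l_partkey", "l_suppkey"] },
    ];
    let preds = [("p_partkey", "l_partkey"), ("s_suppkey", "l_suppkey")];
    println!("{:?}", plan_joins(tables, &preds));
}
```

No equality between plans is needed here, which is why this shape avoids the PartialEq question raised earlier in the thread.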

Contributor

@alamb alamb left a comment

Looks good to me (though I am somewhat biased :)) @houqp would you like to take another look / review ?

@houqp houqp merged commit 2f702e4 into apache:master Jan 21, 2022
@houqp
Member

houqp commented Jan 21, 2022

Nice work @xudong963 @alamb !

Labels
datafusion Changes in the datafusion crate performance Make DataFusion faster sql SQL Planner
Development

Successfully merging this pull request may close these issues.

sql planner creates cross join instead of inner join from select predicates
4 participants