Refactor: Change equijoin keys from column to expression in logical join #4602

Merged on Dec 17, 2022 (13 commits)

@ygf11 ygf11 commented Dec 13, 2022

Which issue does this PR close?

Closes #4389.

Rationale for this change

It makes the display representation of the logical join cleaner.

What changes are included in this PR?

  • Change equijoin keys from column to expression in logical join.
  • Remove additional projection in parse_join.
  • Add additional physical projection for physical join.
  • Adapt the EliminateCrossJoin, FilterNullJoinKeys, PushDownFilter, and SubqueryFilterToJoin optimization rules to expression keys.
  • Change the equijoin keys of JoinNode in proto module.
  • Fix and add test cases.
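
To illustrate the first bullet, here is a minimal, self-contained sketch of a join node whose equijoin keys are expression pairs, so the join can print the key expressions directly. The `Expr`, `Join`, `binary`, and `example_join` names below are toy stand-ins invented for this illustration; they are not DataFusion's types, and the `", "` separator between key pairs is an assumption.

```rust
use std::fmt;

// Toy stand-in for an expression tree; this is NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Int64(i64),
    Binary(Box<Expr>, char, Box<Expr>),
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name) => write!(f, "{}", name),
            Expr::Int64(v) => write!(f, "Int64({})", v),
            Expr::Binary(l, op, r) => write!(f, "{} {} {}", l, op, r),
        }
    }
}

// Hypothetical join node: keys are (left, right) expression pairs,
// not (Column, Column) pairs.
struct Join {
    on: Vec<(Expr, Expr)>,
}

impl fmt::Display for Join {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keys: Vec<String> = self
            .on
            .iter()
            .map(|(l, r)| format!("{} = {}", l, r))
            .collect();
        // Separator between multiple key pairs is an assumption of this sketch.
        write!(f, "Inner Join: {}", keys.join(", "))
    }
}

// Small helper to build a binary expression.
fn binary(l: Expr, op: char, r: Expr) -> Expr {
    Expr::Binary(Box::new(l), op, Box::new(r))
}

// Builds the join shown in one of the test plans in this PR.
fn example_join() -> Join {
    Join {
        on: vec![(
            binary(Expr::Column("person.id".to_string()), '+', Expr::Int64(10)),
            binary(Expr::Column("orders.customer_id".to_string()), '*', Expr::Int64(2)),
        )],
    }
}
```

Calling `example_join().to_string()` yields the join line without any helper projection underneath it, which is the display improvement this PR is after.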

Are these changes tested?

Yes

Are there any user-facing changes?

Yes, the JoinNode in the proto module is also changed.

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner labels Dec 13, 2022
@ygf11 ygf11 marked this pull request as ready for review December 14, 2022 11:30
let expected = if repartition_joins {
vec![
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
Contributor Author

It seems that merging projections does not take the column ordering into account either.

Contributor

👍 I think that could be improved in a follow-on PR

Contributor Author

I will work on it.

" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 3 }], 2)",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as t1.t1_id + Int64(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
Contributor Author

The names in the join and the projection now match: both are t1.t1_id + Int64(11).
cc @liukun4515
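
The physical plan above computes the key expression in a projection and then repartitions on the resulting column. A rough sketch of that idea follows, under the assumption that the projected column is named after the expression's display name. `Expr`, `KeyPlan`, and `plan_join_key` are hypothetical names for this illustration only, and the `CAST` from the real plan is omitted for brevity.

```rust
// Toy expression type for this sketch; NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Qualified column reference, e.g. `t1.t1_id`.
    Column { table: String, name: String },
    /// `<expr> + Int64(<literal>)`
    AddInt64(Box<Expr>, i64),
}

impl Expr {
    // Name used for the projected column (assumed to mirror the logical display name).
    fn display_name(&self) -> String {
        match self {
            Expr::Column { table, name } => format!("{}.{}", table, name),
            Expr::AddInt64(inner, v) => format!("{} + Int64({})", inner.display_name(), v),
        }
    }
}

// Hypothetical result: the join input's output columns plus the key column location.
struct KeyPlan {
    output_columns: Vec<String>,
    key_name: String,
    key_index: usize,
    added_projection: bool,
}

// If the key is a plain column, join on it directly; otherwise append a
// projected column that computes the expression and join on that column.
fn plan_join_key(input_columns: &[&str], key: &Expr) -> Option<KeyPlan> {
    let mut output: Vec<String> = input_columns.iter().map(|c| c.to_string()).collect();
    match key {
        Expr::Column { name, .. } => {
            // Returns None when the column is not part of the input.
            let idx = output.iter().position(|c| c == name)?;
            Some(KeyPlan {
                output_columns: output,
                key_name: name.clone(),
                key_index: idx,
                added_projection: false,
            })
        }
        other => {
            let name = other.display_name();
            output.push(name.clone());
            let idx = output.len() - 1;
            Some(KeyPlan {
                output_columns: output,
                key_name: name,
                key_index: idx,
                added_projection: true,
            })
        }
    }
}
```

With input columns `t1_id, t1_name, t1_int` and the key `t1.t1_id + 11`, this sketch places the key column at index 3 under the name `t1.t1_id + Int64(11)`, which lines up with the `Column { name: "t1.t1_id + Int64(11)", index: 3 }` in the plan quoted above.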

" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Inner Join: CAST(t1.t1_id AS Int64) + Int64(12) = CAST(t2.t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
Contributor Author

The logical plan does not need any projections now.

ygf11 commented Dec 14, 2022

@alamb @mingmwang @jackwener @liukun4515, please take a look, thanks.

alamb commented Dec 14, 2022

I plan to review this carefully tomorrow.

alamb left a comment

This is a really cool PR. Thank you @ygf11

I went through the code and all the test changes carefully and I think this PR is 👨‍🍳 👌 very nice

I think it is ready to go, though I plan to leave it open for another day or two in case @jackwener or anyone else would like a chance to review it.

&self,
right: &LogicalPlan,
join_type: JoinType,
join_keys: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
Contributor

Maybe calling this parameter equi_exprs would better reflect what it is (exprs, not column keys) 🤔
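
The `(Vec<impl Into<Expr>>, Vec<impl Into<Expr>>)` parameter shape lets callers pass plain column names on one side and full expressions on the other. Below is a small sketch of how such a parameter can be normalized into expression pairs. The `Column` and `Expr` types and the `normalize_join_keys` function are toy definitions made up for this example, not the builder's actual implementation.

```rust
// Toy types for illustration; NOT DataFusion's `Column` / `Expr`.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(Column),
    /// `<expr> + <literal>`
    Add(Box<Expr>, i64),
}

impl From<Column> for Expr {
    fn from(c: Column) -> Self {
        Expr::Column(c)
    }
}

impl From<&str> for Expr {
    fn from(name: &str) -> Self {
        Expr::Column(Column { name: name.to_string() })
    }
}

// Hypothetical helper: each `impl Into<Expr>` is its own generic parameter,
// so the left and right vectors may hold different concrete types.
fn normalize_join_keys(
    join_keys: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
) -> Result<Vec<(Expr, Expr)>, String> {
    let (left, right) = join_keys;
    if left.len() != right.len() {
        return Err(format!(
            "left has {} join keys but right has {}",
            left.len(),
            right.len()
        ));
    }
    let left: Vec<Expr> = left.into_iter().map(Into::into).collect();
    let right: Vec<Expr> = right.into_iter().map(Into::into).collect();
    Ok(left.into_iter().zip(right).collect())
}
```

For example, `normalize_join_keys((vec!["t1_id"], vec![Expr::Add(Box::new(Expr::from("t2_id")), 1)]))` pairs a bare column with an arithmetic expression, which is why a name such as `equi_exprs` describes the parameter more accurately than `join_keys`.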

@@ -1646,8 +1650,8 @@ pub struct Join {
     pub left: Arc<LogicalPlan>,
     /// Right input
     pub right: Arc<LogicalPlan>,
-    /// Equijoin clause expressed as pairs of (left, right) join columns
-    pub on: Vec<(Column, Column)>,
+    /// Equijoin clause expressed as pairs of (left, right) join expressions
Contributor

👍

" Projection: t2.a, t2.b, t2.c, t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, t2.a * UInt32(2):UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
];
"Filter: t2.c < UInt32(20) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
Contributor

It is quite cool to see the expressions directly in the Join without needing a projection to compute them.

left_keys,
)?,
JoinConstraint::Using => {
// The equijoin keys in using-join must be column.
Contributor

👍

\n TableScan: person\
\n Projection: orders.order_id, orders.customer_id, orders.o_item_id, orders.qty, orders.price, orders.delivered, orders.customer_id * Int64(2)\
\n TableScan: orders";
\n Inner Join: person.id + Int64(10) = orders.customer_id * Int64(2)\
Contributor

this plan is certainly much nicer -- I don't understand where all the other columns used to come from but this is 👍

alamb commented Dec 16, 2022

Because this PR has been outstanding for some time and had a conflict, I took the liberty of merging from master and resolving the conflicts.

I am about out of time today but I plan to merge this PR first thing tomorrow.

Thanks again @ygf11 -- really great stuff.

ygf11 commented Dec 17, 2022

Thanks @alamb.

I resolved the remaining conflict, and CI passes now.

@alamb alamb merged commit 8d36529 into apache:master Dec 17, 2022

alamb commented Dec 17, 2022

Thanks again @ygf11 -- this is great work

ursabot commented Dec 17, 2022

Benchmark runs are scheduled for baseline = 42b3a6c and contender = 8d36529. 8d36529 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ygf11 ygf11 deleted the refactor-logical-join branch December 18, 2022 06:15
Successfully merging this pull request may close these issues.

Proposal: Improve the join keys of logical plan
3 participants