Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: skip EliminateCrossJoin rule if inner join with filter is found #7529

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 59 additions & 22 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,26 @@ impl OptimizerRule for EliminateCrossJoin {

let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
let mut all_inputs: Vec<LogicalPlan> = vec![];
match &input {
let did_flat_successfully = match &input {
LogicalPlan::Join(join) if (join.join_type == JoinType::Inner) => {
// The filter of inner join will lost, skip this rule.
// issue: https://github.com/apache/arrow-datafusion/issues/4844
if join.filter.is_some() {
return Ok(None);
}

flatten_join_inputs(
try_flatten_join_inputs(
&input,
&mut possible_join_keys,
&mut all_inputs,
)?;
}
LogicalPlan::CrossJoin(_) => {
flatten_join_inputs(
&input,
&mut possible_join_keys,
&mut all_inputs,
)?;
)?
}
LogicalPlan::CrossJoin(_) => try_flatten_join_inputs(
&input,
&mut possible_join_keys,
&mut all_inputs,
)?,
_ => {
return utils::optimize_children(self, plan, config);
}
};

if !did_flat_successfully {
return Ok(None);
}

let predicate = &filter.predicate;
Expand Down Expand Up @@ -137,13 +133,20 @@ impl OptimizerRule for EliminateCrossJoin {
}
}

fn flatten_join_inputs(
/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins).
/// Returns a boolean indicating whether the flattening was successful.
fn try_flatten_join_inputs(
plan: &LogicalPlan,
possible_join_keys: &mut Vec<(Expr, Expr)>,
all_inputs: &mut Vec<LogicalPlan>,
) -> Result<()> {
) -> Result<bool> {
let children = match plan {
LogicalPlan::Join(join) => {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
if join.filter.is_some() {
// The filter of inner join will lost, skip this rule.
// issue: https://github.com/apache/arrow-datafusion/issues/4844
return Ok(false);
}
possible_join_keys.extend(join.on.clone());
let left = &*(join.left);
let right = &*(join.right);
Expand All @@ -163,18 +166,22 @@ fn flatten_join_inputs(
match *child {
LogicalPlan::Join(left_join) => {
if left_join.join_type == JoinType::Inner {
flatten_join_inputs(child, possible_join_keys, all_inputs)?;
if !try_flatten_join_inputs(child, possible_join_keys, all_inputs)? {
return Ok(false);
}
} else {
all_inputs.push((*child).clone());
}
}
LogicalPlan::CrossJoin(_) => {
flatten_join_inputs(child, possible_join_keys, all_inputs)?;
if !try_flatten_join_inputs(child, possible_join_keys, all_inputs)? {
return Ok(false);
}
}
_ => all_inputs.push((*child).clone()),
}
}
Ok(())
Ok(true)
}

fn find_inner_join(
Expand Down Expand Up @@ -363,6 +370,12 @@ mod tests {
assert_eq!(plan.schema(), optimized_plan.schema())
}

fn assert_optimization_rule_fails(plan: &LogicalPlan) {
let rule = EliminateCrossJoin::new();
let optimized_plan = rule.try_optimize(plan, &OptimizerContext::new()).unwrap();
assert!(optimized_plan.is_none());
}

#[test]
fn eliminate_cross_with_simple_and() -> Result<()> {
let t1 = test_table_scan_with_name("t1")?;
Expand Down Expand Up @@ -531,6 +544,30 @@ mod tests {
Ok(())
}

#[test]
/// See https://github.com/apache/arrow-datafusion/issues/7530
fn eliminate_cross_not_possible_nested_inner_join_with_filter() -> Result<()> {
let t1 = test_table_scan_with_name("t1")?;
let t2 = test_table_scan_with_name("t2")?;
let t3 = test_table_scan_with_name("t3")?;

// could not eliminate to inner join with filter
let plan = LogicalPlanBuilder::from(t1)
.join(
t3,
JoinType::Inner,
(vec!["t1.a"], vec!["t3.a"]),
Some(col("t1.a").gt(lit(20u32))),
)?
.join(t2, JoinType::Inner, (vec!["t1.a"], vec!["t2.a"]), None)?
.filter(col("t1.a").gt(lit(15u32)))?
.build()?;

assert_optimization_rule_fails(&plan);

Ok(())
}

#[test]
/// ```txt
/// filter: a.id = b.id and a.id = c.id
Expand Down