From edd7990498baa31457c17cad1ff12e85a74685f8 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 26 Dec 2022 16:30:00 +0800 Subject: [PATCH 1/2] remove `SubqueryFilterToJoin` --- .../optimizer/src/decorrelate_where_in.rs | 182 +++++++- datafusion/optimizer/src/lib.rs | 1 - datafusion/optimizer/src/optimizer.rs | 2 - .../optimizer/src/subquery_filter_to_join.rs | 421 ------------------ 4 files changed, 179 insertions(+), 427 deletions(-) delete mode 100644 datafusion/optimizer/src/subquery_filter_to_join.rs diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index bcc12072316b..e49151614107 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -219,11 +219,187 @@ mod tests { use super::*; use crate::test::*; use datafusion_common::Result; - use datafusion_expr::{ - col, in_subquery, lit, logical_plan::LogicalPlanBuilder, not_in_subquery, - }; + use datafusion_expr::{and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder, not_in_subquery, Operator, or}; use std::ops::Add; + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq_display_indent( + Arc::new(DecorrelateWhereIn::new()), + plan, + expected, + ); + Ok(()) + } + + fn test_subquery_with_name(name: &str) -> Result> { + let table_scan = test_table_scan_with_name(name)?; + Ok(Arc::new( + LogicalPlanBuilder::from(table_scan) + .project(vec![col("c")])? + .build()?, + )) + } + + /// Test for several IN subquery expressions + #[test] + fn in_subquery_multiple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and( + in_subquery(col("c"), test_subquery_with_name("sq_1")?), + in_subquery(col("b"), test_subquery_with_name("sq_2")?), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n LeftSemi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: sq_1.c [c:UInt32]\ + \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: sq_2.c [c:UInt32]\ + \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + + /// Test for IN subquery with additional AND filter + #[test] + fn in_subquery_with_and_filters() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and( + in_subquery(col("c"), test_subquery_with_name("sq")?), + and( + binary_expr(col("a"), Operator::Eq, lit(1_u32)), + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + ), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: sq.c [c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + + /// Test for IN subquery with additional OR filter + /// filter expression not modified + #[test] + fn in_subquery_with_or_filters() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(or( + and( + binary_expr(col("a"), Operator::Eq, lit(1_u32)), + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + ), + in_subquery(col("c"), test_subquery_with_name("sq")?), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) OR test.c IN () [a:UInt32, b:UInt32, c:UInt32]\ + \n Subquery: [c:UInt32]\ + \n Projection: sq.c [c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn in_subquery_with_and_or_filters() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and( + or( + binary_expr(col("a"), Operator::Eq, lit(1_u32)), + in_subquery(col("b"), test_subquery_with_name("sq1")?), + ), + in_subquery(col("c"), test_subquery_with_name("sq2")?), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n Filter: (test.a = UInt32(1) OR test.b IN ()) AND test.c IN () [a:UInt32, b:UInt32, c:UInt32]\ + \n Subquery: [c:UInt32]\ + \n Projection: sq1.c [c:UInt32]\ + \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\ + \n Subquery: [c:UInt32]\ + \n Projection: sq2.c [c:UInt32]\ + \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + + /// Test for nested IN subqueries + #[test] + fn in_subquery_nested() -> Result<()> { + let table_scan = test_table_scan()?; + + let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?) + .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))? + .project(vec![col("a")])? + .build()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("b"), Arc::new(subquery)))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: sq.a [a:UInt32]\ + \n LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: sq_nested.c [c:UInt32]\ + \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + + /// Test for filter input modification in case filter not supported + /// Outer filter expression not modified while inner converted to join + #[test] + fn in_subquery_input_modified() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))? + .project(vec![col("b"), col("c")])? + .alias("wrapped")? + .filter(or( + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + in_subquery(col("c"), test_subquery_with_name("sq_outer")?), + ))? + .project(vec![col("b")])? + .build()?; + + let expected = "Projection: wrapped.b [b:UInt32]\ + \n Filter: wrapped.b < UInt32(30) OR wrapped.c IN () [b:UInt32, c:UInt32]\ + \n Subquery: [c:UInt32]\ + \n Projection: sq_outer.c [c:UInt32]\ + \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\ + \n Projection: test.b, test.c [b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: sq_inner.c [c:UInt32]\ + \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_equal(&plan, expected) + } + #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index a4804ca5b0da..5ffd307481ec 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -32,7 +32,6 @@ pub mod push_down_projection; pub mod scalar_subquery_to_join; pub mod simplify_expressions; pub mod single_distinct_to_groupby; -pub mod subquery_filter_to_join; pub mod type_coercion; pub mod utils; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 6fe94e792e6d..625eb0068071 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -34,7 +34,6 @@ use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; use crate::single_distinct_to_groupby::SingleDistinctToGroupBy; -use crate::subquery_filter_to_join::SubqueryFilterToJoin; use crate::type_coercion::TypeCoercion; use crate::unwrap_cast_in_comparison::UnwrapCastInComparison; use chrono::{DateTime, Utc}; @@ -242,7 +241,6 @@ impl Optimizer { Arc::new(DecorrelateWhereExists::new()), Arc::new(DecorrelateWhereIn::new()), Arc::new(ScalarSubqueryToJoin::new()), - Arc::new(SubqueryFilterToJoin::new()), // simplify expressions does not simplify expressions in subqueries, so we // run it again after running the optimizations that potentially converted // subqueries to joins diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs deleted file mode 100644 index 696f911a1020..000000000000 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ /dev/null @@ -1,421 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Optimizer rule for rewriting subquery filters to joins -//! -//! It handles standalone parts of logical conjunction expressions, i.e. -//! ```text -//! WHERE t1.f IN (SELECT f FROM t2) AND t2.f = 'x' -//! ``` -//! will be rewritten, but -//! ```text -//! WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x' -//! ``` -//! won't -use crate::optimizer::ApplyOrder; -use crate::{utils, OptimizerConfig, OptimizerRule}; -use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::{ - expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, - logical_plan::{ - builder::build_join_schema, Filter, Join, JoinConstraint, JoinType, LogicalPlan, - }, - Expr, -}; -use std::sync::Arc; - -/// Optimizer rule for rewriting subquery filters to joins -#[derive(Default)] -pub struct SubqueryFilterToJoin {} - -impl SubqueryFilterToJoin { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -impl OptimizerRule for SubqueryFilterToJoin { - fn try_optimize( - &self, - plan: &LogicalPlan, - _config: &dyn OptimizerConfig, - ) -> Result> { - match plan { - LogicalPlan::Filter(filter) => { - // Apply optimizer rule to current input - let input = filter.input.as_ref().clone(); - - // Splitting filter expression into components by AND - let filters = utils::split_conjunction(&filter.predicate); - - // Searching for subquery-based filters - let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) = - filters - .into_iter() - .partition(|&e| matches!(e, Expr::InSubquery { .. })); - - // Check all subquery filters could be rewritten - // - // In case of expressions which could not be rewritten - // return original filter with optimized input - let mut subqueries_in_regular = vec![]; - regular_filters.iter().try_for_each(|&e| { - extract_subquery_filters(e, &mut subqueries_in_regular) - })?; - - if !subqueries_in_regular.is_empty() { - return Ok(Some(LogicalPlan::Filter(Filter::try_new( - filter.predicate.clone(), - Arc::new(input), - )?))); - }; - - // Add subquery joins to new_input - // optimized_input value should retain for possible optimization rollback - let opt_result = subquery_filters.iter().try_fold( - input.clone(), - |input, &e| match e { - Expr::InSubquery { - expr, - subquery, - negated, - } => { - let right_input = self.try_optimize( - &subquery.subquery, - _config - )?.unwrap_or_else(||subquery.subquery.as_ref().clone()); - let right_schema = right_input.schema(); - if right_schema.fields().len() != 1 { - return Err(DataFusionError::Plan( - "Only single column allowed in InSubquery" - .to_string(), - )); - }; - - let right_key = right_schema.field(0).qualified_column(); - let left_key = match *expr.clone() { - Expr::Column(col) => col, - _ => return Err(DataFusionError::NotImplemented( - "Filtering by expression not implemented for InSubquery" - .to_string(), - )), - }; - - let join_type = if *negated { - JoinType::LeftAnti - } else { - JoinType::LeftSemi - }; - - let schema = build_join_schema( - input.schema(), - right_schema, - &join_type, - )?; - - Ok(LogicalPlan::Join(Join { - left: Arc::new(input), - right: Arc::new(right_input), - on: vec![(Expr::Column(left_key), Expr::Column(right_key))], - filter: None, - join_type, - join_constraint: JoinConstraint::On, - schema: Arc::new(schema), - null_equals_null: false, - })) - } - _ => Err(DataFusionError::Plan( - "Unknown expression while rewriting subquery to joins" - .to_string(), - )), - } - ); - - // In case of expressions which could not be rewritten - // return original filter with optimized input - let new_input = match opt_result { - Ok(plan) => plan, - Err(_) => { - return Ok(Some(LogicalPlan::Filter(Filter::try_new( - filter.predicate.clone(), - Arc::new(input), - )?))) - } - }; - - // Apply regular filters to join output if some or just return join - if regular_filters.is_empty() { - Ok(Some(new_input)) - } else { - Ok(Some(utils::add_filter(new_input, ®ular_filters)?)) - } - } - _ => Ok(None), - } - } - - fn name(&self) -> &str { - "subquery_filter_to_join" - } - - fn apply_order(&self) -> Option { - Some(ApplyOrder::TopDown) - } -} - -fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Result<()> { - struct InSubqueryVisitor<'a> { - accum: &'a mut Vec, - } - - impl ExpressionVisitor for InSubqueryVisitor<'_> { - fn pre_visit(self, expr: &Expr) -> Result> { - if let Expr::InSubquery { .. } = expr { - self.accum.push(expr.to_owned()); - } - Ok(Recursion::Continue(self)) - } - } - - expression.accept(InSubqueryVisitor { accum: extracted })?; - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test::*; - use datafusion_expr::{ - and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder, - not_in_subquery, or, Operator, - }; - - fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq_display_indent( - Arc::new(SubqueryFilterToJoin::new()), - plan, - expected, - ); - Ok(()) - } - - fn test_subquery_with_name(name: &str) -> Result> { - let table_scan = test_table_scan_with_name(name)?; - Ok(Arc::new( - LogicalPlanBuilder::from(table_scan) - .project(vec![col("c")])? - .build()?, - )) - } - - /// Test for single IN subquery filter - #[test] - fn in_subquery_simple() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))? - .project(vec![col("test.b")])? - .build()?; - - let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq.c [c:UInt32]\ - \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } - - /// Test for single NOT IN subquery filter - #[test] - fn not_in_subquery_simple() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))? - .project(vec![col("test.b")])? - .build()?; - - let expected = "Projection: test.b [b:UInt32]\ - \n LeftAnti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq.c [c:UInt32]\ - \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } - - /// Test for several IN subquery expressions - #[test] - fn in_subquery_multiple() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(and( - in_subquery(col("c"), test_subquery_with_name("sq_1")?), - in_subquery(col("b"), test_subquery_with_name("sq_2")?), - ))? - .project(vec![col("test.b")])? - .build()?; - - let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_1.c [c:UInt32]\ - \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_2.c [c:UInt32]\ - \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } - - /// Test for IN subquery with additional AND filter - #[test] - fn in_subquery_with_and_filters() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(and( - in_subquery(col("c"), test_subquery_with_name("sq")?), - and( - binary_expr(col("a"), Operator::Eq, lit(1_u32)), - binary_expr(col("b"), Operator::Lt, lit(30_u32)), - ), - ))? - .project(vec![col("test.b")])? - .build()?; - - let expected = "Projection: test.b [b:UInt32]\ - \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq.c [c:UInt32]\ - \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } - - /// Test for IN subquery with additional OR filter - /// filter expression not modified - #[test] - fn in_subquery_with_or_filters() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(or( - and( - binary_expr(col("a"), Operator::Eq, lit(1_u32)), - binary_expr(col("b"), Operator::Lt, lit(30_u32)), - ), - in_subquery(col("c"), test_subquery_with_name("sq")?), - ))? - .project(vec![col("test.b")])? - .build()?; - - let expected = "Projection: test.b [b:UInt32]\ - \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) OR test.c IN () [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [c:UInt32]\ - \n Projection: sq.c [c:UInt32]\ - \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } - - #[test] - fn in_subquery_with_and_or_filters() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(and( - or( - binary_expr(col("a"), Operator::Eq, lit(1_u32)), - in_subquery(col("b"), test_subquery_with_name("sq1")?), - ), - in_subquery(col("c"), test_subquery_with_name("sq2")?), - ))? - .project(vec![col("test.b")])? - .build()?; - - let expected = "Projection: test.b [b:UInt32]\ - \n Filter: (test.a = UInt32(1) OR test.b IN ()) AND test.c IN () [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [c:UInt32]\ - \n Projection: sq1.c [c:UInt32]\ - \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [c:UInt32]\ - \n Projection: sq2.c [c:UInt32]\ - \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } - - /// Test for nested IN subqueries - #[test] - fn in_subquery_nested() -> Result<()> { - let table_scan = test_table_scan()?; - - let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?) - .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))? - .project(vec![col("a")])? - .build()?; - - let plan = LogicalPlanBuilder::from(table_scan) - .filter(in_subquery(col("b"), Arc::new(subquery)))? - .project(vec![col("test.b")])? - .build()?; - - let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq.a [a:UInt32]\ - \n LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_nested.c [c:UInt32]\ - \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } - - /// Test for filter input modification in case filter not supported - /// Outer filter expression not modified while inner converted to join - #[test] - fn in_subquery_input_modified() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))? - .project(vec![col("b"), col("c")])? - .alias("wrapped")? - .filter(or( - binary_expr(col("b"), Operator::Lt, lit(30_u32)), - in_subquery(col("c"), test_subquery_with_name("sq_outer")?), - ))? - .project(vec![col("b")])? - .build()?; - - let expected = "Projection: wrapped.b [b:UInt32]\ - \n Filter: wrapped.b < UInt32(30) OR wrapped.c IN () [b:UInt32, c:UInt32]\ - \n Subquery: [c:UInt32]\ - \n Projection: sq_outer.c [c:UInt32]\ - \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\ - \n Projection: test.b, test.c [b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_inner.c [c:UInt32]\ - \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; - - assert_optimized_plan_equal(&plan, expected) - } -} From 9af19bdff696f3c5065e9d9fe6e3307c6a200ed8 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 26 Dec 2022 16:37:51 +0800 Subject: [PATCH 2/2] correct UT --- .../optimizer/src/decorrelate_where_in.rs | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index e49151614107..c2a80ac2bb11 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -219,7 +219,10 @@ mod tests { use super::*; use crate::test::*; use datafusion_common::Result; - use datafusion_expr::{and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder, not_in_subquery, Operator, or}; + use datafusion_expr::{ + and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder, + not_in_subquery, or, Operator, + }; use std::ops::Add; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { @@ -253,13 +256,15 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.b = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_1.c [c:UInt32]\ - \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_2.c [c:UInt32]\ - \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]"; + \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n Projection: sq_1.c AS c [c:UInt32]\ + \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __sq_2 [c:UInt32]\ + \n Projection: sq_2.c AS c [c:UInt32]\ + \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) } @@ -281,10 +286,11 @@ mod tests { let expected = "Projection: test.b [b:UInt32]\ \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq.c [c:UInt32]\ - \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; + \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n Projection: sq.c AS c [c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) } @@ -330,14 +336,15 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n Filter: (test.a = UInt32(1) OR test.b IN ()) AND test.c IN () [a:UInt32, b:UInt32, c:UInt32]\ + \n Filter: test.a = UInt32(1) OR test.b IN () [a:UInt32, b:UInt32, c:UInt32]\ \n Subquery: [c:UInt32]\ \n Projection: sq1.c [c:UInt32]\ \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [c:UInt32]\ - \n Projection: sq2.c [c:UInt32]\ - \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; + \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n Projection: sq2.c AS c [c:UInt32]\ + \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) } @@ -358,13 +365,15 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.b = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq.a [a:UInt32]\ - \n LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_nested.c [c:UInt32]\ - \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]"; + \n SubqueryAlias: __sq_1 [a:UInt32]\ + \n Projection: sq.a AS a [a:UInt32]\ + \n LeftSemi Join: sq.a = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __sq_2 [c:UInt32]\ + \n Projection: sq_nested.c AS c [c:UInt32]\ + \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) } @@ -385,17 +394,18 @@ mod tests { .project(vec![col("b")])? .build()?; - let expected = "Projection: wrapped.b [b:UInt32]\ + let expected = "Projection: wrapped.b [b:UInt32]\ \n Filter: wrapped.b < UInt32(30) OR wrapped.c IN () [b:UInt32, c:UInt32]\ \n Subquery: [c:UInt32]\ \n Projection: sq_outer.c [c:UInt32]\ \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\ \n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\ \n Projection: test.b, test.c [b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: sq_inner.c [c:UInt32]\ - \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; + \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n Projection: sq_inner.c AS c [c:UInt32]\ + \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_equal(&plan, expected) }