diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index ababb52020d7..78911313efaf 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -16,9 +16,9 @@ use crate::datasource::datasource::TableProviderFilterPushDown; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection}; +use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union}; use crate::logical_plan::{ - and, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, + and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, }; use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; @@ -394,8 +394,29 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // sort is filter-commutable push_down(&state, plan) } - LogicalPlan::Union(_) => { - // union all is filter-commutable + LogicalPlan::Union(Union { + inputs: _, + schema, + alias: _, + }) => { + // union changing all qualifiers while building logical plan so we need + // to rewrite filters to push unqualified columns to inputs + let projection = schema + .fields() + .iter() + .map(|field| (field.qualified_name(), col(field.name()))) + .collect::>(); + + // rewriting predicate expressions using unqualified names as replacements + if !projection.is_empty() { + for (predicate, columns) in state.filters.iter_mut() { + *predicate = rewrite(predicate, &projection)?; + + columns.clear(); + utils::expr_to_columns(predicate, columns)?; + } + } + push_down(&state, plan) } LogicalPlan::Limit(Limit { input, .. }) => { @@ -574,7 +595,9 @@ fn rewrite(expr: &Expr, projection: &HashMap) -> Result { mod tests { use super::*; use crate::datasource::TableProvider; - use crate::logical_plan::{lit, sum, DFSchema, Expr, LogicalPlanBuilder, Operator}; + use crate::logical_plan::{ + lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator, + }; use crate::physical_plan::ExecutionPlan; use crate::test::*; use crate::{logical_plan::col, prelude::JoinType}; @@ -901,6 +924,27 @@ mod tests { Ok(()) } + #[test] + fn union_all_with_alias() -> Result<()> { + let table_scan = test_table_scan()?; + let union = + union_with_alias(table_scan.clone(), table_scan, Some("t".to_string()))?; + + let plan = LogicalPlanBuilder::from(union) + .filter(col("t.a").eq(lit(1i64)))? + .build()?; + + // filter appears below Union without relation qualifier + let expected = "\ + Union\ + \n Filter: #a = Int64(1)\ + \n TableScan: test projection=None\ + \n Filter: #a = Int64(1)\ + \n TableScan: test projection=None"; + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + /// verifies that filters with the same columns are correctly placed #[test] fn filter_2_breaks_limits() -> Result<()> {