From a69f8fcc6be976aae73e0ef65b210c4fbf76ec63 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 6 Dec 2023 16:43:09 +0300 Subject: [PATCH 1/3] Relax schema check for optimize projections. --- datafusion/common/src/dfschema.rs | 13 ++++++ .../optimizer/src/optimize_projections.rs | 5 ++ datafusion/optimizer/src/optimizer.rs | 46 ++++++++++++++++++- datafusion/sqllogictest/test_files/select.slt | 9 ++++ 4 files changed, 72 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 52cd85675824..20cc98982974 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -499,6 +499,19 @@ impl DFSchema { }) } + /// Returns true if the two schemas have the same data types at same indices. + /// Returns false otherwise. + pub fn equivalent_types(&self, other: &Self) -> bool { + if self.fields().len() != other.fields().len() { + return false; + } + let self_fields = self.fields().iter(); + let other_fields = other.fields().iter(); + self_fields.zip(other_fields).all(|(f1, f2)| { + Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type()) + }) + } + /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint /// than datatype_is_semantically_equal in that a Dictionary type is logically /// equal to a plain V type, but not semantically equal. Dictionary is also diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index bbf704a83c55..d81fea481133 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -70,6 +70,11 @@ impl OptimizerRule for OptimizeProjections { fn apply_order(&self) -> Option { None } + + // This rule may produce schemas with different field names. + fn exact_schema_check(&self) -> bool { + false + } } /// Removes unnecessary columns (e.g Columns that are not referred at the output schema and diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 7af46ed70adf..d1fd3957477c 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -73,6 +73,13 @@ pub trait OptimizerRule { fn apply_order(&self) -> Option { None } + + /// Flag indicates whether schema check will be done + /// in exact mode (names, and types are equal). + /// Otherwise only types will be checked. + fn exact_schema_check(&self) -> bool { + true + } } /// Options to control the DataFusion Optimizer. @@ -287,11 +294,24 @@ impl Optimizer { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); for rule in &self.rules { + let exact_check = rule.exact_schema_check(); let result = self.optimize_recursively(rule, &new_plan, config) .and_then(|plan| { if let Some(plan) = &plan { - assert_schema_is_the_same(rule.name(), &new_plan, plan)?; + if exact_check { + assert_schema_is_the_same( + rule.name(), + &new_plan, + plan, + )?; + } else { + assert_schema_is_the_same_relax( + rule.name(), + &new_plan, + plan, + )?; + } } Ok(plan) }); @@ -445,6 +465,30 @@ pub(crate) fn assert_schema_is_the_same( } } +/// Returns an error if plans have different schemas. +/// It only considers field types during comparison. +pub(crate) fn assert_schema_is_the_same_relax( + rule_name: &str, + prev_plan: &LogicalPlan, + new_plan: &LogicalPlan, +) -> Result<()> { + let equivalent = new_plan.schema().equivalent_types(prev_plan.schema()); + + if !equivalent { + let e = DataFusionError::Internal(format!( + "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", + prev_plan.schema(), + new_plan.schema() + )); + Err(DataFusionError::Context( + String::from(rule_name), + Box::new(e), + )) + } else { + Ok(()) + } +} + #[cfg(test)] mod tests { use crate::optimizer::Optimizer; diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index bb81c5a9a138..1a8c42d87249 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1041,3 +1041,12 @@ drop table annotated_data_finite2; statement ok drop table t; + +statement ok +create table t(x bigint, y bigint) as values (1,2), (1,3); + +query II +select z+1, y from (select x+1 as z, y from t) where y > 1; +---- +3 2 +3 3 From 81c48bb029452868a9fbedf1f3d1987b3de34d0f Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 6 Dec 2023 17:30:18 +0300 Subject: [PATCH 2/3] Minor changes --- datafusion/common/src/dfschema.rs | 13 ------ .../optimizer/src/optimize_projections.rs | 27 +++++++---- datafusion/optimizer/src/optimizer.rs | 46 +------------------ 3 files changed, 20 insertions(+), 66 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 20cc98982974..52cd85675824 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -499,19 +499,6 @@ impl DFSchema { }) } - /// Returns true if the two schemas have the same data types at same indices. - /// Returns false otherwise. - pub fn equivalent_types(&self, other: &Self) -> bool { - if self.fields().len() != other.fields().len() { - return false; - } - let self_fields = self.fields().iter(); - let other_fields = other.fields().iter(); - self_fields.zip(other_fields).all(|(f1, f2)| { - Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type()) - }) - } - /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint /// than datatype_is_semantically_equal in that a Dictionary type is logically /// equal to a plain V type, but not semantically equal. Dictionary is also diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index d81fea481133..0643e6b4b9ba 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -70,11 +70,6 @@ impl OptimizerRule for OptimizeProjections { fn apply_order(&self) -> Option { None } - - // This rule may produce schemas with different field names. - fn exact_schema_check(&self) -> bool { - false - } } /// Removes unnecessary columns (e.g Columns that are not referred at the output schema and @@ -410,9 +405,25 @@ fn merge_consecutive_projections(proj: &Projection) -> Result .iter() .map(|expr| rewrite_expr(expr, prev_projection)) .collect::>>>()?; - new_exprs - .map(|exprs| Projection::try_new(exprs, prev_projection.input.clone())) - .transpose() + if let Some(new_exprs) = new_exprs { + let new_exprs = new_exprs + .into_iter() + .zip(proj.expr.iter()) + .map(|(new_expr, old_expr)| { + let new_name = new_expr.name_for_alias()?; + let old_name = old_expr.name_for_alias()?; + Ok(if new_name != old_name { + // Make sure name is preserved after merging + new_expr.alias(old_name) + } else { + new_expr + }) + }) + .collect::>>()?; + Projection::try_new(new_exprs, prev_projection.input.clone()).map(Some) + } else { + Ok(None) + } } /// Trim Expression diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d1fd3957477c..7af46ed70adf 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -73,13 +73,6 @@ pub trait OptimizerRule { fn apply_order(&self) -> Option { None } - - /// Flag indicates whether schema check will be done - /// in exact mode (names, and types are equal). - /// Otherwise only types will be checked. - fn exact_schema_check(&self) -> bool { - true - } } /// Options to control the DataFusion Optimizer. @@ -294,24 +287,11 @@ impl Optimizer { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); for rule in &self.rules { - let exact_check = rule.exact_schema_check(); let result = self.optimize_recursively(rule, &new_plan, config) .and_then(|plan| { if let Some(plan) = &plan { - if exact_check { - assert_schema_is_the_same( - rule.name(), - &new_plan, - plan, - )?; - } else { - assert_schema_is_the_same_relax( - rule.name(), - &new_plan, - plan, - )?; - } + assert_schema_is_the_same(rule.name(), &new_plan, plan)?; } Ok(plan) }); @@ -465,30 +445,6 @@ pub(crate) fn assert_schema_is_the_same( } } -/// Returns an error if plans have different schemas. -/// It only considers field types during comparison. -pub(crate) fn assert_schema_is_the_same_relax( - rule_name: &str, - prev_plan: &LogicalPlan, - new_plan: &LogicalPlan, -) -> Result<()> { - let equivalent = new_plan.schema().equivalent_types(prev_plan.schema()); - - if !equivalent { - let e = DataFusionError::Internal(format!( - "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", - prev_plan.schema(), - new_plan.schema() - )); - Err(DataFusionError::Context( - String::from(rule_name), - Box::new(e), - )) - } else { - Ok(()) - } -} - #[cfg(test)] mod tests { use crate::optimizer::Optimizer; From e698d7c1a6e85e29dd5fc16411e9fbc24a099662 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Thu, 7 Dec 2023 08:48:32 +0300 Subject: [PATCH 3/3] Update datafusion/optimizer/src/optimize_projections.rs Co-authored-by: jakevin --- datafusion/optimizer/src/optimize_projections.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index 0643e6b4b9ba..440e12cc26d7 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -410,14 +410,7 @@ fn merge_consecutive_projections(proj: &Projection) -> Result .into_iter() .zip(proj.expr.iter()) .map(|(new_expr, old_expr)| { - let new_name = new_expr.name_for_alias()?; - let old_name = old_expr.name_for_alias()?; - Ok(if new_name != old_name { - // Make sure name is preserved after merging - new_expr.alias(old_name) - } else { - new_expr - }) + new_expr.alias_if_changed(old_expr.name_for_alias()?) }) .collect::>>()?; Projection::try_new(new_exprs, prev_projection.input.clone()).map(Some)