From e4fb267b63d92f4af23101580d1b7cbf94d23c1d Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 14 Dec 2022 12:06:06 +0800 Subject: [PATCH] avoid every rule must recursive children in optimizer --- datafusion/optimizer/src/eliminate_limit.rs | 76 +++++++++++-------- datafusion/optimizer/src/optimizer.rs | 83 ++++++++++++++++++++- 2 files changed, 126 insertions(+), 33 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index b60b16e83bc5a..656a795b568f7 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -20,7 +20,9 @@ //! on a plan with an empty relation. //! This rule also removes OFFSET 0 from the [LogicalPlan] //! This saves time in planning and executing the query. -use crate::{utils, OptimizerConfig, OptimizerRule}; +use crate::optimizer::ApplyOrder; +use crate::optimizer::ApplyOrder::BottomUp; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan}; @@ -50,45 +52,49 @@ impl OptimizerRule for EliminateLimit { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + _optimizer_config: &mut OptimizerConfig, ) -> Result> { - if let LogicalPlan::Limit(limit) = plan { - match limit.fetch { - Some(fetch) => { - if fetch == 0 { - return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: limit.input.schema().clone(), - }))); - } + let limit = match plan { + LogicalPlan::Limit(limit) => limit, + _ => return Ok(None), + }; + + match limit.fetch { + Some(fetch) => { + if fetch == 0 { + return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: limit.input.schema().clone(), + }))); } - None => { - if limit.skip == 0 { - let input = &*limit.input; - return Ok(Some(utils::optimize_children( - self, - input, - optimizer_config, - )?)); - } + } + None => { + if limit.skip == 0 { + let input = limit.input.as_ref(); + // input also can be Limit, so we should apply again. + return Ok(Some( + self.try_optimize(input, _optimizer_config)? + .unwrap_or_else(|| input.clone()), + )); } } } - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(None) } fn name(&self) -> &str { "eliminate_limit" } + + fn apply_order(&self) -> Option { + Some(BottomUp) + } } #[cfg(test)] mod tests { use super::*; + use crate::optimizer::Optimizer; use crate::push_down_limit::PushDownLimit; use crate::test::*; use datafusion_common::Column; @@ -97,11 +103,15 @@ mod tests { logical_plan::{builder::LogicalPlanBuilder, JoinType}, sum, }; + use std::sync::Arc; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let optimized_plan = EliminateLimit::new() - .optimize(plan, &mut OptimizerConfig::new()) + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]); + let optimized_plan = optimizer + .optimize(plan, &mut OptimizerConfig::new(), observe) .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); assert_eq!(plan.schema(), optimized_plan.schema()); @@ -112,11 +122,13 @@ mod tests { plan: &LogicalPlan, expected: &str, ) -> Result<()> { - let optimized_plan = PushDownLimit::new() - .optimize(plan, &mut OptimizerConfig::new()) - .expect("failed to optimize plan"); - let optimized_plan = EliminateLimit::new() - .optimize(&optimized_plan, &mut OptimizerConfig::new()) + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + let optimizer = Optimizer::with_rules(vec![ + Arc::new(EliminateLimit::new()), + Arc::new(PushDownLimit::new()), + ]); + let optimized_plan = optimizer + .optimize(plan, &mut OptimizerConfig::new(), observe) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index dd4783ceb3b1c..219e87274e25a 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -69,6 +69,11 @@ pub trait OptimizerRule { /// A human readable name for this optimizer rule fn name(&self) -> &str; + + /// If a rule use default None, its should traverse recursively plan inside itself + fn apply_order(&self) -> Option { + None + } } /// Options to control the DataFusion Optimizer. @@ -157,6 +162,11 @@ pub struct Optimizer { pub rules: Vec>, } +pub enum ApplyOrder { + TopDown, + BottomUp, +} + impl Optimizer { /// Create a new optimizer using the recommended list of rules pub fn new(config: &OptimizerConfig) -> Self { @@ -223,7 +233,7 @@ impl Optimizer { log_plan(&format!("Optimizer input (pass {})", i), &new_plan); for rule in &self.rules { - let result = rule.try_optimize(&new_plan, optimizer_config); + let result = self.optimize_recursively(rule, &new_plan, optimizer_config); match result { Ok(Some(plan)) => { if !plan.schema().equivalent_names_and_types(new_plan.schema()) { @@ -284,6 +294,77 @@ impl Optimizer { debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); Ok(new_plan) } + + fn optimize_node( + &self, + rule: &Arc, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { + rule.try_optimize(plan, optimizer_config) + } + + fn optimize_inputs( + &self, + rule: &Arc, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { + let inputs = plan.inputs(); + let result = inputs + .iter() + .map(|sub_plan| self.optimize_recursively(rule, sub_plan, optimizer_config)) + .collect::>>()?; + if result.is_empty() || result.iter().all(|o| o.is_none()) { + return Ok(None); + } + + let new_inputs = result + .into_iter() + .enumerate() + .map(|(i, o)| match o { + Some(plan) => plan, + None => (*(inputs.get(i).unwrap())).clone(), + }) + .collect::>(); + + Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?)) + } + + pub fn optimize_recursively( + &self, + rule: &Arc, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { + match rule.apply_order() { + Some(order) => match order { + ApplyOrder::TopDown => { + let optimize_self_opt = + self.optimize_node(rule, plan, optimizer_config)?; + let optimize_inputs_opt = match &optimize_self_opt { + Some(optimized_plan) => { + self.optimize_inputs(rule, optimized_plan, optimizer_config)? + } + _ => self.optimize_inputs(rule, plan, optimizer_config)?, + }; + Ok(optimize_inputs_opt.or(optimize_self_opt)) + } + ApplyOrder::BottomUp => { + let optimize_inputs_opt = + self.optimize_inputs(rule, plan, optimizer_config)?; + let optimize_self_opt = match &optimize_inputs_opt { + Some(optimized_plan) => { + self.optimize_node(rule, optimized_plan, optimizer_config)? + } + _ => self.optimize_node(rule, plan, optimizer_config)?, + }; + Ok(optimize_self_opt.or(optimize_inputs_opt)) + } + }, + _ => rule.try_optimize(plan, optimizer_config), + } + } } /// Log the plan in debug/tracing mode after some part of the optimizer runs