From 0f1a20d80fad8251e4da686701f875eff916c75c Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 14 Dec 2022 12:06:06 +0800 Subject: [PATCH] optimizer: avoid every rule must recursive children in optimizer --- datafusion/optimizer/src/eliminate_limit.rs | 85 ++++++++------ datafusion/optimizer/src/optimizer.rs | 121 +++++++++++++++++++- 2 files changed, 169 insertions(+), 37 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index 45844e120426..91719b3a80ff 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -20,7 +20,9 @@ //! on a plan with an empty relation. //! This rule also removes OFFSET 0 from the [LogicalPlan] //! This saves time in planning and executing the query. -use crate::{utils, OptimizerConfig, OptimizerRule}; +use crate::optimizer::ApplyOrder; +use crate::optimizer::ApplyOrder::BottomUp; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan}; @@ -40,45 +42,49 @@ impl OptimizerRule for EliminateLimit { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + _optimizer_config: &mut OptimizerConfig, ) -> Result> { - if let LogicalPlan::Limit(limit) = plan { - match limit.fetch { - Some(fetch) => { - if fetch == 0 { - return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: limit.input.schema().clone(), - }))); - } + let limit = match plan { + LogicalPlan::Limit(limit) => limit, + _ => return Ok(None), + }; + + match limit.fetch { + Some(fetch) => { + if fetch == 0 { + return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: limit.input.schema().clone(), + }))); } - None => { - if limit.skip == 0 { - let input = &*limit.input; - return Ok(Some(utils::optimize_children( - self, - input, - optimizer_config, - )?)); - } + } + None => { + if 
limit.skip == 0 { + let input = limit.input.as_ref(); + // input also can be Limit, so we should apply again. + return Ok(Some( + self.try_optimize(input, _optimizer_config)? + .unwrap_or_else(|| input.clone()), + )); } } } - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(None) } fn name(&self) -> &str { "eliminate_limit" } + + fn apply_order(&self) -> Option { + Some(BottomUp) + } } #[cfg(test)] mod tests { use super::*; + use crate::optimizer::Optimizer; use crate::push_down_limit::PushDownLimit; use crate::test::*; use datafusion_common::Column; @@ -87,12 +93,18 @@ mod tests { logical_plan::{builder::LogicalPlanBuilder, JoinType}, sum, }; + use std::sync::Arc; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let optimized_plan = EliminateLimit::new() - .try_optimize(plan, &mut OptimizerConfig::new()) - .unwrap() - .expect("failed to optimize plan"); + let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]); + let optimized_plan = optimizer + .optimize_recursively( + optimizer.rules.get(0).unwrap(), + plan, + &mut OptimizerConfig::new(), + )? 
+ .unwrap_or_else(|| plan.clone()); + + let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); assert_eq!(plan.schema(), optimized_plan.schema()); @@ -103,13 +115,14 @@ mod tests { plan: &LogicalPlan, expected: &str, ) -> Result<()> { - let optimized_plan = PushDownLimit::new() - .try_optimize(plan, &mut OptimizerConfig::new()) - .unwrap() - .expect("failed to optimize plan"); - let optimized_plan = EliminateLimit::new() - .try_optimize(&optimized_plan, &mut OptimizerConfig::new()) - .unwrap() + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + let mut config = OptimizerConfig::new().with_max_passes(1); + let optimizer = Optimizer::with_rules(vec![ + Arc::new(PushDownLimit::new()), + Arc::new(EliminateLimit::new()), + ]); + let optimized_plan = optimizer + .optimize(plan, &mut config, observe) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 4e9eadc47e5b..9011c8530e1e 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -59,6 +59,13 @@ pub trait OptimizerRule { /// A human readable name for this optimizer rule fn name(&self) -> &str; + + /// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details. + /// + /// If a rule uses the default `None`, it should recursively traverse the plan itself + fn apply_order(&self) -> Option { + None + } } /// Options to control the DataFusion Optimizer. @@ -147,6 +154,44 @@ pub struct Optimizer { pub rules: Vec>, } +/// If a rule has an `ApplyOrder`, the optimizer drives the traversal of children instead of +/// the rule recursing into them itself. +/// The rule only needs to handle the subtree pattern itself. +/// +/// Note: **sometimes** the result of an optimization can be optimized further, so the rule must be applied again. 
+/// +/// Usage Example: Merge Limit (subtree pattern is: Limit-Limit) +/// ```rust +/// use datafusion_expr::{Limit, LogicalPlan, LogicalPlanBuilder}; +/// use datafusion_common::Result; +/// fn merge_limit(parent: &Limit, child: &Limit) -> LogicalPlan { +/// // placeholder body so the example runs +/// return parent.input.as_ref().clone(); +/// } +/// fn try_optimize(plan: &LogicalPlan) -> Result> { +/// match plan { +/// LogicalPlan::Limit(limit) => match limit.input.as_ref() { +/// LogicalPlan::Limit(child_limit) => { +/// // merge limit ... +/// let optimized_plan = merge_limit(limit, child_limit); +/// // optimized_plan may itself be optimizable again, +/// // for example: plan is Limit-Limit-Limit +/// Ok(Some( +/// try_optimize(&optimized_plan)? +/// .unwrap_or_else(|| optimized_plan.clone()), +/// )) +/// } +/// _ => Ok(None), +/// }, +/// _ => Ok(None), +/// } +/// } +/// ``` +pub enum ApplyOrder { + TopDown, + BottomUp, +} + impl Optimizer { /// Create a new optimizer using the recommended list of rules pub fn new(config: &OptimizerConfig) -> Self { @@ -213,7 +258,7 @@ impl Optimizer { log_plan(&format!("Optimizer input (pass {})", i), &new_plan); for rule in &self.rules { - let result = rule.try_optimize(&new_plan, optimizer_config); + let result = self.optimize_recursively(rule, &new_plan, optimizer_config); match result { Ok(Some(plan)) => { if !plan.schema().equivalent_names_and_types(new_plan.schema()) { @@ -274,6 +319,80 @@ impl Optimizer { debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); Ok(new_plan) } + + fn optimize_node( + &self, + rule: &Arc, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { + // TODO: future feature: batch optimization + rule.try_optimize(plan, optimizer_config) + } + + fn optimize_inputs( + &self, + rule: &Arc, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { + let inputs = plan.inputs(); + let result = inputs + .iter() + .map(|sub_plan| 
+ self.optimize_recursively(rule, sub_plan, optimizer_config)) + .collect::>>()?; + if result.is_empty() || result.iter().all(|o| o.is_none()) { + return Ok(None); + } + + let new_inputs = result + .into_iter() + .enumerate() + .map(|(i, o)| match o { + Some(plan) => plan, + None => (*(inputs.get(i).unwrap())).clone(), + }) + .collect::>(); + + Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?)) + } + + /// Use a rule to optimize the whole plan. + /// If the rule has an `ApplyOrder`, it doesn't need to recursively handle children itself. + pub fn optimize_recursively( + &self, + rule: &Arc, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { + match rule.apply_order() { + Some(order) => match order { + ApplyOrder::TopDown => { + let optimize_self_opt = + self.optimize_node(rule, plan, optimizer_config)?; + let optimize_inputs_opt = match &optimize_self_opt { + Some(optimized_plan) => { + self.optimize_inputs(rule, optimized_plan, optimizer_config)? + } + _ => self.optimize_inputs(rule, plan, optimizer_config)?, + }; + Ok(optimize_inputs_opt.or(optimize_self_opt)) + } + ApplyOrder::BottomUp => { + let optimize_inputs_opt = + self.optimize_inputs(rule, plan, optimizer_config)?; + let optimize_self_opt = match &optimize_inputs_opt { + Some(optimized_plan) => { + self.optimize_node(rule, optimized_plan, optimizer_config)? + } + _ => self.optimize_node(rule, plan, optimizer_config)?, + }; + Ok(optimize_self_opt.or(optimize_inputs_opt)) + } + }, + _ => rule.try_optimize(plan, optimizer_config), + } + } } /// Log the plan in debug/tracing mode after some part of the optimizer runs