Skip to content

Commit

Permalink
avoid every rule must recursive children in optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Dec 16, 2022
1 parent 8504cb2 commit 655a6c3
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 37 deletions.
85 changes: 49 additions & 36 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
use crate::{utils, OptimizerConfig, OptimizerRule};
use crate::optimizer::ApplyOrder;
use crate::optimizer::ApplyOrder::BottomUp;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};

Expand All @@ -40,45 +42,49 @@ impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
let limit = match plan {
LogicalPlan::Limit(limit) => limit,
_ => return Ok(None),
};

match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
return Ok(Some(utils::optimize_children(
self,
input,
optimizer_config,
)?));
}
}
None => {
if limit.skip == 0 {
let input = limit.input.as_ref();
// input also can be Limit, so we should apply again.
return Ok(Some(
self.try_optimize(input, _optimizer_config)?
.unwrap_or_else(|| input.clone()),
));
}
}
}
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
Ok(None)
}

fn name(&self) -> &str {
"eliminate_limit"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(BottomUp)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::optimizer::Optimizer;
use crate::push_down_limit::PushDownLimit;
use crate::test::*;
use datafusion_common::Column;
Expand All @@ -87,12 +93,18 @@ mod tests {
logical_plan::{builder::LogicalPlanBuilder, JoinType},
sum,
};
use std::sync::Arc;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = EliminateLimit::new()
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]);
let optimized_plan = optimizer
.optimize_recursively(
optimizer.rules.get(0).unwrap(),
plan,
&mut OptimizerConfig::new(),
)?
.unwrap_or_else(|| plan.clone());

let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
Expand All @@ -103,13 +115,14 @@ mod tests {
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
.try_optimize(&optimized_plan, &mut OptimizerConfig::new())
.unwrap()
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let mut config = OptimizerConfig::new().with_max_passes(1);
let optimizer = Optimizer::with_rules(vec![
Arc::new(PushDownLimit::new()),
Arc::new(EliminateLimit::new()),
]);
let optimized_plan = optimizer
.optimize(plan, &mut config, observe)
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
121 changes: 120 additions & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ pub trait OptimizerRule {

/// A human readable name for this optimizer rule
fn name(&self) -> &str;

/// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
///
/// If a rule use default None, its should traverse recursively plan inside itself
fn apply_order(&self) -> Option<ApplyOrder> {
None
}
}

/// Options to control the DataFusion Optimizer.
Expand Down Expand Up @@ -147,6 +154,44 @@ pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

/// If a rule is with `ApplyOrder`, it means the optimizer will derive to handle children instead of
/// recursively handling in rule.
/// We just need handle a subtree pattern itself.
///
/// Notice: **sometime** result after optimize still can be optimized, we need apply again.
///
/// Usage Example: Merge Limit (subtree pattern is: Limit-Limit)
/// ```rust
/// use datafusion_expr::{Limit, LogicalPlan, LogicalPlanBuilder};
/// use datafusion_common::Result;
/// fn merge_limit(parent: &Limit, child: &Limit) -> LogicalPlan {
/// // just for run
/// return parent.input.as_ref().clone();
/// }
/// fn try_optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
/// match plan {
/// LogicalPlan::Limit(limit) => match limit.input.as_ref() {
/// LogicalPlan::Limit(child_limit) => {
/// // merge limit ...
/// let optimized_plan = merge_limit(limit, child_limit);
/// // due to optimized_plan may be optimized again,
/// // for example: plan is Limit-Limit-Limit
/// Ok(Some(
/// try_optimize(&optimized_plan)?
/// .unwrap_or_else(|| optimized_plan.clone()),
/// ))
/// }
/// _ => Ok(None),
/// },
/// _ => Ok(None),
/// }
/// }
/// ```
pub enum ApplyOrder {
TopDown,
BottomUp,
}

impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new(config: &OptimizerConfig) -> Self {
Expand Down Expand Up @@ -213,7 +258,7 @@ impl Optimizer {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);

for rule in &self.rules {
let result = rule.try_optimize(&new_plan, optimizer_config);
let result = self.optimize_recursively(rule, &new_plan, optimizer_config);
match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
Expand Down Expand Up @@ -274,6 +319,80 @@ impl Optimizer {
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}

fn optimize_node(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// TODO: future feature: We can do Batch optimize
rule.try_optimize(plan, optimizer_config)
}

fn optimize_inputs(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let inputs = plan.inputs();
let result = inputs
.iter()
.map(|sub_plan| self.optimize_recursively(rule, sub_plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
if result.is_empty() || result.iter().all(|o| o.is_none()) {
return Ok(None);
}

let new_inputs = result
.into_iter()
.enumerate()
.map(|(i, o)| match o {
Some(plan) => plan,
None => (*(inputs.get(i).unwrap())).clone(),
})
.collect::<Vec<_>>();

Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?))
}

/// Use a rule to optimize the whole plan.
/// If the rule with `ApplyOrder`, we don't need to recursively handle children in rule.
pub fn optimize_recursively(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match rule.apply_order() {
Some(order) => match order {
ApplyOrder::TopDown => {
let optimize_self_opt =
self.optimize_node(rule, plan, optimizer_config)?;
let optimize_inputs_opt = match &optimize_self_opt {
Some(optimized_plan) => {
self.optimize_inputs(rule, optimized_plan, optimizer_config)?
}
_ => self.optimize_inputs(rule, plan, optimizer_config)?,
};
Ok(optimize_inputs_opt.or(optimize_self_opt))
}
ApplyOrder::BottomUp => {
let optimize_inputs_opt =
self.optimize_inputs(rule, plan, optimizer_config)?;
let optimize_self_opt = match &optimize_inputs_opt {
Some(optimized_plan) => {
self.optimize_node(rule, optimized_plan, optimizer_config)?
}
_ => self.optimize_node(rule, plan, optimizer_config)?,
};
Ok(optimize_self_opt.or(optimize_inputs_opt))
}
},
_ => rule.try_optimize(plan, optimizer_config),
}
}
}

/// Log the plan in debug/tracing mode after some part of the optimizer runs
Expand Down

0 comments on commit 655a6c3

Please sign in to comment.