Skip to content

Commit

Permalink
Avoid requiring every rule to recurse into children in the optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Dec 15, 2022
1 parent da0de9d commit e4fb267
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 33 deletions.
76 changes: 44 additions & 32 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
use crate::{utils, OptimizerConfig, OptimizerRule};
use crate::optimizer::ApplyOrder;
use crate::optimizer::ApplyOrder::BottomUp;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};

Expand Down Expand Up @@ -50,45 +52,49 @@ impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
let limit = match plan {
LogicalPlan::Limit(limit) => limit,
_ => return Ok(None),
};

match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
return Ok(Some(utils::optimize_children(
self,
input,
optimizer_config,
)?));
}
}
None => {
if limit.skip == 0 {
let input = limit.input.as_ref();
// The input can itself be a Limit, so apply this rule to it again.
return Ok(Some(
self.try_optimize(input, _optimizer_config)?
.unwrap_or_else(|| input.clone()),
));
}
}
}
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
Ok(None)
}

/// Returns the name of this rule: the fixed string `"eliminate_limit"`.
fn name(&self) -> &str {
"eliminate_limit"
}

/// Requests bottom-up application of this rule.
///
/// With `Some(BottomUp)`, `Optimizer::optimize_recursively` rewrites a
/// node's inputs first and then applies this rule to the node itself.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(BottomUp)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::optimizer::Optimizer;
use crate::push_down_limit::PushDownLimit;
use crate::test::*;
use datafusion_common::Column;
Expand All @@ -97,11 +103,15 @@ mod tests {
logical_plan::{builder::LogicalPlanBuilder, JoinType},
sum,
};
use std::sync::Arc;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = EliminateLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]);
let optimized_plan = optimizer
.optimize(plan, &mut OptimizerConfig::new(), observe)
.expect("failed to optimize plan");

let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
Expand All @@ -112,11 +122,13 @@ mod tests {
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
.optimize(&optimized_plan, &mut OptimizerConfig::new())
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let optimizer = Optimizer::with_rules(vec![
Arc::new(EliminateLimit::new()),
Arc::new(PushDownLimit::new()),
]);
let optimized_plan = optimizer
.optimize(plan, &mut OptimizerConfig::new(), observe)
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
83 changes: 82 additions & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ pub trait OptimizerRule {

/// A human readable name for this optimizer rule
fn name(&self) -> &str;

/// How the optimizer should traverse the plan when applying this rule.
///
/// If a rule keeps the default `None`, the optimizer calls `try_optimize`
/// only on the plan it is given, so the rule must recursively traverse the
/// plan's inputs itself. Returning `Some(order)` lets the optimizer drive
/// the traversal in that order instead.
fn apply_order(&self) -> Option<ApplyOrder> {
None
}
}

/// Options to control the DataFusion Optimizer.
Expand Down Expand Up @@ -157,6 +162,11 @@ pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

/// Order in which the [`Optimizer`] applies a rule to the nodes of a plan
/// when the rule asks the optimizer to drive the traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOrder {
    /// Apply the rule to a node first, then to that node's inputs.
    TopDown,
    /// Apply the rule to a node's inputs first, then to the node itself.
    BottomUp,
}

impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new(config: &OptimizerConfig) -> Self {
Expand Down Expand Up @@ -223,7 +233,7 @@ impl Optimizer {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);

for rule in &self.rules {
let result = rule.try_optimize(&new_plan, optimizer_config);
let result = self.optimize_recursively(rule, &new_plan, optimizer_config);
match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
Expand Down Expand Up @@ -284,6 +294,77 @@ impl Optimizer {
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}

/// Applies `rule` to the single node `plan` by forwarding to the rule's
/// `try_optimize`, and returns that call's result unchanged.
fn optimize_node(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
rule.try_optimize(plan, optimizer_config)
}

/// Applies `rule` to every input of `plan` through
/// [`Self::optimize_recursively`] and rebuilds `plan` on top of the results.
///
/// Returns `Ok(None)` when `plan` has no inputs or when no input was
/// rewritten. Otherwise returns `plan` with its inputs replaced, where any
/// input the rule left alone is carried over as a clone of the original.
/// The first error from an input, or from rebuilding the plan, is returned.
fn optimize_inputs(
    &self,
    rule: &Arc<dyn OptimizerRule + Send + Sync>,
    plan: &LogicalPlan,
    optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
    let children = plan.inputs();

    // Optimize each input in order; `?` stops at the first failure.
    let mut rewrites = Vec::with_capacity(children.len());
    for child in &children {
        rewrites.push(self.optimize_recursively(rule, child, optimizer_config)?);
    }

    // `all` is vacuously true for a plan without inputs, so leaf nodes
    // also take this early exit.
    if rewrites.iter().all(Option::is_none) {
        return Ok(None);
    }

    // Pair every rewrite with its original input and fall back to the
    // original wherever the rule made no change.
    let new_inputs: Vec<LogicalPlan> = rewrites
        .into_iter()
        .zip(children.iter().copied())
        .map(|(rewrite, child)| rewrite.unwrap_or_else(|| child.clone()))
        .collect();

    Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?))
}

/// Applies `rule` to `plan`, traversing the plan in the order the rule
/// requests through `apply_order`.
///
/// * `Some(ApplyOrder::TopDown)`: the rule runs on `plan` first, then on the
///   inputs of the rewritten node (or of `plan` if the rule left it alone).
/// * `Some(ApplyOrder::BottomUp)`: the inputs are rewritten first, then the
///   rule runs on the rebuilt node (or on `plan` if no input changed).
/// * `None`: the call is forwarded to the rule's `try_optimize` and no
///   traversal is done here.
///
/// Returns `Ok(None)` when neither step produced a new plan. Otherwise the
/// result of the later step is returned, falling back to the earlier step's
/// result when the later one made no change.
pub fn optimize_recursively(
    &self,
    rule: &Arc<dyn OptimizerRule + Send + Sync>,
    plan: &LogicalPlan,
    optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
    match rule.apply_order() {
        Some(ApplyOrder::TopDown) => {
            let node_rewrite = self.optimize_node(rule, plan, optimizer_config)?;
            // Descend from the rewritten node when there is one.
            let current = node_rewrite.as_ref().unwrap_or(plan);
            let inputs_rewrite = self.optimize_inputs(rule, current, optimizer_config)?;
            Ok(inputs_rewrite.or(node_rewrite))
        }
        Some(ApplyOrder::BottomUp) => {
            let inputs_rewrite = self.optimize_inputs(rule, plan, optimizer_config)?;
            // Run the rule on the rebuilt node when any input changed.
            let current = inputs_rewrite.as_ref().unwrap_or(plan);
            let node_rewrite = self.optimize_node(rule, current, optimizer_config)?;
            Ok(node_rewrite.or(inputs_rewrite))
        }
        None => rule.try_optimize(plan, optimizer_config),
    }
}
}

/// Log the plan in debug/tracing mode after some part of the optimizer runs
Expand Down

0 comments on commit e4fb267

Please sign in to comment.