Skip to content

Commit

Permalink
avoid every rule must recursive children in optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Dec 14, 2022
1 parent da0de9d commit 8156c09
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 44 deletions.
10 changes: 5 additions & 5 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ impl OptimizerRule for MyRule {
"my_rule"
}

fn optimize(
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
// recurse down and optimize children first
let plan = utils::optimize_children(self, plan, _config)?;

Expand All @@ -86,12 +86,12 @@ impl OptimizerRule for MyRule {
let mut expr_rewriter = MyExprRewriter {};
let predicate = filter.predicate().clone();
let predicate = predicate.rewrite(&mut expr_rewriter)?;
Ok(LogicalPlan::Filter(Filter::try_new(
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input().clone(),
)?))
)?)))
}
_ => Ok(plan.clone()),
_ => Ok(Some(plan.clone())),
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ impl QueryPlanner for TopKQueryPlanner {
struct TopKOptimizerRule {}
impl OptimizerRule for TopKOptimizerRule {
// Example rewrite pass to insert a user defined LogicalPlanNode
fn optimize(
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
Expand All @@ -304,20 +304,20 @@ impl OptimizerRule for TopKOptimizerRule {
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace with a a TopK
return Ok(LogicalPlan::Extension(Extension {
return Ok(Some(LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode {
k: *fetch,
input: self.optimize(input.as_ref(), optimizer_config)?,
expr: expr[0].clone(),
}),
}));
})));
}
}
}

// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
optimize_children(self, plan, optimizer_config)
Ok(Some(optimize_children(self, plan, optimizer_config)?))
}

fn name(&self) -> &str {
Expand Down
67 changes: 39 additions & 28 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
use crate::{utils, OptimizerConfig, OptimizerRule};
use crate::optimizer::ApplyOrder;
use crate::optimizer::ApplyOrder::BottomUp;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};

Expand Down Expand Up @@ -50,45 +52,49 @@ impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
let limit = match plan {
LogicalPlan::Limit(limit) => limit,
_ => return Ok(None),
};

match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
return Ok(Some(utils::optimize_children(
self,
input,
optimizer_config,
)?));
}
}
None => {
if limit.skip == 0 {
let input = limit.input.as_ref();
// input also can be Limit, so we should apply again.
return Ok(Some(
self.try_optimize(input, _optimizer_config)?
.unwrap_or_else(|| input.clone()),
));
}
}
}
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
Ok(None)
}

fn name(&self) -> &str {
"eliminate_limit"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(BottomUp)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::optimizer::Optimizer;
use crate::push_down_limit::PushDownLimit;
use crate::test::*;
use datafusion_common::Column;
Expand All @@ -97,11 +103,14 @@ mod tests {
logical_plan::{builder::LogicalPlanBuilder, JoinType},
sum,
};
use std::sync::Arc;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = EliminateLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");

let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
Expand All @@ -113,10 +122,12 @@ mod tests {
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
.optimize(&optimized_plan, &mut OptimizerConfig::new())
.try_optimize(&optimized_plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
100 changes: 94 additions & 6 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ pub trait OptimizerRule {

/// A human readable name for this optimizer rule
fn name(&self) -> &str;

/// If a rule use default None, its should traverse recursively plan inside itself
fn apply_order(&self) -> Option<ApplyOrder> {
None
}
}

/// Options to control the DataFusion Optimizer.
Expand Down Expand Up @@ -157,6 +162,11 @@ pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

pub enum ApplyOrder {
TopDown,
BottomUp,
}

impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new(config: &OptimizerConfig) -> Self {
Expand Down Expand Up @@ -223,7 +233,7 @@ impl Optimizer {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);

for rule in &self.rules {
let result = rule.try_optimize(&new_plan, optimizer_config);
let result = self.optimize_recursively(rule, &new_plan, optimizer_config);
match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
Expand Down Expand Up @@ -284,6 +294,84 @@ impl Optimizer {
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}

fn optimize_node(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// TODO: future feature: We can do Batch optimize
// for rule in self.rules {
// let result = rule.optimize(&plan, optimizer_config);
// plan = result?;
// self.stats.count_rule(rule);
// }
// }
rule.try_optimize(plan, optimizer_config)
}

fn optimize_inputs(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let inputs = plan.inputs();
let result = inputs
.iter()
.map(|sub_plan| self.optimize_recursively(rule, sub_plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
if result.is_empty() || result.iter().all(|o| o.is_none()) {
return Ok(None);
}

let new_inputs = result
.into_iter()
.enumerate()
.map(|(i, o)| match o {
Some(plan) => plan,
None => (*(inputs.get(i).unwrap())).clone(),
})
.collect::<Vec<_>>();

Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?))
}

pub fn optimize_recursively(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match rule.apply_order() {
Some(order) => match order {
ApplyOrder::TopDown => {
let optimize_self_opt =
self.optimize_node(rule, plan, optimizer_config)?;
let optimize_inputs_opt = match &optimize_self_opt {
Some(optimized_plan) => {
self.optimize_inputs(rule, optimized_plan, optimizer_config)?
}
_ => self.optimize_inputs(rule, plan, optimizer_config)?,
};
Ok(optimize_inputs_opt.or(optimize_self_opt))
}
ApplyOrder::BottomUp => {
let optimize_inputs_opt =
self.optimize_inputs(rule, plan, optimizer_config)?;
let optimize_self_opt = match &optimize_inputs_opt {
Some(optimized_plan) => {
self.optimize_node(rule, optimized_plan, optimizer_config)?
}
_ => self.optimize_node(rule, plan, optimizer_config)?,
};
Ok(optimize_self_opt.or(optimize_inputs_opt))
}
},
_ => rule.try_optimize(plan, optimizer_config),
}
}
}

/// Log the plan in debug/tracing mode after some part of the optimizer runs
Expand Down Expand Up @@ -407,11 +495,11 @@ mod tests {
struct BadRule {}

impl OptimizerRule for BadRule {
fn optimize(
fn try_optimize(
&self,
_plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
) -> datafusion_common::Result<Option<LogicalPlan>> {
Err(DataFusionError::Plan("rule failed".to_string()))
}

Expand All @@ -424,13 +512,13 @@ mod tests {
struct GetTableScanRule {}

impl OptimizerRule for GetTableScanRule {
fn optimize(
fn try_optimize(
&self,
_plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
) -> datafusion_common::Result<Option<LogicalPlan>> {
let table_scan = test_table_scan()?;
LogicalPlanBuilder::from(table_scan).build()
Ok(Some(LogicalPlanBuilder::from(table_scan).build()?))
}

fn name(&self) -> &str {
Expand Down

0 comments on commit 8156c09

Please sign in to comment.