Optimizer: avoid every rule must recursive children in optimizer #4618

Merged
merged 2 commits into from
Dec 16, 2022
85 changes: 49 additions & 36 deletions datafusion/optimizer/src/eliminate_limit.rs
@@ -20,7 +20,9 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
use crate::{utils, OptimizerConfig, OptimizerRule};
use crate::optimizer::ApplyOrder;
use crate::optimizer::ApplyOrder::BottomUp;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};

@@ -40,45 +42,49 @@ impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
let limit = match plan {
LogicalPlan::Limit(limit) => limit,
_ => return Ok(None),
};

match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
return Ok(Some(utils::optimize_children(
self,
input,
optimizer_config,
)?));
}
}
None => {
if limit.skip == 0 {
let input = limit.input.as_ref();
// The input can also be a Limit, so apply the rule again.
return Ok(Some(
self.try_optimize(input, _optimizer_config)?
.unwrap_or_else(|| input.clone()),
));
}
}
}
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
Ok(None)
}

fn name(&self) -> &str {
"eliminate_limit"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(BottomUp)
}
}
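
To make the per-node behaviour of the rewritten `try_optimize` easier to follow, here is a minimal standalone sketch. The `Plan` enum and the `eliminate_limit` function are hypothetical stand-ins, not DataFusion types. In particular, the toy `Empty` variant carries no schema, whereas the real rule copies the input schema into `EmptyRelation`.

```rust
/// Toy stand-in for DataFusion's `LogicalPlan` (hypothetical, illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    Empty,
    Limit {
        skip: usize,
        fetch: Option<usize>,
        input: Box<Plan>,
    },
}

/// Rewrites only the node it is given; `None` means "nothing changed".
/// Children are left to the optimizer's traversal, as in the diff.
fn eliminate_limit(plan: &Plan) -> Option<Plan> {
    let (skip, fetch, input) = match plan {
        Plan::Limit { skip, fetch, input } => (*skip, *fetch, input.as_ref()),
        _ => return None,
    };
    match fetch {
        // LIMIT 0 can never produce rows.
        Some(0) => Some(Plan::Empty),
        // No fetch and OFFSET 0: the node is a no-op. The input may itself be
        // a removable Limit, so apply the rule again before returning it.
        None if skip == 0 => {
            Some(eliminate_limit(input).unwrap_or_else(|| input.clone()))
        }
        _ => None,
    }
}
```

Returning `None` for every non-matching node is what lets the optimizer skip rebuilding untouched parts of the plan.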

#[cfg(test)]
mod tests {
use super::*;
use crate::optimizer::Optimizer;
use crate::push_down_limit::PushDownLimit;
use crate::test::*;
use datafusion_common::Column;
@@ -87,12 +93,18 @@ mod tests {
logical_plan::{builder::LogicalPlanBuilder, JoinType},
sum,
};
use std::sync::Arc;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = EliminateLimit::new()
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]);
let optimized_plan = optimizer
.optimize_recursively(
optimizer.rules.get(0).unwrap(),
plan,
&mut OptimizerConfig::new(),
)?
.unwrap_or_else(|| plan.clone());

let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
@@ -103,13 +115,14 @@ mod tests {
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
.try_optimize(&optimized_plan, &mut OptimizerConfig::new())
.unwrap()
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let mut config = OptimizerConfig::new().with_max_passes(1);
let optimizer = Optimizer::with_rules(vec![
Arc::new(PushDownLimit::new()),
Arc::new(EliminateLimit::new()),
]);
let optimized_plan = optimizer
.optimize(plan, &mut config, observe)
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
121 changes: 120 additions & 1 deletion datafusion/optimizer/src/optimizer.rs
@@ -59,6 +59,13 @@ pub trait OptimizerRule {

/// A human readable name for this optimizer rule
fn name(&self) -> &str;

/// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
///
/// If a rule use default None, its should traverse recursively plan inside itself
Contributor

Suggested change
/// If a rule use default None, its should traverse recursively plan inside itself
/// How should the rule be applied by the optimizer? See comments on [`ApplyOrder`] for details.
///
/// If a rule returns `None`, the default, it should traverse the plan recursively inside itself

fn apply_order(&self) -> Option<ApplyOrder> {
None
}
Comment on lines +66 to +69
Member Author

It's compatible with the original behavior.

}

/// Options to control the DataFusion Optimizer.
@@ -147,6 +154,44 @@ pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

/// If a rule declares an `ApplyOrder`, the optimizer traverses the plan and handles the children,
/// so the rule does not need to recurse itself.
/// The rule only needs to handle the subtree pattern it matches.
///
/// Note: **sometimes** the result of a rewrite can be optimized further, so the rule must be applied again.
///
/// Usage Example: Merge Limit (subtree pattern is: Limit-Limit)
Contributor

Thank you

/// ```rust
/// use datafusion_expr::{Limit, LogicalPlan, LogicalPlanBuilder};
/// use datafusion_common::Result;
/// fn merge_limit(parent: &Limit, child: &Limit) -> LogicalPlan {
/// // placeholder body so the doctest compiles
/// return parent.input.as_ref().clone();
/// }
/// fn try_optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
/// match plan {
/// LogicalPlan::Limit(limit) => match limit.input.as_ref() {
/// LogicalPlan::Limit(child_limit) => {
/// // merge limit ...
/// let optimized_plan = merge_limit(limit, child_limit);
/// // optimized_plan may itself be optimizable again,
/// // for example: plan is Limit-Limit-Limit
/// Ok(Some(
/// try_optimize(&optimized_plan)?
/// .unwrap_or_else(|| optimized_plan.clone()),
/// ))
/// }
/// _ => Ok(None),
/// },
/// _ => Ok(None),
/// }
/// }
/// ```
pub enum ApplyOrder {
TopDown,
BottomUp,
}
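
As a rough illustration of what the two variants mean for traversal, the sketch below records the order in which a per-node rule would be offered each node. `Node`, `Order`, `visit_order`, and `sample_tree` are hypothetical names for this example only; the real driver additionally replaces nodes as it goes.

```rust
/// Hypothetical toy tree; not a DataFusion type.
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Mirrors the two variants of `ApplyOrder` from the diff.
#[derive(Clone, Copy)]
enum Order {
    TopDown,
    BottomUp,
}

/// Records the order in which a per-node rule would be offered each node.
fn visit_order(node: &Node, order: Order, seen: &mut Vec<&'static str>) {
    if matches!(order, Order::TopDown) {
        seen.push(node.name); // parent before its inputs
    }
    for child in &node.children {
        visit_order(child, order, seen);
    }
    if matches!(order, Order::BottomUp) {
        seen.push(node.name); // parent after its inputs
    }
}

/// Builds the tree Limit -> Join -> [ScanA, ScanB].
fn sample_tree() -> Node {
    let leaf = |name| Node { name, children: vec![] };
    Node {
        name: "Limit",
        children: vec![Node {
            name: "Join",
            children: vec![leaf("ScanA"), leaf("ScanB")],
        }],
    }
}
```

On this tree, `TopDown` yields `Limit, Join, ScanA, ScanB` and `BottomUp` yields `ScanA, ScanB, Join, Limit`.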

impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new(config: &OptimizerConfig) -> Self {
@@ -213,7 +258,7 @@ impl Optimizer {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);

for rule in &self.rules {
let result = rule.try_optimize(&new_plan, optimizer_config);
let result = self.optimize_recursively(rule, &new_plan, optimizer_config);
match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
@@ -274,6 +319,80 @@ impl Optimizer {
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}

fn optimize_node(
Contributor

What value does this function add? In other words, why not call rule.try_optimize directly at the callsite 🤔

Member Author

I forgot to add a TODO for a future feature:
we can do batch optimization

for rule in rules:
    rule.try_optimize(plan)

&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// TODO: future feature: we can do batch optimization
rule.try_optimize(plan, optimizer_config)
}
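
The PR leaves batch optimization as a TODO and does not specify its design. One possible reading, sketched here under that assumption, is to run several node-local rules against the same node during a single visit, feeding each rule the latest rewritten version. `optimize_node_batch` and the generic plan type `P` are hypothetical and are not part of the PR.

```rust
/// Applies each rule in turn to the current version of one node.
/// Returns `None` when no rule changed anything (same convention as
/// `try_optimize` in the diff). Hypothetical helper, not DataFusion API.
fn optimize_node_batch<P>(
    rules: &[&dyn Fn(&P) -> Option<P>],
    plan: &P,
) -> Option<P> {
    let mut current: Option<P> = None;
    for rule in rules {
        // Feed the most recent rewrite to the next rule, or the original
        // node if nothing has changed yet.
        let next = rule(current.as_ref().unwrap_or(plan));
        if next.is_some() {
            current = next;
        }
    }
    current
}
```

Keeping the result as `Option<P>` preserves the "no change" signal, so a caller such as `optimize_recursively` could combine it with the inputs' result exactly as it does for a single rule.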

fn optimize_inputs(
&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let inputs = plan.inputs();
let result = inputs
.iter()
.map(|sub_plan| self.optimize_recursively(rule, sub_plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
if result.is_empty() || result.iter().all(|o| o.is_none()) {
return Ok(None);
}

let new_inputs = result
.into_iter()
.enumerate()
.map(|(i, o)| match o {
Some(plan) => plan,
None => (*(inputs.get(i).unwrap())).clone(),
})
.collect::<Vec<_>>();

Ok(Some(plan.with_new_inputs(new_inputs.as_slice())?))
}
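
The pattern in `optimize_inputs` (rewrite each child, report `None` if nothing changed, otherwise fill the gaps with clones of the originals) can be isolated in a small generic sketch. `rewrite_children` is a hypothetical helper written for this illustration and works on any `Clone` type rather than on `LogicalPlan`.

```rust
/// Applies `rewrite` to every child. Returns `None` when no child changed,
/// otherwise a full child list with unchanged entries cloned from the input.
fn rewrite_children<T: Clone>(
    children: &[T],
    rewrite: impl Fn(&T) -> Option<T>,
) -> Option<Vec<T>> {
    let results: Vec<Option<T>> = children.iter().map(|c| rewrite(c)).collect();
    // `all` is true for an empty iterator, so leaf nodes return `None` here
    // without a separate emptiness check.
    if results.iter().all(|r| r.is_none()) {
        return None;
    }
    Some(
        results
            .into_iter()
            .zip(children)
            .map(|(new, old)| new.unwrap_or_else(|| old.clone()))
            .collect(),
    )
}
```

Because `Iterator::all` returns `true` on an empty iterator, the `result.is_empty()` test in the diff is redundant, though harmless.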

/// Use a rule to optimize the whole plan.
/// If the rule declares an `ApplyOrder`, the rule does not need to handle children recursively itself.
pub fn optimize_recursively(
Contributor

Can you please add docstrings about what this API does?

Also, I wonder if it needs to be pub or if it could be private 🤔

Member Author

Sometimes we need to use it in unit tests 😂

&self,
rule: &Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match rule.apply_order() {
Some(order) => match order {
ApplyOrder::TopDown => {
let optimize_self_opt =
self.optimize_node(rule, plan, optimizer_config)?;
Contributor

I took a shot at rewriting this stuff to use Result::and_then, etc and I think how you have written it here is clearer 👍

let optimize_inputs_opt = match &optimize_self_opt {
Some(optimized_plan) => {
self.optimize_inputs(rule, optimized_plan, optimizer_config)?
}
_ => self.optimize_inputs(rule, plan, optimizer_config)?,
};
Ok(optimize_inputs_opt.or(optimize_self_opt))
}
ApplyOrder::BottomUp => {
let optimize_inputs_opt =
self.optimize_inputs(rule, plan, optimizer_config)?;
let optimize_self_opt = match &optimize_inputs_opt {
Some(optimized_plan) => {
self.optimize_node(rule, optimized_plan, optimizer_config)?
}
_ => self.optimize_node(rule, plan, optimizer_config)?,
};
Ok(optimize_self_opt.or(optimize_inputs_opt))
}
},
_ => rule.try_optimize(plan, optimizer_config),
}
}
}
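
The following standalone sketch mirrors the control flow of `optimize_recursively` on a toy tree, to show how the two `Option` results are combined with `Option::or`. All names here (`Node`, `node`, `Order`, `Rule`, `drop_noop`) are hypothetical; the toy rule replaces a single-input `Noop` node with its input and, unlike `EliminateLimit` in this PR, does not re-apply itself.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Node {
    label: String,
    inputs: Vec<Node>,
}

fn node(label: &str, inputs: Vec<Node>) -> Node {
    Node { label: label.to_string(), inputs }
}

#[derive(Clone, Copy)]
enum Order {
    TopDown,
    BottomUp,
}

/// A node-local rule: `None` means "no change".
type Rule = fn(&Node) -> Option<Node>;

/// Toy rule: a `Noop` node with one input is replaced by that input.
fn drop_noop(plan: &Node) -> Option<Node> {
    if plan.label == "Noop" && plan.inputs.len() == 1 {
        Some(plan.inputs[0].clone())
    } else {
        None
    }
}

fn optimize_inputs(rule: Rule, order: Order, plan: &Node) -> Option<Node> {
    let results: Vec<Option<Node>> = plan
        .inputs
        .iter()
        .map(|input| optimize_recursively(rule, order, input))
        .collect();
    if results.iter().all(Option::is_none) {
        return None;
    }
    let inputs = results
        .into_iter()
        .zip(&plan.inputs)
        .map(|(new, old)| new.unwrap_or_else(|| old.clone()))
        .collect();
    Some(Node { label: plan.label.clone(), inputs })
}

fn optimize_recursively(rule: Rule, order: Order, plan: &Node) -> Option<Node> {
    match order {
        Order::TopDown => {
            let this = rule(plan);
            // Descend into the rewritten node if there is one.
            let inputs = optimize_inputs(rule, order, this.as_ref().unwrap_or(plan));
            inputs.or(this) // the later step wins; fall back to the earlier one
        }
        Order::BottomUp => {
            let inputs = optimize_inputs(rule, order, plan);
            let this = rule(inputs.as_ref().unwrap_or(plan));
            this.or(inputs)
        }
    }
}
```

For the plan `Project -> Noop -> Noop -> Scan`, the bottom-up pass removes both `Noop` nodes, while the top-down pass leaves one behind: the node that replaces the first `Noop` is never offered to the rule again. This is the situation the `ApplyOrder` doc comment warns about, and it is why a rule may need to re-apply itself to its own output.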

/// Log the plan in debug/tracing mode after some part of the optimizer runs