Skip to content

Commit

Permalink
reuse code utils::optimize_children instead of redundant implementa…
Browse files Browse the repository at this point in the history
…tion. (#4119)
  • Loading branch information
jackwener authored Nov 7, 2022
1 parent 532b4a7 commit 7f0778c
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 212 deletions.
11 changes: 2 additions & 9 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
//! Optimizer rule to replace `where false` on a plan with an empty relation.
//! This saves time in planning and executing the query.
//! Note that this rule should be applied after simplify expressions optimizer rule.
use crate::{OptimizerConfig, OptimizerRule};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
logical_plan::{EmptyRelation, LogicalPlan},
utils::from_plan,
Expr,
};

Expand Down Expand Up @@ -61,13 +60,7 @@ impl OptimizerRule for EliminateFilter {
})),
None => {
// Apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| self.optimize(plan, _optimizer_config))
.collect::<Result<Vec<_>>>()?;

from_plan(plan, &plan.expressions(), &new_inputs)
utils::optimize_children(self, plan, _optimizer_config)
}
}
}
Expand Down
80 changes: 34 additions & 46 deletions datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
//! Optimizer rule to replace TableScan references
//! such as DataFrames and Views and inlines the LogicalPlan
//! to support further optimization
use crate::{OptimizerConfig, OptimizerRule};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{
logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan,
};
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};

/// Optimization rule that inlines TableScan that provide a [LogicalPlan]
/// ([DataFrame] / [ViewTable])
Expand All @@ -36,54 +34,44 @@ impl InlineTableScan {
}
}

/// Inline
fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> {
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
source,
table_name,
filters,
fetch: None,
..
}) if filters.is_empty() => {
if let Some(sub_plan) = source.get_logical_plan() {
// Recursively apply optimization
let plan = inline_table_scan(sub_plan)?;
let plan = LogicalPlanBuilder::from(plan).project_with_alias(
vec![Expr::Wildcard],
Some(table_name.to_string()),
)?;
plan.build()
} else {
// No plan available, return with table scan as is
Ok(plan.clone())
}
}

// Rest: Recurse
_ => {
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| inline_table_scan(plan))
.collect::<Result<Vec<_>>>()?;

from_plan(plan, &plan.expressions(), &new_inputs)
}
}
}

impl OptimizerRule for InlineTableScan {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
inline_table_scan(plan)
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
source,
table_name,
filters,
fetch: None,
..
}) if filters.is_empty() => {
if let Some(sub_plan) = source.get_logical_plan() {
// Recursively apply optimization
let plan =
utils::optimize_children(self, sub_plan, _optimizer_config)?;
let plan = LogicalPlanBuilder::from(plan).project_with_alias(
vec![Expr::Wildcard],
Some(table_name.to_string()),
)?;
plan.build()
} else {
// No plan available, return with table scan as is
Ok(plan.clone())
}
}

// Rest: Recurse
_ => {
// apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, _optimizer_config)
}
}
}

fn name(&self) -> &str {
Expand Down
45 changes: 16 additions & 29 deletions datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::{OptimizerConfig, OptimizerRule};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::logical_plan::Filter;
use datafusion_expr::utils::from_plan;
use datafusion_expr::{Expr, LogicalPlan, Operator};
use std::sync::Arc;

Expand Down Expand Up @@ -122,38 +121,27 @@ impl RewriteDisjunctivePredicate {
pub fn new() -> Self {
Self::default()
}
fn rewrite_disjunctive_predicate(plan: &LogicalPlan) -> Result<LogicalPlan> {
}

impl OptimizerRule for RewriteDisjunctivePredicate {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = predicate(filter.predicate())?;
let rewritten_predicate = rewrite_predicate(predicate);
let rewritten_expr = normalize_predicate(rewritten_predicate);
Ok(LogicalPlan::Filter(Filter::try_new(
rewritten_expr,
Arc::new(Self::rewrite_disjunctive_predicate(filter.input())?),
Arc::new(Self::optimize(self, filter.input(), _optimizer_config)?),
)?))
}
_ => {
let expr = plan.expressions();
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input| Self::rewrite_disjunctive_predicate(input))
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
}
_ => utils::optimize_children(self, plan, _optimizer_config),
}
}
}

impl OptimizerRule for RewriteDisjunctivePredicate {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Self::rewrite_disjunctive_predicate(plan)
}

fn name(&self) -> &str {
"rewrite_disjunctive_predicate"
Expand Down Expand Up @@ -362,7 +350,6 @@ fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
}

#[cfg(test)]

mod tests {
use crate::rewrite_disjunctive_predicate::{
normalize_predicate, predicate, rewrite_predicate, Predicate,
Expand Down Expand Up @@ -392,7 +379,7 @@ mod tests {
},
Predicate::Other {
expr: Box::new(gt_expr.clone())
}
},
]
},
Predicate::And {
Expand All @@ -402,9 +389,9 @@ mod tests {
},
Predicate::Other {
expr: Box::new(lt_expr.clone())
}
},
]
}
},
]
}
);
Expand All @@ -423,9 +410,9 @@ mod tests {
},
Predicate::Other {
expr: Box::new(lt_expr.clone())
}
},
]
}
},
]
}
);
Expand Down
Loading

0 comments on commit 7f0778c

Please sign in to comment.