Skip to content

Commit

Permalink
refactor EliminateDuplicatedExpr optimizer pass to avoid clone (#10218)
Browse files Browse the repository at this point in the history

* refactor eliminate duplicated expr to avoid clone

adding dep

* fix bugs

* change lock

* format toml

* refactor
  • Loading branch information
Lordworms authored Apr 26, 2024
1 parent 9c8873a commit ff5eb8f
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 58 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
regex-syntax = "0.8.0"

[dev-dependencies]
ctor = { workspace = true }
datafusion-sql = { workspace = true }
Expand Down
140 changes: 83 additions & 57 deletions datafusion/optimizer/src/eliminate_duplicated_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::expr::Sort as ExprSort;
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{Aggregate, Expr, Sort};
use hashbrown::HashSet;

use indexmap::IndexSet;
use std::hash::{Hash, Hasher};
/// Optimization rule that eliminates duplicated expressions, removing
/// repeated sort keys from `Sort` nodes and repeated group-by expressions
/// from `Aggregate` nodes while preserving first-seen order.
#[derive(Default)]
pub struct EliminateDuplicatedExpr;
Expand All @@ -35,78 +35,104 @@ impl EliminateDuplicatedExpr {
Self {}
}
}

// Wrapper around an owned `Expr` so sort keys can be deduplicated through an
// `IndexSet` without cloning them first. Its `PartialEq`/`Hash` impls below
// compare only the inner sort expression (ignoring the ASC/NULLS options), so
// two sorts on the same key count as duplicates regardless of direction.
#[derive(Eq, Clone, Debug)]
struct SortExprWrapper {
    expr: Expr,
}
impl PartialEq for SortExprWrapper {
    /// Two wrappers are equal when they represent the same sort key.
    ///
    /// For a pair of `Expr::Sort`s only the inner expression is compared, so
    /// the `asc`/`nulls_first` options do not distinguish them; any other
    /// expression pair falls back to plain expression equality.
    fn eq(&self, other: &Self) -> bool {
        if let (Expr::Sort(lhs), Expr::Sort(rhs)) = (&self.expr, &other.expr) {
            lhs.expr == rhs.expr
        } else {
            self.expr == other.expr
        }
    }
}
impl Hash for SortExprWrapper {
    /// Hashes only the sort key for `Expr::Sort`, mirroring `PartialEq` so
    /// that equal wrappers always produce equal hashes (required for use in
    /// hash-based sets).
    fn hash<H: Hasher>(&self, state: &mut H) {
        if let Expr::Sort(sort) = &self.expr {
            sort.expr.hash(state);
        } else {
            self.expr.hash(state);
        }
    }
}
impl OptimizerRule for EliminateDuplicatedExpr {
fn try_optimize(
&self,
plan: &LogicalPlan,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called EliminateDuplicatedExpr::rewrite")
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Sort(sort) => {
let normalized_sort_keys = sort
let len = sort.expr.len();
let unique_exprs: Vec<_> = sort
.expr
.iter()
.map(|e| match e {
Expr::Sort(ExprSort { expr, .. }) => {
Expr::Sort(ExprSort::new(expr.clone(), true, false))
}
_ => e.clone(),
})
.collect::<Vec<_>>();
.into_iter()
.map(|e| SortExprWrapper { expr: e })
.collect::<IndexSet<_>>()
.into_iter()
.map(|wrapper| wrapper.expr)
.collect();

// dedup sort.expr and keep order
let mut dedup_expr = Vec::new();
let mut dedup_set = HashSet::new();
sort.expr.iter().zip(normalized_sort_keys.iter()).for_each(
|(expr, normalized_expr)| {
if !dedup_set.contains(normalized_expr) {
dedup_expr.push(expr);
dedup_set.insert(normalized_expr);
}
},
);
if dedup_expr.len() == sort.expr.len() {
Ok(None)
let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Ok(Some(LogicalPlan::Sort(Sort {
expr: dedup_expr.into_iter().cloned().collect::<Vec<_>>(),
input: sort.input.clone(),
fetch: sort.fetch,
})))
}
Transformed::no
};

Ok(transformed(LogicalPlan::Sort(Sort {
expr: unique_exprs,
input: sort.input,
fetch: sort.fetch,
})))
}
LogicalPlan::Aggregate(agg) => {
// dedup agg.groupby and keep order
let mut dedup_expr = Vec::new();
let mut dedup_set = HashSet::new();
agg.group_expr.iter().for_each(|expr| {
if !dedup_set.contains(expr) {
dedup_expr.push(expr.clone());
dedup_set.insert(expr);
}
});
if dedup_expr.len() == agg.group_expr.len() {
Ok(None)
let len = agg.group_expr.len();

let unique_exprs: Vec<Expr> = agg
.group_expr
.into_iter()
.collect::<IndexSet<_>>()
.into_iter()
.collect();

let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
agg.input.clone(),
dedup_expr,
agg.aggr_expr.clone(),
)?)))
}
Transformed::no
};

Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
.map(|f| transformed(LogicalPlan::Aggregate(f)))
}
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}

fn name(&self) -> &str {
"eliminate_duplicated_expr"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}
}

#[cfg(test)]
Expand Down

0 comments on commit ff5eb8f

Please sign in to comment.