From ff5eb8f4f25d337c179afcfdc8870b8397125c25 Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Fri, 26 Apr 2024 06:45:05 -0500 Subject: [PATCH] refactor `EliminateDuplicatedExpr` optimizer pass to avoid clone (#10218) * refactor eliminate duplicated expr to avoid clone adding dep * fix bugs * change lock * format toml * refactor --- datafusion-cli/Cargo.lock | 1 + datafusion/optimizer/Cargo.toml | 2 +- .../src/eliminate_duplicated_expr.rs | 140 +++++++++++------- 3 files changed, 85 insertions(+), 58 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index ba3e68e4011f..5263b064ff9b 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1323,6 +1323,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.3", + "indexmap 2.2.6", "itertools", "log", "regex-syntax", diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index b1a6953501a6..45ece35c2388 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -47,10 +47,10 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } datafusion-physical-expr = { workspace = true } hashbrown = { version = "0.14", features = ["raw"] } +indexmap = { workspace = true } itertools = { workspace = true } log = { workspace = true } regex-syntax = "0.8.0" - [dev-dependencies] ctor = { workspace = true } datafusion-sql = { workspace = true } diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs index ee44a328f8b3..3dbfc750e899 100644 --- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs +++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs @@ -19,12 +19,12 @@ use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::Result; -use datafusion_expr::expr::Sort as ExprSort; +use 
datafusion_common::tree_node::Transformed; +use datafusion_common::{internal_err, Result}; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{Aggregate, Expr, Sort}; -use hashbrown::HashSet; - +use indexmap::IndexSet; +use std::hash::{Hash, Hasher}; /// Optimization rule that eliminate duplicated expr. #[derive(Default)] pub struct EliminateDuplicatedExpr; @@ -35,78 +35,104 @@ impl EliminateDuplicatedExpr { Self {} } } - +// use this structure to avoid initial clone +#[derive(Eq, Clone, Debug)] +struct SortExprWrapper { + expr: Expr, +} +impl PartialEq for SortExprWrapper { + fn eq(&self, other: &Self) -> bool { + match (&self.expr, &other.expr) { + (Expr::Sort(own_sort), Expr::Sort(other_sort)) => { + own_sort.expr == other_sort.expr + } + _ => self.expr == other.expr, + } + } +} +impl Hash for SortExprWrapper { + fn hash<H: Hasher>(&self, state: &mut H) { + match &self.expr { + Expr::Sort(sort) => { + sort.expr.hash(state); + } + _ => { + self.expr.hash(state); + } + } + } +} impl OptimizerRule for EliminateDuplicatedExpr { fn try_optimize( &self, - plan: &LogicalPlan, + _plan: &LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result<Option<LogicalPlan>> { + internal_err!("Should have called EliminateDuplicatedExpr::rewrite") + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { match plan { LogicalPlan::Sort(sort) => { - let normalized_sort_keys = sort + let len = sort.expr.len(); + let unique_exprs: Vec<_> = sort .expr - .iter() - .map(|e| match e { - Expr::Sort(ExprSort { expr, .. 
}) => { - Expr::Sort(ExprSort::new(expr.clone(), true, false)) - } - _ => e.clone(), - }) - .collect::<Vec<_>>(); + .into_iter() + .map(|e| SortExprWrapper { expr: e }) + .collect::<IndexSet<_>>() + .into_iter() + .map(|wrapper| wrapper.expr) + .collect(); - // dedup sort.expr and keep order - let mut dedup_expr = Vec::new(); - let mut dedup_set = HashSet::new(); - sort.expr.iter().zip(normalized_sort_keys.iter()).for_each( - |(expr, normalized_expr)| { - if !dedup_set.contains(normalized_expr) { - dedup_expr.push(expr); - dedup_set.insert(normalized_expr); - } - }, - ); - if dedup_expr.len() == sort.expr.len() { - Ok(None) + let transformed = if len != unique_exprs.len() { + Transformed::yes } else { - Ok(Some(LogicalPlan::Sort(Sort { - expr: dedup_expr.into_iter().cloned().collect::<Vec<_>>(), - input: sort.input.clone(), - fetch: sort.fetch, - }))) - } + Transformed::no + }; + + Ok(transformed(LogicalPlan::Sort(Sort { + expr: unique_exprs, + input: sort.input, + fetch: sort.fetch, + }))) } LogicalPlan::Aggregate(agg) => { - // dedup agg.groupby and keep order - let mut dedup_expr = Vec::new(); - let mut dedup_set = HashSet::new(); - agg.group_expr.iter().for_each(|expr| { - if !dedup_set.contains(expr) { - dedup_expr.push(expr.clone()); - dedup_set.insert(expr); - } - }); - if dedup_expr.len() == agg.group_expr.len() { - Ok(None) + let len = agg.group_expr.len(); + + let unique_exprs: Vec<Expr> = agg + .group_expr + .into_iter() + .collect::<IndexSet<_>>() + .into_iter() + .collect(); + + let transformed = if len != unique_exprs.len() { + Transformed::yes } else { - Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new( - agg.input.clone(), - dedup_expr, - agg.aggr_expr.clone(), - )?))) - } + Transformed::no + }; + + Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr) + .map(|f| transformed(LogicalPlan::Aggregate(f))) } - _ => Ok(None), + _ => Ok(Transformed::no(plan)), } } - fn name(&self) -> &str { "eliminate_duplicated_expr" } - - fn apply_order(&self) -> Option<ApplyOrder> { - Some(ApplyOrder::TopDown) - } } 
#[cfg(test)]