diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index cb9d461c5d8d..f7362e3c71fa 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -700,13 +700,12 @@ impl DefaultPhysicalPlanner { LogicalPlan::Distinct(Distinct {input}) => { // Convert distinct to groupby with no aggregations let group_expr = expand_wildcard(input.schema(), input)?; - let aggregate = LogicalPlan::Aggregate(Aggregate::try_new( + let aggregate = LogicalPlan::Aggregate(Aggregate::try_new_with_schema( input.clone(), group_expr, vec![], - input.schema().clone() - )? - ); + input.schema().clone() // input schema and aggregate schema are the same in this case + )?); Ok(self.create_initial_plan(&aggregate, session_state).await?) } LogicalPlan::Projection(Projection { input, expr, .. }) => { diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 3850871b0eae..522639984075 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -21,9 +21,7 @@ use crate::expr_rewriter::{ coerce_plan_expr_for_schema, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, }; use crate::type_coercion::binary::comparison_coercion; -use crate::utils::{ - columnize_expr, exprlist_to_fields, from_plan, grouping_set_to_exprlist, -}; +use crate::utils::{columnize_expr, exprlist_to_fields, from_plan}; use crate::{and, binary_expr, Operator}; use crate::{ logical_plan::{ @@ -700,20 +698,10 @@ impl LogicalPlanBuilder { ) -> Result { let group_expr = normalize_cols(group_expr, &self.plan)?; let aggr_expr = normalize_cols(aggr_expr, &self.plan)?; - - let grouping_expr: Vec = grouping_set_to_exprlist(group_expr.as_slice())?; - - let all_expr = grouping_expr.iter().chain(aggr_expr.iter()); - validate_unique_names("Aggregations", all_expr.clone())?; - let aggr_schema = DFSchema::new_with_metadata( - exprlist_to_fields(all_expr, &self.plan)?, - self.plan.schema().metadata().clone(), - )?; Ok(Self::from(LogicalPlan::Aggregate(Aggregate::try_new( Arc::new(self.plan.clone()), group_expr, aggr_expr, - DFSchemaRef::new(aggr_schema), )?))) } @@ -847,7 +835,7 @@ pub fn build_join_schema( } /// Errors if one or more expressions have equal names. -fn validate_unique_names<'a>( +pub(crate) fn validate_unique_names<'a>( node_name: &str, expressions: impl IntoIterator, ) -> Result<()> { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 896f79a20e93..7f5c95ab1436 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. +use crate::logical_plan::builder::validate_unique_names; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; -use crate::utils::{exprlist_to_fields, grouping_set_expr_count}; +use crate::utils::{ + exprlist_to_fields, grouping_set_expr_count, grouping_set_to_exprlist, +}; use crate::{Expr, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError}; @@ -1334,10 +1337,31 @@ pub struct Aggregate { } impl Aggregate { + /// Create a new aggregate operator. pub fn try_new( input: Arc, group_expr: Vec, aggr_expr: Vec, + ) -> datafusion_common::Result { + let grouping_expr: Vec = grouping_set_to_exprlist(group_expr.as_slice())?; + let all_expr = grouping_expr.iter().chain(aggr_expr.iter()); + validate_unique_names("Aggregations", all_expr.clone())?; + let schema = DFSchema::new_with_metadata( + exprlist_to_fields(all_expr, &input)?, + input.schema().metadata().clone(), + )?; + Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema)) + } + + /// Create a new aggregate operator using the provided schema to avoid the overhead of + /// building the schema again when the schema is already known. + /// + /// This method should only be called when you are absolutely sure that the schema being + /// provided is correct for the aggregate. If in doubt, call [try_new] instead. + pub fn try_new_with_schema( + input: Arc, + group_expr: Vec, + aggr_expr: Vec, schema: DFSchemaRef, ) -> datafusion_common::Result { if group_expr.is_empty() && aggr_expr.is_empty() { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 21f2c95f6a01..501b4a8f11fc 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -414,7 +414,7 @@ pub fn from_plan( })), LogicalPlan::Aggregate(Aggregate { group_expr, schema, .. - }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new( + }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( Arc::new(inputs[0].clone()), expr[0..group_expr.len()].to_vec(), expr[group_expr.len()..].to_vec(), diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index d2b30e77f871..552c03d3d79e 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -189,7 +189,7 @@ fn optimize( let new_aggr_expr = pop_expr(&mut new_expr)?; let new_group_expr = pop_expr(&mut new_expr)?; - Ok(LogicalPlan::Aggregate(Aggregate::try_new( + Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( Arc::new(new_input), new_group_expr, new_aggr_expr, diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 6ee9b5dc74ca..051a0ed745b2 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -303,8 +303,8 @@ fn optimize_plan( LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, - schema, input, + .. }) => { // aggregate: // * remove any aggregate expression that is not required @@ -312,7 +312,6 @@ fn optimize_plan( // Find distinct group by exprs in the case where we have a grouping set let all_group_expr: Vec = grouping_set_to_exprlist(group_expr)?; - exprlist_to_columns(&all_group_expr, &mut new_required_columns)?; // Gather all columns needed for expressions in this Aggregate @@ -320,7 +319,6 @@ fn optimize_plan( aggr_expr.iter().try_for_each(|expr| { let name = &expr.name()?; let column = Column::from_name(name); - if required_columns.contains(&column) { new_aggr_expr.push(expr.clone()); new_required_columns.insert(column); @@ -332,16 +330,6 @@ fn optimize_plan( } })?; - let new_schema = DFSchema::new_with_metadata( - schema - .fields() - .iter() - .filter(|x| new_required_columns.contains(&x.qualified_column())) - .cloned() - .collect(), - schema.metadata().clone(), - )?; - Ok(LogicalPlan::Aggregate(Aggregate::try_new( Arc::new(optimize_plan( _optimizer, @@ -352,7 +340,6 @@ fn optimize_plan( )?), group_expr.clone(), new_aggr_expr, - DFSchemaRef::new(new_schema), )?)) } // scans: diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 0dbbb0bbfebe..6d3343763796 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -119,7 +119,6 @@ fn optimize(plan: &LogicalPlan) -> Result { input.clone(), inner_group_exprs, Vec::new(), - Arc::new(inner_schema.clone()), )?); let inner_agg = optimize_children(&grouped_aggr)?; @@ -152,7 +151,6 @@ fn optimize(plan: &LogicalPlan) -> Result { Arc::new(inner_agg), outer_group_exprs, new_aggr_exprs, - outer_aggr_schema, )?); Ok(LogicalPlan::Projection(Projection::try_new_with_schema(