Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build aggregate schema in Aggregate::try_new #3739

Merged
merged 8 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,13 +700,12 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Distinct(Distinct {input}) => {
// Convert distinct to groupby with no aggregations
let group_expr = expand_wildcard(input.schema(), input)?;
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
input.clone(),
group_expr,
vec![],
input.schema().clone()
)?
);
input.schema().clone() // input schema and aggregate schema are the same in this case
)?);
Ok(self.create_initial_plan(&aggregate, session_state).await?)
}
LogicalPlan::Projection(Projection { input, expr, .. }) => {
Expand Down
16 changes: 2 additions & 14 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use crate::expr_rewriter::{
coerce_plan_expr_for_schema, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs,
};
use crate::type_coercion::binary::comparison_coercion;
use crate::utils::{
columnize_expr, exprlist_to_fields, from_plan, grouping_set_to_exprlist,
};
use crate::utils::{columnize_expr, exprlist_to_fields, from_plan};
use crate::{and, binary_expr, Operator};
use crate::{
logical_plan::{
Expand Down Expand Up @@ -700,20 +698,10 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
let group_expr = normalize_cols(group_expr, &self.plan)?;
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;

let grouping_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

let all_expr = grouping_expr.iter().chain(aggr_expr.iter());
validate_unique_names("Aggregations", all_expr.clone())?;
let aggr_schema = DFSchema::new_with_metadata(
exprlist_to_fields(all_expr, &self.plan)?,
self.plan.schema().metadata().clone(),
)?;
Comment on lines -704 to -711
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has been moved into `Aggregate::try_new`.

Ok(Self::from(LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(self.plan.clone()),
group_expr,
aggr_expr,
DFSchemaRef::new(aggr_schema),
)?)))
}

Expand Down Expand Up @@ -847,7 +835,7 @@ pub fn build_join_schema(
}

/// Errors if one or more expressions have equal names.
fn validate_unique_names<'a>(
pub(crate) fn validate_unique_names<'a>(
node_name: &str,
expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
Expand Down
26 changes: 25 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::logical_plan::builder::validate_unique_names;
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::utils::{exprlist_to_fields, grouping_set_expr_count};
use crate::utils::{
exprlist_to_fields, grouping_set_expr_count, grouping_set_to_exprlist,
};
use crate::{Expr, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError};
Expand Down Expand Up @@ -1334,10 +1337,31 @@ pub struct Aggregate {
}

impl Aggregate {
/// Create a new aggregate operator.
pub fn try_new(
input: Arc<LogicalPlan>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> datafusion_common::Result<Self> {
let grouping_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
let all_expr = grouping_expr.iter().chain(aggr_expr.iter());
validate_unique_names("Aggregations", all_expr.clone())?;
let schema = DFSchema::new_with_metadata(
exprlist_to_fields(all_expr, &input)?,
input.schema().metadata().clone(),
)?;
Comment on lines +1346 to +1352
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the code moved from LogicalPlanBuilder::aggregate

Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
}

/// Create a new aggregate operator using the provided schema to avoid the overhead of
/// building the schema again when the schema is already known.
///
/// This method should only be called when you are absolutely sure that the schema being
/// provided is correct for the aggregate. If in doubt, call [`Aggregate::try_new`] instead.
pub fn try_new_with_schema(
input: Arc<LogicalPlan>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
schema: DFSchemaRef,
) -> datafusion_common::Result<Self> {
if group_expr.is_empty() && aggr_expr.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ pub fn from_plan(
})),
LogicalPlan::Aggregate(Aggregate {
group_expr, schema, ..
}) => Ok(LogicalPlan::Aggregate(Aggregate::try_new(
}) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
Arc::new(inputs[0].clone()),
expr[0..group_expr.len()].to_vec(),
expr[group_expr.len()..].to_vec(),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn optimize(
let new_aggr_expr = pop_expr(&mut new_expr)?;
let new_group_expr = pop_expr(&mut new_expr)?;

Ok(LogicalPlan::Aggregate(Aggregate::try_new(
Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
Arc::new(new_input),
new_group_expr,
new_aggr_expr,
Expand Down
15 changes: 1 addition & 14 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,24 +303,22 @@ fn optimize_plan(
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
schema,
input,
..
}) => {
// aggregate:
// * remove any aggregate expression that is not required
// * construct the new set of required columns

// Find distinct group by exprs in the case where we have a grouping set
let all_group_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr)?;

exprlist_to_columns(&all_group_expr, &mut new_required_columns)?;

// Gather all columns needed for expressions in this Aggregate
let mut new_aggr_expr = Vec::new();
aggr_expr.iter().try_for_each(|expr| {
let name = &expr.name()?;
let column = Column::from_name(name);

if required_columns.contains(&column) {
new_aggr_expr.push(expr.clone());
new_required_columns.insert(column);
Expand All @@ -332,16 +330,6 @@ fn optimize_plan(
}
})?;

let new_schema = DFSchema::new_with_metadata(
schema
.fields()
.iter()
.filter(|x| new_required_columns.contains(&x.qualified_column()))
.cloned()
.collect(),
schema.metadata().clone(),
)?;
Comment on lines -335 to -343
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was problematic and has now been replaced with the code used elsewhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree: requiring the caller to (correctly) specify the aggregate schema is a recipe for disaster.


Ok(LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(optimize_plan(
_optimizer,
Expand All @@ -352,7 +340,6 @@ fn optimize_plan(
)?),
group_expr.clone(),
new_aggr_expr,
DFSchemaRef::new(new_schema),
)?))
}
// scans:
Expand Down
2 changes: 0 additions & 2 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
input.clone(),
inner_group_exprs,
Vec::new(),
Arc::new(inner_schema.clone()),
)?);
let inner_agg = optimize_children(&grouped_aggr)?;

Expand Down Expand Up @@ -152,7 +151,6 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
Arc::new(inner_agg),
outer_group_exprs,
new_aggr_exprs,
outer_aggr_schema,
)?);

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
Expand Down