From 57bc522930f121b014afea00f6439b1615c1d212 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sat, 1 Jul 2023 23:24:37 +0800 Subject: [PATCH 1/3] fix: from_plan generate Agg can be with different schema. --- datafusion/expr/src/utils.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 61b0db53fb6f..1db2355451ad 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -827,14 +827,13 @@ pub fn from_plan( window_expr: expr[0..window_expr.len()].to_vec(), schema: schema.clone(), })), - LogicalPlan::Aggregate(Aggregate { - group_expr, schema, .. - }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( - Arc::new(inputs[0].clone()), - expr[0..group_expr.len()].to_vec(), - expr[group_expr.len()..].to_vec(), - schema.clone(), - )?)), + LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => { + Ok(LogicalPlan::Aggregate(Aggregate::try_new( + Arc::new(inputs[0].clone()), + expr[0..group_expr.len()].to_vec(), + expr[group_expr.len()..].to_vec(), + )?)) + } LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan { expr: expr.to_vec(), input: Arc::new(inputs[0].clone()), From 32f2bc85d5e91787d03b7ffd78f356f0a4d6a3e2 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sat, 1 Jul 2023 23:39:16 +0800 Subject: [PATCH 2/3] fix: from_plan generate Window can be with different schema. --- datafusion/expr/src/logical_plan/builder.rs | 17 ++++++---------- datafusion/expr/src/logical_plan/plan.rs | 22 ++++++++++++++++++--- datafusion/expr/src/utils.rs | 15 ++++++-------- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 61d540049ca6..48621528377e 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -825,17 +825,11 @@ impl LogicalPlanBuilder { window_expr: impl IntoIterator>, ) -> Result { let window_expr = normalize_cols(window_expr, &self.plan)?; - let all_expr = window_expr.iter(); - validate_unique_names("Windows", all_expr.clone())?; - let mut window_fields: Vec = self.plan.schema().fields().clone(); - window_fields.extend_from_slice(&exprlist_to_fields(all_expr, &self.plan)?); - let metadata = self.plan.schema().metadata().clone(); - - Ok(Self::from(LogicalPlan::Window(Window { - input: Arc::new(self.plan), - window_expr, - schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?), - }))) + validate_unique_names("Windows", &window_expr)?; + Ok(Self::from(LogicalPlan::Window(Window::try_new( + window_expr.into(), + Arc::new(self.plan), + )?))) } /// Apply an aggregate: grouping on the `group_expr` expressions @@ -1229,6 +1223,7 @@ pub fn project( plan: LogicalPlan, expr: impl IntoIterator>, ) -> Result { + // TODO: move it into analyzer let input_schema = plan.schema(); let mut projected_expr = vec![]; for e in expr { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index cf94052252c8..79d1c2cb3c32 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -18,7 +18,7 @@ //! Logical plan types use crate::expr::{Alias, Exists, InSubquery, Placeholder}; -use crate::expr_rewriter::create_col_from_scalar_expr; +use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols}; use crate::expr_vec_fmt; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; @@ -35,8 +35,8 @@ use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeVisitor, VisitRecursion, }; use datafusion_common::{ - plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, - Result, ScalarValue, + plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError, + OwnedTableReference, Result, ScalarValue, }; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; @@ -1400,6 +1400,22 @@ pub struct Window { pub schema: DFSchemaRef, } +impl Window { + /// Create a new window operator. + pub fn try_new(window_expr: Vec, input: Arc) -> Result { + let mut window_fields: Vec = input.schema().fields().clone(); + window_fields + .extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?); + let metadata = input.schema().metadata().clone(); + + Ok(Window { + input: input, + window_expr, + schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?), + }) + } +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScan { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 1db2355451ad..069ce6df71bc 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -818,15 +818,12 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), })), }, - LogicalPlan::Window(Window { - window_expr, - schema, - .. - }) => Ok(LogicalPlan::Window(Window { - input: Arc::new(inputs[0].clone()), - window_expr: expr[0..window_expr.len()].to_vec(), - schema: schema.clone(), - })), + LogicalPlan::Window(Window { window_expr, .. }) => { + Ok(LogicalPlan::Window(Window::try_new( + expr[0..window_expr.len()].to_vec(), + Arc::new(inputs[0].clone()), + )?)) + } LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => { Ok(LogicalPlan::Aggregate(Aggregate::try_new( Arc::new(inputs[0].clone()), From 646b24e16b53e6dfaafa64d69cbfb4241dc4d16d Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 2 Jul 2023 20:08:19 +0800 Subject: [PATCH 3/3] fix clippy --- datafusion/expr/src/logical_plan/builder.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 48621528377e..9ddf6231c53a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -827,7 +827,7 @@ impl LogicalPlanBuilder { let window_expr = normalize_cols(window_expr, &self.plan)?; validate_unique_names("Windows", &window_expr)?; Ok(Self::from(LogicalPlan::Window(Window::try_new( - window_expr.into(), + window_expr, Arc::new(self.plan), )?))) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 79d1c2cb3c32..e058708701b9 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -18,7 +18,7 @@ //! Logical plan types use crate::expr::{Alias, Exists, InSubquery, Placeholder}; -use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols}; +use crate::expr_rewriter::create_col_from_scalar_expr; use crate::expr_vec_fmt; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; @@ -1409,7 +1409,7 @@ impl Window { let metadata = input.schema().metadata().clone(); Ok(Window { - input: input, + input, window_expr, schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?), })