diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs index 92f649781cfd..33bf01cf35cd 100644 --- a/datafusion/core/src/execution/session_state_defaults.rs +++ b/datafusion/core/src/execution/session_state_defaults.rs @@ -94,6 +94,8 @@ impl SessionStateDefaults { feature = "unicode_expressions" ))] Arc::new(functions::planner::UserDefinedFunctionPlanner), + Arc::new(functions_aggregate::planner::AggregateFunctionPlanner), + Arc::new(functions_window::planner::WindowFunctionPlanner), ]; expr_planners diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index 29c24948fbf0..33f32e8f0f66 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -22,6 +22,7 @@ use arrow::{ array::{Int32Array, StringArray}, record_batch::RecordBatch, }; +use datafusion_functions_aggregate::count::count_all; use std::sync::Arc; use datafusion::error::Result; @@ -31,7 +32,7 @@ use datafusion::prelude::*; use datafusion::assert_batches_eq; use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::expr::Alias; -use datafusion_expr::ExprSchemable; +use datafusion_expr::{table_scan, ExprSchemable, LogicalPlanBuilder}; use datafusion_functions_aggregate::expr_fn::{approx_median, approx_percentile_cont}; use datafusion_functions_nested::map::map; @@ -1123,3 +1124,34 @@ async fn test_fn_map() -> Result<()> { Ok(()) } + +/// Call count wildcard from dataframe API +#[tokio::test] +async fn test_count_wildcard() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::UInt32, false), + Field::new("c", DataType::UInt32, false), + ]); + + let table_scan = table_scan(Some("test"), &schema, None)?.build()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("b")], vec![count_all()]) + .unwrap() 
+ .project(vec![count_all()]) + .unwrap() + .sort(vec![count_all().sort(true, false)]) + .unwrap() + .build() + .unwrap(); + + let expected = "Sort: count(*) ASC NULLS LAST [count(*):Int64]\ + \n Projection: count(*) [count(*):Int64]\ + \n Aggregate: groupBy=[[test.b]], aggr=[[count(*)]] [b:UInt32, count(*):Int64]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; + + let formatted_plan = plan.display_indent_schema().to_string(); + assert_eq!(formatted_plan, expected); + + Ok(()) +} diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index d545157607c7..b05029e8e3b1 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -32,7 +32,8 @@ use arrow::datatypes::{ }; use arrow::error::ArrowError; use arrow::util::pretty::pretty_format_batches; -use datafusion_functions_aggregate::count::count_udaf; +use datafusion_expr::utils::COUNT_STAR_EXPANSION; +use datafusion_functions_aggregate::count::{count_all, count_udaf}; use datafusion_functions_aggregate::expr_fn::{ array_agg, avg, count, count_distinct, max, median, min, sum, }; @@ -72,7 +73,7 @@ use datafusion_expr::expr::{GroupingSet, Sort, WindowFunction}; use datafusion_expr::var_provider::{VarProvider, VarType}; use datafusion_expr::{ cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder, - scalar_subquery, when, wildcard, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, + scalar_subquery, when, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, ScalarFunctionImplementation, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; @@ -2463,8 +2464,8 @@ async fn test_count_wildcard_on_sort() -> Result<()> { let df_results = ctx .table("t1") .await? - .aggregate(vec![col("b")], vec![count(wildcard())])? - .sort(vec![count(wildcard()).sort(true, false)])? + .aggregate(vec![col("b")], vec![count_all()])? + .sort(vec![count_all().sort(true, false)])? .explain(false, false)? 
.collect() .await?; @@ -2498,8 +2499,8 @@ async fn test_count_wildcard_on_where_in() -> Result<()> { Arc::new( ctx.table("t2") .await? - .aggregate(vec![], vec![count(wildcard())])? - .select(vec![count(wildcard())])? + .aggregate(vec![], vec![count_all()])? + .select(vec![count_all()])? .into_optimized_plan()?, ), ))? @@ -2532,8 +2533,8 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> { .filter(exists(Arc::new( ctx.table("t2") .await? - .aggregate(vec![], vec![count(wildcard())])? - .select(vec![count(wildcard())])? + .aggregate(vec![], vec![count_all()])? + .select(vec![count_all()])? .into_unoptimized_plan(), // Usually, into_optimized_plan() should be used here, but due to // https://github.com/apache/datafusion/issues/5771, @@ -2568,7 +2569,7 @@ async fn test_count_wildcard_on_window() -> Result<()> { .await? .select(vec![Expr::WindowFunction(WindowFunction::new( WindowFunctionDefinition::AggregateUDF(count_udaf()), - vec![wildcard()], + vec![Expr::Literal(COUNT_STAR_EXPANSION)], )) .order_by(vec![Sort::new(col("a"), false, true)]) .window_frame(WindowFrame::new_bounds( @@ -2599,17 +2600,16 @@ async fn test_count_wildcard_on_aggregate() -> Result<()> { let sql_results = ctx .sql("select count(*) from t1") .await? - .select(vec![col("count(*)")])? .explain(false, false)? .collect() .await?; - // add `.select(vec![count(wildcard())])?` to make sure we can analyze all node instead of just top node. + // add `.select(vec![count_all()])?` to make sure we can analyze all nodes instead of just the top node. let df_results = ctx .table("t1") .await? - .aggregate(vec![], vec![count(wildcard())])? - .select(vec![count(wildcard())])? + .aggregate(vec![], vec![count_all()])? + .select(vec![count_all()])? .explain(false, false)? .collect() .await?; @@ -2646,8 +2646,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> { ctx.table("t2") .await? .filter(out_ref_col(DataType::UInt32, "t1.a").eq(col("t2.a")))? 
- .aggregate(vec![], vec![count(wildcard())])? - .select(vec![col(count(wildcard()).to_string())])? + .aggregate(vec![], vec![count_all()])? + .select(vec![col(count_all().to_string())])? .into_unoptimized_plan(), )) .gt(lit(ScalarValue::UInt8(Some(0)))), diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index d4b5ae8b2820..128d1d0aa4b6 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -780,7 +780,7 @@ async fn explain_logical_plan_only() { let expected = vec![ vec![ "logical_plan", - "Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]\ + "Aggregate: groupBy=[[]], aggr=[[count(*)]]\ \n SubqueryAlias: t\ \n Projection: \ \n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))" diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index df79b3568ce6..f8baf9c94b3c 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2294,7 +2294,6 @@ impl Display for SchemaDisplay<'_> { | Expr::OuterReferenceColumn(..) | Expr::Placeholder(_) | Expr::Wildcard { .. 
} => write!(f, "{}", self.0), - Expr::AggregateFunction(AggregateFunction { func, params }) => { match func.schema_name(params) { Ok(name) => { diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index 04cc26c910cb..a2ed0592efdb 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -25,9 +25,12 @@ use datafusion_common::{ config::ConfigOptions, file_options::file_type::FileType, not_impl_err, DFSchema, Result, TableReference, }; -use sqlparser::ast; +use sqlparser::ast::{self, NullTreatment}; -use crate::{AggregateUDF, Expr, GetFieldAccess, ScalarUDF, TableSource, WindowUDF}; +use crate::{ + AggregateUDF, Expr, GetFieldAccess, ScalarUDF, SortExpr, TableSource, WindowFrame, + WindowFunctionDefinition, WindowUDF, +}; /// Provides the `SQL` query planner meta-data about tables and /// functions referenced in SQL statements, without a direct dependency on the @@ -138,7 +141,7 @@ pub trait ExprPlanner: Debug + Send + Sync { /// Plan an array literal, such as `[1, 2, 3]` /// - /// Returns origin expression arguments if not possible + /// Returns original expression arguments if not possible fn plan_array_literal( &self, exprs: Vec, @@ -149,14 +152,14 @@ pub trait ExprPlanner: Debug + Send + Sync { /// Plan a `POSITION` expression, such as `POSITION( in )` /// - /// returns origin expression arguments if not possible + /// Returns original expression arguments if not possible fn plan_position(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } /// Plan a dictionary literal, such as `{ key: value, ...}` /// - /// Returns origin expression arguments if not possible + /// Returns original expression arguments if not possible fn plan_dictionary_literal( &self, expr: RawDictionaryExpr, @@ -167,14 +170,14 @@ pub trait ExprPlanner: Debug + Send + Sync { /// Plan an extract expression, such as`EXTRACT(month FROM foo)` /// - /// Returns origin expression arguments if not possible + /// Returns original 
expression arguments if not possible fn plan_extract(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } /// Plan an substring expression, such as `SUBSTRING( [FROM ] [FOR ])` /// - /// Returns origin expression arguments if not possible + /// Returns original expression arguments if not possible fn plan_substring(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } @@ -195,14 +198,14 @@ pub trait ExprPlanner: Debug + Send + Sync { /// Plans an overlay expression, such as `overlay(str PLACING substr FROM pos [FOR count])` /// - /// Returns origin expression arguments if not possible + /// Returns original expression arguments if not possible fn plan_overlay(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } /// Plans a `make_map` expression, such as `make_map(key1, value1, key2, value2, ...)` /// - /// Returns origin expression arguments if not possible + /// Returns original expression arguments if not possible fn plan_make_map(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } @@ -230,6 +233,23 @@ pub trait ExprPlanner: Debug + Send + Sync { fn plan_any(&self, expr: RawBinaryExpr) -> Result> { Ok(PlannerResult::Original(expr)) } + + /// Plans aggregate functions, such as `COUNT()` + /// + /// Returns original expression arguments if not possible + fn plan_aggregate( + &self, + expr: RawAggregateExpr, + ) -> Result> { + Ok(PlannerResult::Original(expr)) + } + + /// Plans window functions, such as `COUNT()` + /// + /// Returns original expression arguments if not possible + fn plan_window(&self, expr: RawWindowExpr) -> Result> { + Ok(PlannerResult::Original(expr)) + } } /// An operator with two arguments to plan @@ -266,6 +286,30 @@ pub struct RawDictionaryExpr { pub values: Vec, } +/// This structure is used by `AggregateFunctionPlanner` to plan operators with +/// custom expressions. 
+#[derive(Debug, Clone)] +pub struct RawAggregateExpr { + pub func: Arc, + pub args: Vec, + pub distinct: bool, + pub filter: Option>, + pub order_by: Option>, + pub null_treatment: Option, +} + +/// This structure is used by `WindowFunctionPlanner` to plan operators with +/// custom expressions. +#[derive(Debug, Clone)] +pub struct RawWindowExpr { + pub func_def: WindowFunctionDefinition, + pub args: Vec, + pub partition_by: Vec, + pub order_by: Vec, + pub window_frame: WindowFrame, + pub null_treatment: Option, +} + /// Result of planning a raw expr with [`ExprPlanner`] #[derive(Debug, Clone)] pub enum PlannerResult { diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 2b9e2bddd184..ae7196c9b10f 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -515,9 +515,9 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { null_treatment, } = params; - let mut schema_name = String::new(); + let mut display_name = String::new(); - schema_name.write_fmt(format_args!( + display_name.write_fmt(format_args!( "{}({}{})", self.name(), if *distinct { "DISTINCT " } else { "" }, @@ -525,17 +525,22 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { ))?; if let Some(nt) = null_treatment { - schema_name.write_fmt(format_args!(" {}", nt))?; + display_name.write_fmt(format_args!(" {}", nt))?; } if let Some(fe) = filter { - schema_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?; + display_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?; } - if let Some(order_by) = order_by { - schema_name - .write_fmt(format_args!(" ORDER BY [{}]", expr_vec_fmt!(order_by)))?; + if let Some(ob) = order_by { + display_name.write_fmt(format_args!( + " ORDER BY [{}]", + ob.iter() + .map(|o| format!("{o}")) + .collect::>() + .join(", ") + ))?; } - Ok(schema_name) + Ok(display_name) } /// Returns the user-defined display name of function, given the arguments diff --git a/datafusion/functions-aggregate/src/count.rs 
b/datafusion/functions-aggregate/src/count.rs index cb59042ef468..c11329d7f5b3 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -17,11 +17,15 @@ use ahash::RandomState; use datafusion_common::stats::Precision; +use datafusion_expr::expr::{ + schema_name_from_exprs, schema_name_from_sorts, AggregateFunctionParams, + WindowFunctionParams, +}; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use datafusion_macros::user_doc; use datafusion_physical_expr::expressions; use std::collections::HashSet; -use std::fmt::Debug; +use std::fmt::{Debug, Write}; use std::mem::{size_of, size_of_val}; use std::ops::BitAnd; use std::sync::Arc; @@ -47,11 +51,11 @@ use datafusion_common::{ downcast_value, internal_err, not_impl_err, Result, ScalarValue, }; use datafusion_expr::function::StateFieldsArgs; +use datafusion_expr::{expr_vec_fmt, Expr, ReversedUDAF, StatisticsArgs, TypeSignature}; use datafusion_expr::{ function::AccumulatorArgs, utils::format_state_name, Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, SetMonotonicity, Signature, Volatility, }; -use datafusion_expr::{Expr, ReversedUDAF, StatisticsArgs, TypeSignature}; use datafusion_functions_aggregate_common::aggregate::count_distinct::{ BytesDistinctCountAccumulator, FloatDistinctCountAccumulator, PrimitiveDistinctCountAccumulator, @@ -79,6 +83,11 @@ pub fn count_distinct(expr: Expr) -> Expr { )) } +/// Creates aggregation to count all rows, equivalent to `COUNT(*)`, `COUNT()`, `COUNT(1)` +pub fn count_all() -> Expr { + count(Expr::Literal(COUNT_STAR_EXPANSION)) +} + #[user_doc( doc_section(label = "General Functions"), description = "Returns the number of non-null values in the specified column. 
To include null values in the total count, use `count(*)`.", @@ -139,6 +148,185 @@ impl AggregateUDFImpl for Count { "count" } + fn schema_name(&self, params: &AggregateFunctionParams) -> Result { + let AggregateFunctionParams { + args, + distinct, + filter, + order_by, + null_treatment, + } = params; + + let mut schema_name = String::new(); + + if is_count_wildcard(args) { + schema_name.write_str("count(*)")?; + } else { + schema_name.write_fmt(format_args!( + "{}({}{})", + self.name(), + if *distinct { "DISTINCT " } else { "" }, + schema_name_from_exprs(args)? + ))?; + } + + if let Some(null_treatment) = null_treatment { + schema_name.write_fmt(format_args!(" {}", null_treatment))?; + } + + if let Some(filter) = filter { + schema_name.write_fmt(format_args!(" FILTER (WHERE {filter})"))?; + }; + + if let Some(order_by) = order_by { + schema_name.write_fmt(format_args!( + " ORDER BY [{}]", + schema_name_from_sorts(order_by)? + ))?; + }; + + Ok(schema_name) + } + + fn window_function_schema_name( + &self, + params: &WindowFunctionParams, + ) -> Result { + let WindowFunctionParams { + args, + partition_by, + order_by, + window_frame, + null_treatment, + } = params; + + let mut schema_name = String::new(); + + if is_count_wildcard(args) { + schema_name.write_str("count(*)")?; + } else { + schema_name.write_fmt(format_args!( + "{}({})", + self.name(), + schema_name_from_exprs(args)? + ))?; + } + + if let Some(null_treatment) = null_treatment { + schema_name.write_fmt(format_args!(" {}", null_treatment))?; + } + + if !partition_by.is_empty() { + schema_name.write_fmt(format_args!( + " PARTITION BY [{}]", + schema_name_from_exprs(partition_by)? + ))?; + } + + if !order_by.is_empty() { + schema_name.write_fmt(format_args!( + " ORDER BY [{}]", + schema_name_from_sorts(order_by)? 
+ ))?; + }; + + schema_name.write_fmt(format_args!(" {window_frame}"))?; + + Ok(schema_name) + } + + fn display_name(&self, params: &AggregateFunctionParams) -> Result { + let AggregateFunctionParams { + args, + distinct, + filter, + order_by, + null_treatment, + } = params; + + let mut display_name = String::new(); + + if is_count_wildcard(args) { + display_name.write_str("count(*)")?; + } else { + display_name.write_fmt(format_args!( + "{}({}{})", + self.name(), + if *distinct { "DISTINCT " } else { "" }, + args.iter() + .map(|arg| format!("{arg}")) + .collect::>() + .join(", ") + ))?; + } + + if let Some(nt) = null_treatment { + display_name.write_fmt(format_args!(" {}", nt))?; + } + if let Some(fe) = filter { + display_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?; + } + if let Some(ob) = order_by { + display_name.write_fmt(format_args!( + " ORDER BY [{}]", + ob.iter() + .map(|o| format!("{o}")) + .collect::>() + .join(", ") + ))?; + } + + Ok(display_name) + } + + fn window_function_display_name( + &self, + params: &WindowFunctionParams, + ) -> Result { + let WindowFunctionParams { + args, + partition_by, + order_by, + window_frame, + null_treatment, + } = params; + + let mut display_name = String::new(); + + if is_count_wildcard(args) { + display_name.write_str("count(*)")?; + } else { + display_name.write_fmt(format_args!( + "{}({})", + self.name(), + expr_vec_fmt!(args) + ))?; + } + + if let Some(null_treatment) = null_treatment { + display_name.write_fmt(format_args!(" {}", null_treatment))?; + } + + if !partition_by.is_empty() { + display_name.write_fmt(format_args!( + " PARTITION BY [{}]", + expr_vec_fmt!(partition_by) + ))?; + } + + if !order_by.is_empty() { + display_name + .write_fmt(format_args!(" ORDER BY [{}]", expr_vec_fmt!(order_by)))?; + }; + + display_name.write_fmt(format_args!( + " {} BETWEEN {} AND {}", + window_frame.units, window_frame.start_bound, window_frame.end_bound + ))?; + + Ok(display_name) + } + fn signature(&self) -> 
&Signature { &self.signature } @@ -359,6 +547,15 @@ impl AggregateUDFImpl for Count { } } +fn is_count_wildcard(args: &[Expr]) -> bool { + match args { + [] => true, // count() + // All const should be coerced to int64 or rejected by the signature + [Expr::Literal(ScalarValue::Int64(Some(_)))] => true, // count(1) + _ => false, // More than one argument or non-matching cases + } +} + #[derive(Debug)] struct CountAccumulator { count: i64, diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index f4bdb53efd55..a5c84298e9d5 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -64,28 +64,29 @@ pub mod macros; pub mod approx_distinct; +pub mod approx_median; +pub mod approx_percentile_cont; +pub mod approx_percentile_cont_with_weight; pub mod array_agg; +pub mod average; +pub mod bit_and_or_xor; +pub mod bool_and_or; pub mod correlation; pub mod count; pub mod covariance; pub mod first_last; +pub mod grouping; pub mod hyperloglog; pub mod median; pub mod min_max; +pub mod nth_value; pub mod regr; pub mod stddev; +pub mod string_agg; pub mod sum; pub mod variance; -pub mod approx_median; -pub mod approx_percentile_cont; -pub mod approx_percentile_cont_with_weight; -pub mod average; -pub mod bit_and_or_xor; -pub mod bool_and_or; -pub mod grouping; -pub mod nth_value; -pub mod string_agg; +pub mod planner; use crate::approx_percentile_cont::approx_percentile_cont_udaf; use crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf; diff --git a/datafusion/functions-aggregate/src/planner.rs b/datafusion/functions-aggregate/src/planner.rs new file mode 100644 index 000000000000..1f0a42c4c71b --- /dev/null +++ b/datafusion/functions-aggregate/src/planner.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SQL planning extensions like [`AggregateFunctionPlanner`] + +use datafusion_common::Result; +use datafusion_expr::{ + expr::AggregateFunction, + lit, + planner::{ExprPlanner, PlannerResult, RawAggregateExpr}, + utils::COUNT_STAR_EXPANSION, + Expr, +}; + +#[derive(Debug)] +pub struct AggregateFunctionPlanner; + +impl ExprPlanner for AggregateFunctionPlanner { + fn plan_aggregate( + &self, + expr: RawAggregateExpr, + ) -> Result> { + if expr.func.name() == "count" + && (expr.args.len() == 1 && matches!(expr.args[0], Expr::Wildcard { .. 
}) + || expr.args.is_empty()) + { + let RawAggregateExpr { + func, + args: _, + distinct, + filter, + order_by, + null_treatment, + } = expr; + return Ok(PlannerResult::Planned(Expr::AggregateFunction( + AggregateFunction::new_udf( + func, + vec![lit(COUNT_STAR_EXPANSION)], + distinct, + filter, + order_by, + null_treatment, + ), + ))); + } + + Ok(PlannerResult::Original(expr)) + } +} diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs index 0d932bf84725..718b0bf1587b 100644 --- a/datafusion/functions-window/src/lib.rs +++ b/datafusion/functions-window/src/lib.rs @@ -45,6 +45,9 @@ pub mod nth_value; pub mod ntile; pub mod rank; pub mod row_number; + +pub mod planner; + mod utils; /// Fluent-style API for creating `Expr`s diff --git a/datafusion/functions-window/src/planner.rs b/datafusion/functions-window/src/planner.rs new file mode 100644 index 000000000000..8f48ca8b18dc --- /dev/null +++ b/datafusion/functions-window/src/planner.rs @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
SQL planning extensions like [`WindowFunctionPlanner`] + +use datafusion_common::Result; +use datafusion_expr::{ + expr::WindowFunction, + lit, + planner::{ExprPlanner, PlannerResult, RawWindowExpr}, + utils::COUNT_STAR_EXPANSION, + Expr, ExprFunctionExt, +}; + +#[derive(Debug)] +pub struct WindowFunctionPlanner; + +impl ExprPlanner for WindowFunctionPlanner { + fn plan_window(&self, expr: RawWindowExpr) -> Result> { + if expr.func_def.name() == "count" + && (expr.args.len() == 1 && matches!(expr.args[0], Expr::Wildcard { .. }) + || expr.args.is_empty()) + { + let RawWindowExpr { + func_def, + args: _, + partition_by, + order_by, + window_frame, + null_treatment, + } = expr; + return Ok(PlannerResult::Planned( + Expr::WindowFunction(WindowFunction::new( + func_def, + vec![lit(COUNT_STAR_EXPANSION)], + )) + .partition_by(partition_by) + .order_by(order_by) + .window_frame(window_frame) + .null_treatment(null_treatment) + .build()?, + )); + } + + Ok(PlannerResult::Original(expr)) + } +} diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs deleted file mode 100644 index f517761b1e33..000000000000 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ /dev/null @@ -1,277 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::analyzer::AnalyzerRule; - -use crate::utils::NamePreserver; -use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::Result; -use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams, WindowFunction}; -use datafusion_expr::utils::COUNT_STAR_EXPANSION; -use datafusion_expr::{lit, Expr, LogicalPlan, WindowFunctionDefinition}; - -/// Rewrite `Count(Expr::Wildcard)` to `Count(Expr::Literal)`. -/// -/// Resolves issue: -#[derive(Default, Debug)] -pub struct CountWildcardRule {} - -impl CountWildcardRule { - pub fn new() -> Self { - Self {} - } -} - -impl AnalyzerRule for CountWildcardRule { - fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - plan.transform_down_with_subqueries(analyze_internal).data() - } - - fn name(&self) -> &str { - "count_wildcard_rule" - } -} - -fn is_wildcard(expr: &Expr) -> bool { - matches!(expr, Expr::Wildcard { .. }) -} - -fn is_count_star_aggregate(aggregate_function: &AggregateFunction) -> bool { - matches!(aggregate_function, - AggregateFunction { - func, - params: AggregateFunctionParams { args, .. 
}, - } if func.name() == "count" && (args.len() == 1 && is_wildcard(&args[0]) || args.is_empty())) -} - -fn is_count_star_window_aggregate(window_function: &WindowFunction) -> bool { - let args = &window_function.params.args; - matches!(window_function.fun, - WindowFunctionDefinition::AggregateUDF(ref udaf) - if udaf.name() == "count" && (args.len() == 1 && is_wildcard(&args[0]) || args.is_empty())) -} - -fn analyze_internal(plan: LogicalPlan) -> Result> { - let name_preserver = NamePreserver::new(&plan); - plan.map_expressions(|expr| { - let original_name = name_preserver.save(&expr); - let transformed_expr = expr.transform_up(|expr| match expr { - Expr::WindowFunction(mut window_function) - if is_count_star_window_aggregate(&window_function) => - { - window_function.params.args = vec![lit(COUNT_STAR_EXPANSION)]; - Ok(Transformed::yes(Expr::WindowFunction(window_function))) - } - Expr::AggregateFunction(mut aggregate_function) - if is_count_star_aggregate(&aggregate_function) => - { - aggregate_function.params.args = vec![lit(COUNT_STAR_EXPANSION)]; - Ok(Transformed::yes(Expr::AggregateFunction( - aggregate_function, - ))) - } - _ => Ok(Transformed::no(expr)), - })?; - Ok(transformed_expr.update_data(|data| original_name.restore(data))) - }) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test::*; - use arrow::datatypes::DataType; - use datafusion_common::ScalarValue; - use datafusion_expr::expr::Sort; - use datafusion_expr::ExprFunctionExt; - use datafusion_expr::{ - col, exists, in_subquery, logical_plan::LogicalPlanBuilder, out_ref_col, - scalar_subquery, wildcard, WindowFrame, WindowFrameBound, WindowFrameUnits, - }; - use datafusion_functions_aggregate::count::count_udaf; - use datafusion_functions_aggregate::expr_fn::max; - use std::sync::Arc; - - use datafusion_functions_aggregate::expr_fn::{count, sum}; - - fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> { - assert_analyzed_plan_eq_display_indent( - 
Arc::new(CountWildcardRule::new()), - plan, - expected, - ) - } - - #[test] - fn test_count_wildcard_on_sort() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("b")], vec![count(wildcard())])? - .project(vec![count(wildcard())])? - .sort(vec![count(wildcard()).sort(true, false)])? - .build()?; - let expected = "Sort: count(*) ASC NULLS LAST [count(*):Int64]\ - \n Projection: count(*) [count(*):Int64]\ - \n Aggregate: groupBy=[[test.b]], aggr=[[count(Int64(1)) AS count(*)]] [b:UInt32, count(*):Int64]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_count_wildcard_on_where_in() -> Result<()> { - let table_scan_t1 = test_table_scan_with_name("t1")?; - let table_scan_t2 = test_table_scan_with_name("t2")?; - - let plan = LogicalPlanBuilder::from(table_scan_t1) - .filter(in_subquery( - col("a"), - Arc::new( - LogicalPlanBuilder::from(table_scan_t2) - .aggregate(Vec::::new(), vec![count(wildcard())])? - .project(vec![count(wildcard())])? - .build()?, - ), - ))? - .build()?; - - let expected = "Filter: t1.a IN () [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [count(*):Int64]\ - \n Projection: count(*) [count(*):Int64]\ - \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] [count(*):Int64]\ - \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_count_wildcard_on_where_exists() -> Result<()> { - let table_scan_t1 = test_table_scan_with_name("t1")?; - let table_scan_t2 = test_table_scan_with_name("t2")?; - - let plan = LogicalPlanBuilder::from(table_scan_t1) - .filter(exists(Arc::new( - LogicalPlanBuilder::from(table_scan_t2) - .aggregate(Vec::::new(), vec![count(wildcard())])? - .project(vec![count(wildcard())])? - .build()?, - )))? 
- .build()?; - - let expected = "Filter: EXISTS () [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [count(*):Int64]\ - \n Projection: count(*) [count(*):Int64]\ - \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] [count(*):Int64]\ - \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> { - let table_scan_t1 = test_table_scan_with_name("t1")?; - let table_scan_t2 = test_table_scan_with_name("t2")?; - - let plan = LogicalPlanBuilder::from(table_scan_t1) - .filter( - scalar_subquery(Arc::new( - LogicalPlanBuilder::from(table_scan_t2) - .filter(out_ref_col(DataType::UInt32, "t1.a").eq(col("t2.a")))? - .aggregate( - Vec::::new(), - vec![count(lit(COUNT_STAR_EXPANSION))], - )? - .project(vec![count(lit(COUNT_STAR_EXPANSION))])? - .build()?, - )) - .gt(lit(ScalarValue::UInt8(Some(0)))), - )? - .project(vec![col("t1.a"), col("t1.b")])? 
- .build()?; - - let expected = "Projection: t1.a, t1.b [a:UInt32, b:UInt32]\ - \n Filter: () > UInt8(0) [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [count(Int64(1)):Int64]\ - \n Projection: count(Int64(1)) [count(Int64(1)):Int64]\ - \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] [count(Int64(1)):Int64]\ - \n Filter: outer_ref(t1.a) = t2.a [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - #[test] - fn test_count_wildcard_on_window() -> Result<()> { - let table_scan = test_table_scan()?; - - let plan = LogicalPlanBuilder::from(table_scan) - .window(vec![Expr::WindowFunction(WindowFunction::new( - WindowFunctionDefinition::AggregateUDF(count_udaf()), - vec![wildcard()], - )) - .order_by(vec![Sort::new(col("a"), false, true)]) - .window_frame(WindowFrame::new_bounds( - WindowFrameUnits::Range, - WindowFrameBound::Preceding(ScalarValue::UInt32(Some(6))), - WindowFrameBound::Following(ScalarValue::UInt32(Some(2))), - )) - .build()?])? - .project(vec![count(wildcard())])? - .build()?; - - let expected = "Projection: count(Int64(1)) AS count(*) [count(*):Int64]\ - \n WindowAggr: windowExpr=[[count(Int64(1)) ORDER BY [test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY [test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]] [a:UInt32, b:UInt32, c:UInt32, count(*) ORDER BY [test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_count_wildcard_on_aggregate() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(Vec::::new(), vec![count(wildcard())])? - .project(vec![count(wildcard())])? 
- .build()?; - - let expected = "Projection: count(*) [count(*):Int64]\ - \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] [count(*):Int64]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_count_wildcard_on_non_count_aggregate() -> Result<()> { - let table_scan = test_table_scan()?; - let res = LogicalPlanBuilder::from(table_scan) - .aggregate(Vec::::new(), vec![sum(wildcard())]); - assert!(res.is_err()); - Ok(()) - } - - #[test] - fn test_count_wildcard_on_nesting() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(Vec::::new(), vec![max(count(wildcard()))])? - .project(vec![count(wildcard())])? - .build()?; - - let expected = "Projection: count(Int64(1)) AS count(*) [count(*):Int64]\ - \n Aggregate: groupBy=[[]], aggr=[[max(count(Int64(1))) AS max(count(*))]] [max(count(*)):Int64;N]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } -} diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 9d0ac6b54cf4..c506616d142e 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -28,7 +28,6 @@ use datafusion_common::Result; use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::{InvariantLevel, LogicalPlan}; -use crate::analyzer::count_wildcard_rule::CountWildcardRule; use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule; use crate::analyzer::inline_table_scan::InlineTableScan; use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction; @@ -37,7 +36,6 @@ use crate::utils::log_plan; use self::function_rewrite::ApplyFunctionRewrites; -pub mod count_wildcard_rule; pub mod expand_wildcard_rule; pub mod function_rewrite; pub mod inline_table_scan; @@ -106,7 +104,6 @@ impl Analyzer { // [Expr::Wildcard] should be expanded before [TypeCoercion] 
Arc::new(ResolveGroupingFunction::new()), Arc::new(TypeCoercion::new()), - Arc::new(CountWildcardRule::new()), ]; Self::with_rules(rules) } diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index a33ecbc3a1fb..b59acd72a26d 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -23,11 +23,14 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use datafusion_common::config::ConfigOptions; use datafusion_common::{assert_contains, plan_err, Result, TableReference}; +use datafusion_expr::planner::ExprPlanner; use datafusion_expr::sqlparser::dialect::PostgreSqlDialect; use datafusion_expr::test::function_stub::sum_udaf; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF}; use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; +use datafusion_functions_aggregate::planner::AggregateFunctionPlanner; +use datafusion_functions_window::planner::WindowFunctionPlanner; use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter; use datafusion_optimizer::analyzer::Analyzer; use datafusion_optimizer::optimizer::Optimizer; @@ -195,7 +198,7 @@ fn between_date32_plus_interval() -> Result<()> { WHERE col_date32 between '1998-03-18' AND cast('1998-03-18' as date) + INTERVAL '90 days'"; let plan = test_sql(sql)?; let expected = - "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\ + "Aggregate: groupBy=[[]], aggr=[[count(*)]]\ \n Projection: \ \n Filter: test.col_date32 >= Date32(\"1998-03-18\") AND test.col_date32 <= Date32(\"1998-06-16\")\ \n TableScan: test projection=[col_date32]"; @@ -209,7 +212,7 @@ fn between_date64_plus_interval() -> Result<()> { WHERE col_date64 between '1998-03-18T00:00:00' AND cast('1998-03-18' as date) + INTERVAL '90 days'"; let plan = test_sql(sql)?; let expected = - "Aggregate: groupBy=[[]], 
aggr=[[count(Int64(1))]]\ + "Aggregate: groupBy=[[]], aggr=[[count(*)]]\ \n Projection: \ \n Filter: test.col_date64 >= Date64(\"1998-03-18\") AND test.col_date64 <= Date64(\"1998-06-16\")\ \n TableScan: test projection=[col_date64]"; @@ -266,7 +269,7 @@ fn push_down_filter_groupby_expr_contains_alias() { let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) FROM test GROUP BY 1) where c > 3"; let plan = test_sql(sql).unwrap(); let expected = "Projection: test.col_int32 + test.col_uint32 AS c, count(*)\ - \n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[count(Int64(1)) AS count(*)]]\ + \n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[count(*)]]\ \n Filter: test.col_int32 + CAST(test.col_uint32 AS Int32) > Int32(3)\ \n TableScan: test projection=[col_int32, col_uint32]"; assert_eq!(expected, format!("{plan}")); @@ -311,7 +314,7 @@ fn eliminate_redundant_null_check_on_count() { let plan = test_sql(sql).unwrap(); let expected = "\ Projection: test.col_int32, count(*) AS c\ - \n Aggregate: groupBy=[[test.col_int32]], aggr=[[count(Int64(1)) AS count(*)]]\ + \n Aggregate: groupBy=[[test.col_int32]], aggr=[[count(*)]]\ \n TableScan: test projection=[col_int32]"; assert_eq!(expected, format!("{plan}")); } @@ -422,7 +425,12 @@ fn test_sql(sql: &str) -> Result { let context_provider = MyContextProvider::default() .with_udaf(sum_udaf()) .with_udaf(count_udaf()) - .with_udaf(avg_udaf()); + .with_udaf(avg_udaf()) + .with_expr_planners(vec![ + Arc::new(AggregateFunctionPlanner), + Arc::new(WindowFunctionPlanner), + ]); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone())?; @@ -440,6 +448,7 @@ fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} struct MyContextProvider { options: ConfigOptions, udafs: HashMap>, + expr_planners: Vec>, } impl MyContextProvider { @@ -448,6 +457,11 @@ impl MyContextProvider { 
self.udafs.insert(udaf.name().to_lowercase(), udaf); self } + + fn with_expr_planners(mut self, expr_planners: Vec>) -> Self { + self.expr_planners = expr_planners; + self + } } impl ContextProvider for MyContextProvider { @@ -516,6 +530,10 @@ impl ContextProvider for MyContextProvider { fn udwf_names(&self) -> Vec { Vec::new() } + + fn get_expr_planners(&self) -> &[Arc] { + &self.expr_planners + } } struct MyTableSource { diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 1cf3dcb289a6..035749a78941 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -23,7 +23,7 @@ use datafusion_common::{ DFSchema, Dependency, Result, }; use datafusion_expr::expr::{ScalarFunction, Unnest}; -use datafusion_expr::planner::PlannerResult; +use datafusion_expr::planner::{PlannerResult, RawAggregateExpr, RawWindowExpr}; use datafusion_expr::{ expr, qualified_wildcard, wildcard, Expr, ExprFunctionExt, ExprSchemable, WindowFrame, WindowFunctionDefinition, @@ -315,15 +315,38 @@ impl SqlToRel<'_, S> { }; if let Ok(fun) = self.find_window_func(&name) { - return Expr::WindowFunction(expr::WindowFunction::new( - fun, - self.function_args_to_expr(args, schema, planner_context)?, - )) - .partition_by(partition_by) - .order_by(order_by) - .window_frame(window_frame) - .null_treatment(null_treatment) - .build(); + let args = self.function_args_to_expr(args, schema, planner_context)?; + let mut window_expr = RawWindowExpr { + func_def: fun, + args, + partition_by, + order_by, + window_frame, + null_treatment, + }; + + for planner in self.context_provider.get_expr_planners().iter() { + match planner.plan_window(window_expr)? 
{ + PlannerResult::Planned(expr) => return Ok(expr), + PlannerResult::Original(expr) => window_expr = expr, + } + } + + let RawWindowExpr { + func_def, + args, + partition_by, + order_by, + window_frame, + null_treatment, + } = window_expr; + + return Expr::WindowFunction(expr::WindowFunction::new(func_def, args)) + .partition_by(partition_by) + .order_by(order_by) + .window_frame(window_frame) + .null_treatment(null_treatment) + .build(); } } else { // User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function @@ -341,8 +364,33 @@ impl SqlToRel<'_, S> { .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context)) .transpose()? .map(Box::new); + + let mut aggregate_expr = RawAggregateExpr { + func: fm, + args, + distinct, + filter, + order_by, + null_treatment, + }; + for planner in self.context_provider.get_expr_planners().iter() { + match planner.plan_aggregate(aggregate_expr)? { + PlannerResult::Planned(expr) => return Ok(expr), + PlannerResult::Original(expr) => aggregate_expr = expr, + } + } + + let RawAggregateExpr { + func, + args, + distinct, + filter, + order_by, + null_treatment, + } = aggregate_expr; + return Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf( - fm, + func, args, distinct, filter, diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 1df18302687e..9c0d6316adb2 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1461,14 +1461,14 @@ fn select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby() fn select_simple_aggregate_with_groupby_can_use_positions() { quick_test( "SELECT state, age AS b, count(1) FROM person GROUP BY 1, 2", - "Projection: person.state, person.age AS b, count(Int64(1))\ - \n Aggregate: groupBy=[[person.state, person.age]], aggr=[[count(Int64(1))]]\ + "Projection: person.state, person.age AS b, count(*)\ + \n Aggregate: 
groupBy=[[person.state, person.age]], aggr=[[count(*)]]\ \n TableScan: person", ); quick_test( "SELECT state, age AS b, count(1) FROM person GROUP BY 2, 1", - "Projection: person.state, person.age AS b, count(Int64(1))\ - \n Aggregate: groupBy=[[person.age, person.state]], aggr=[[count(Int64(1))]]\ + "Projection: person.state, person.age AS b, count(*)\ + \n Aggregate: groupBy=[[person.age, person.state]], aggr=[[count(*)]]\ \n TableScan: person", ); } @@ -1630,8 +1630,8 @@ fn test_wildcard() { #[test] fn select_count_one() { let sql = "SELECT count(1) FROM person"; - let expected = "Projection: count(Int64(1))\ - \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\ + let expected = "Projection: count(*)\ + \n Aggregate: groupBy=[[]], aggr=[[count(*)]]\ \n TableScan: person"; quick_test(sql, expected); } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 7caa81d64e5b..f175973f92a1 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -6276,6 +6276,26 @@ physical_plan 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], file_type=csv, has_header=true +# test count(null) case (null with type) + +statement count 0 +create table t(a int, b int) as values (1, 3), (2, 4), (3, 5); + +query I +select count(null::bigint) from t; +---- +0 + +query TT +explain select count(null::bigint) from t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[count(Int64(NULL)) AS count(NULL)]] +02)--TableScan: t projection=[] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[count(NULL)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] + ####### # Group median test ####### diff --git a/datafusion/sqllogictest/test_files/avro.slt b/datafusion/sqllogictest/test_files/avro.slt index 
80bf0bc2dd5a..20179e0c5bdc 100644 --- a/datafusion/sqllogictest/test_files/avro.slt +++ b/datafusion/sqllogictest/test_files/avro.slt @@ -243,7 +243,7 @@ query TT EXPLAIN SELECT count(*) from alltypes_plain ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[]], aggr=[[count(*)]] 02)--TableScan: alltypes_plain projection=[] physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[count(*)] diff --git a/datafusion/sqllogictest/test_files/coalesce.slt b/datafusion/sqllogictest/test_files/coalesce.slt index 5f2d2f0d1da9..e7cf31dc690b 100644 --- a/datafusion/sqllogictest/test_files/coalesce.slt +++ b/datafusion/sqllogictest/test_files/coalesce.slt @@ -442,4 +442,4 @@ drop table test query T select coalesce(arrow_cast('', 'Utf8View'), arrow_cast('', 'Dictionary(UInt32, Utf8)')); ---- -(empty) \ No newline at end of file +(empty) diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 7dd85b3ae2d8..f39ff56ce449 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -631,4 +631,3 @@ COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102); # Copy using execution.keep_partition_by_columns with an invalid value query error DataFusion error: Invalid or Unsupported Configuration: provided value for 'execution.keep_partition_by_columns' was not recognized: "invalid_value" COPY source_table to '/tmp/table.parquet' OPTIONS (execution.keep_partition_by_columns invalid_value); - diff --git a/datafusion/sqllogictest/test_files/count_star_rule.slt b/datafusion/sqllogictest/test_files/count_star_rule.slt index d660257b609d..0efd9e99889f 100644 --- a/datafusion/sqllogictest/test_files/count_star_rule.slt +++ b/datafusion/sqllogictest/test_files/count_star_rule.slt @@ -31,44 +31,44 @@ query TT EXPLAIN SELECT COUNT() FROM (SELECT 1 AS a, 2 AS b) AS t; ---- logical_plan -01)Aggregate: groupBy=[[]], 
aggr=[[count(Int64(1)) AS count()]] +01)Aggregate: groupBy=[[]], aggr=[[count(*)]] 02)--SubqueryAlias: t 03)----EmptyRelation physical_plan -01)ProjectionExec: expr=[1 as count()] +01)ProjectionExec: expr=[1 as count(*)] 02)--PlaceholderRowExec query TT EXPLAIN SELECT t1.a, COUNT() FROM t1 GROUP BY t1.a; ---- logical_plan -01)Aggregate: groupBy=[[t1.a]], aggr=[[count(Int64(1)) AS count()]] +01)Aggregate: groupBy=[[t1.a]], aggr=[[count(*)]] 02)--TableScan: t1 projection=[a] physical_plan -01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count()] +01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] 02)--CoalesceBatchesExec: target_batch_size=8192 03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count()] +05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] 06)----------DataSourceExec: partitions=1, partition_sizes=[1] query TT EXPLAIN SELECT t1.a, COUNT() AS cnt FROM t1 GROUP BY t1.a HAVING COUNT() > 0; ---- logical_plan -01)Projection: t1.a, count() AS cnt -02)--Filter: count() > Int64(0) -03)----Aggregate: groupBy=[[t1.a]], aggr=[[count(Int64(1)) AS count()]] +01)Projection: t1.a, count(*) AS cnt +02)--Filter: count(*) > Int64(0) +03)----Aggregate: groupBy=[[t1.a]], aggr=[[count(*)]] 04)------TableScan: t1 projection=[a] physical_plan -01)ProjectionExec: expr=[a@0 as a, count()@1 as cnt] +01)ProjectionExec: expr=[a@0 as a, count(*)@1 as cnt] 02)--CoalesceBatchesExec: target_batch_size=8192 -03)----FilterExec: count()@1 > 0 -04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count()] +03)----FilterExec: count(*)@1 > 0 +04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -08)--------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count()] +08)--------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] 09)----------------DataSourceExec: partitions=1, partition_sizes=[1] query II @@ -80,12 +80,12 @@ query TT EXPLAIN SELECT a, COUNT() OVER (PARTITION BY a) AS count_a FROM t1; ---- logical_plan -01)Projection: t1.a, count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS count_a -02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +01)Projection: t1.a, count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS count_a +02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] 03)----TableScan: t1 projection=[a] physical_plan -01)ProjectionExec: expr=[a@0 as a, count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a] -02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] +01)ProjectionExec: expr=[a@0 as a, count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a] +02)--WindowAggExec: wdw=[count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------DataSourceExec: partitions=1, partition_sizes=[1] diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index aefc2672b539..6f75a7d7f8fd 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -827,4 +827,3 @@ drop table table_with_pk; statement ok set datafusion.catalog.information_schema = false; - diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index a35a4d6f28dc..dc7a53adf889 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -184,4 +184,4 @@ query error DataFusion error: Schema error: No field named ammp\. 
Did you mean ' select ammp from a; statement ok -drop table a; \ No newline at end of file +drop table a; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 037565ce05f9..0d5eab6cf56d 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -178,7 +178,6 @@ logical_plan after inline_table_scan SAME TEXT AS ABOVE logical_plan after expand_wildcard_rule SAME TEXT AS ABOVE logical_plan after resolve_grouping_function SAME TEXT AS ABOVE logical_plan after type_coercion SAME TEXT AS ABOVE -logical_plan after count_wildcard_rule SAME TEXT AS ABOVE analyzed_logical_plan SAME TEXT AS ABOVE logical_plan after eliminate_nested_union SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE @@ -427,7 +426,7 @@ logical_plan 02)--TableScan: t1 projection=[a] 03)--SubqueryAlias: __correlated_sq_1 04)----Projection: -05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +05)------Aggregate: groupBy=[[]], aggr=[[count(*)]] 06)--------TableScan: t2 projection=[] physical_plan 01)NestedLoopJoinExec: join_type=LeftSemi diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index ee76ee1c5511..32428fdef765 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -61,7 +61,7 @@ logical_plan 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST 04)------Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1 -05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] 06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan 01)DataSinkExec: sink=MemoryTable (partitions=1) @@ -122,7 +122,7 @@ FROM aggregate_test_100 logical_plan 01)Dml: op=[Insert Into] table=[table_without_values] 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2 -03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC 
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] 04)------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan 01)DataSinkExec: sink=MemoryTable (partitions=1) @@ -172,7 +172,7 @@ logical_plan 02)--Projection: a1 AS a1, a2 AS a2 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST 04)------Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1 -05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] 06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan 01)DataSinkExec: sink=MemoryTable (partitions=8) diff --git 
a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index ee1d67c5e26d..752e8ce0e4ff 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -352,7 +352,7 @@ logical_plan 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST 04)------Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1 -05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] 06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan 01)DataSinkExec: sink=ParquetSink(file_groups=[]) @@ 
-414,7 +414,7 @@ FROM aggregate_test_100 logical_plan 01)Dml: op=[Insert Into] table=[table_without_values] 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2 -03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] 04)------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan 01)DataSinkExec: sink=ParquetSink(file_groups=[]) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 5d311bc43293..5b5368f6b0f4 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1396,7 +1396,7 @@ group by t1_id ---- logical_plan 01)Projection: count(*) -02)--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(Int64(1)) AS count(*)]] +02)--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(*)]] 03)----Projection: join_t1.t1_id 04)------Inner Join: join_t1.t1_id = join_t2.t2_id 05)--------TableScan: join_t1 projection=[t1_id] @@ -4442,7 
+4442,7 @@ FROM my_catalog.my_schema.table_with_many_types AS l JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col = r.binary_col ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[]], aggr=[[count(*)]] 02)--Projection: 03)----Inner Join: l.binary_col = r.binary_col 04)------SubqueryAlias: l diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index dd310f7f2bf6..466bba556697 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -54,7 +54,7 @@ query TT EXPLAIN SELECT count(*) from json_test ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[]], aggr=[[count(*)]] 02)--TableScan: json_test projection=[] physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[count(*)] diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 4e74b27b875f..b4487be850ac 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -307,7 +307,7 @@ query TT EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 11); ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[]], aggr=[[count(*)]] 02)--Limit: skip=11, fetch=3 03)----TableScan: t1 projection=[], fetch=14 physical_plan @@ -325,7 +325,7 @@ query TT EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8); ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[]], aggr=[[count(*)]] 02)--Limit: skip=8, fetch=3 03)----TableScan: t1 projection=[], fetch=11 physical_plan @@ -343,7 +343,7 @@ query TT EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 OFFSET 8); ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[]], 
aggr=[[count(*)]] 02)--Limit: skip=8, fetch=None 03)----TableScan: t1 projection=[] physical_plan @@ -360,7 +360,7 @@ query TT EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 WHERE a > 3 LIMIT 3 OFFSET 6); ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[]], aggr=[[count(*)]] 02)--Projection: 03)----Limit: skip=6, fetch=3 04)------Filter: t1.a > Int32(3) diff --git a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt index de6a153f58d9..8c87af75ed16 100644 --- a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt +++ b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt @@ -48,8 +48,8 @@ FROM test_table t GROUP BY 1, 2, 3, 4 ---- logical_plan -01)Projection: t.c1, Int64(99999), t.c5 + t.c8, Utf8("test"), count(Int64(1)) -02)--Aggregate: groupBy=[[t.c1, t.c5 + t.c8]], aggr=[[count(Int64(1))]] +01)Projection: t.c1, Int64(99999), t.c5 + t.c8, Utf8("test"), count(*) +02)--Aggregate: groupBy=[[t.c1, t.c5 + t.c8]], aggr=[[count(*)]] 03)----SubqueryAlias: t 04)------TableScan: test_table projection=[c1, c5, c8] @@ -60,8 +60,8 @@ FROM test_table t group by 1, 2, 3 ---- logical_plan -01)Projection: Int64(123), Int64(456), Int64(789), count(Int64(1)), avg(t.c12) -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), avg(t.c12)]] +01)Projection: Int64(123), Int64(456), Int64(789), count(*), avg(t.c12) +02)--Aggregate: groupBy=[[]], aggr=[[count(*), avg(t.c12)]] 03)----SubqueryAlias: t 04)------TableScan: test_table projection=[c12] @@ -72,8 +72,8 @@ FROM test_table t GROUP BY 1, 2 ---- logical_plan -01)Projection: Date32("2023-05-04") AS dt, Boolean(true) AS today_filter, count(Int64(1)) -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +01)Projection: Date32("2023-05-04") AS dt, Boolean(true) AS today_filter, count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(*)]] 03)----SubqueryAlias: t 
04)------TableScan: test_table projection=[] @@ -90,8 +90,8 @@ FROM test_table t GROUP BY 1 ---- logical_plan -01)Projection: Boolean(true) AS NOT date_part(Utf8("MONTH"),now()) BETWEEN Int64(50) AND Int64(60), count(Int64(1)) -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +01)Projection: Boolean(true) AS NOT date_part(Utf8("MONTH"),now()) BETWEEN Int64(50) AND Int64(60), count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(*)]] 03)----SubqueryAlias: t 04)------TableScan: test_table projection=[] diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index e12bdca37e6f..dcd373546d79 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1541,7 +1541,7 @@ LIMIT 4) GROUP BY c2; ---- logical_plan -01)Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(Int64(1)) AS count(*)]] +01)Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(*)]] 02)--Projection: aggregate_test_100.c2 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST, fetch=4 04)------Projection: aggregate_test_100.c2, aggregate_test_100.c1 diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 264392fc1017..c847f433f7fc 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -555,7 +555,7 @@ logical_plan 03)----Subquery: 04)------Projection: count(*) 05)--------Filter: sum(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0) -06)----------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*), sum(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]] +06)----------Aggregate: groupBy=[[]], aggr=[[count(*), sum(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]] 07)------------Filter: outer_ref(t1.t1_name) = t2.t2_name 08)--------------TableScan: t2 09)----TableScan: t1 projection=[t1_id, t1_name, t1_int] @@ -738,7 +738,7 @@ explain 
select (select count(*) from t1) as b logical_plan 01)Projection: __scalar_sq_1.count(*) AS b 02)--SubqueryAlias: __scalar_sq_1 -03)----Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +03)----Aggregate: groupBy=[[]], aggr=[[count(*)]] 04)------TableScan: t1 projection=[] #simple_uncorrelated_scalar_subquery2 @@ -746,13 +746,13 @@ query TT explain select (select count(*) from t1) as b, (select count(1) from t2) ---- logical_plan -01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(Int64(1)) AS count(Int64(1)) +01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(*) AS count(*) 02)--Left Join: 03)----SubqueryAlias: __scalar_sq_1 -04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +04)------Aggregate: groupBy=[[]], aggr=[[count(*)]] 05)--------TableScan: t1 projection=[] 06)----SubqueryAlias: __scalar_sq_2 -07)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +07)------Aggregate: groupBy=[[]], aggr=[[count(*)]] 08)--------TableScan: t2 projection=[] statement ok @@ -762,20 +762,20 @@ query TT explain select (select count(*) from t1) as b, (select count(1) from t2) ---- logical_plan -01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(Int64(1)) AS count(Int64(1)) +01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(*) AS count(*) 02)--Left Join: 03)----SubqueryAlias: __scalar_sq_1 -04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +04)------Aggregate: groupBy=[[]], aggr=[[count(*)]] 05)--------TableScan: t1 projection=[] 06)----SubqueryAlias: __scalar_sq_2 -07)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +07)------Aggregate: groupBy=[[]], aggr=[[count(*)]] 08)--------TableScan: t2 projection=[] physical_plan -01)ProjectionExec: expr=[count(*)@0 as b, count(Int64(1))@1 as count(Int64(1))] +01)ProjectionExec: expr=[count(*)@0 as b, count(*)@1 as count(*)] 02)--NestedLoopJoinExec: join_type=Left 03)----ProjectionExec: expr=[4 as count(*)] 
04)------PlaceholderRowExec -05)----ProjectionExec: expr=[4 as count(Int64(1))] +05)----ProjectionExec: expr=[4 as count(*)] 06)------PlaceholderRowExec statement ok @@ -796,7 +796,7 @@ logical_plan 03)----TableScan: t1 projection=[t1_id, t1_int] 04)----SubqueryAlias: __scalar_sq_1 05)------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true -06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 07)----------TableScan: t2 projection=[t2_int] query II rowsort @@ -818,7 +818,7 @@ logical_plan 03)----TableScan: t1 projection=[t1_id, t1_int] 04)----SubqueryAlias: __scalar_sq_1 05)------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true -06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 07)----------TableScan: t2 projection=[t2_int] query II rowsort @@ -839,7 +839,7 @@ logical_plan 03)----TableScan: t1 projection=[t1_id, t1_int] 04)----SubqueryAlias: __scalar_sq_1 05)------Projection: count(*) AS _cnt, t2.t2_int, Boolean(true) AS __always_true -06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 07)----------TableScan: t2 projection=[t2_int] query II rowsort @@ -860,7 +860,7 @@ logical_plan 03)----TableScan: t1 projection=[t1_id, t1_int] 04)----SubqueryAlias: __scalar_sq_1 05)------Projection: count(*) + Int64(2) AS _cnt, t2.t2_int, Boolean(true) AS __always_true -06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 07)----------TableScan: t2 projection=[t2_int] query II rowsort @@ -883,7 +883,7 @@ logical_plan 05)--------TableScan: t1 projection=[t1_id, t1_int] 06)--------SubqueryAlias: __scalar_sq_1 07)----------Projection: count(*), t2.t2_id, Boolean(true) AS __always_true 
-08)------------Aggregate: groupBy=[[t2.t2_id]], aggr=[[count(Int64(1)) AS count(*)]] +08)------------Aggregate: groupBy=[[t2.t2_id]], aggr=[[count(*)]] 09)--------------TableScan: t2 projection=[t2_id] query I rowsort @@ -905,7 +905,7 @@ logical_plan 04)----SubqueryAlias: __scalar_sq_1 05)------Projection: count(*) + Int64(2) AS cnt_plus_2, t2.t2_int 06)--------Filter: count(*) > Int64(1) -07)----------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +07)----------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 08)------------TableScan: t2 projection=[t2_int] query II rowsort @@ -927,7 +927,7 @@ logical_plan 03)----TableScan: t1 projection=[t1_id, t1_int] 04)----SubqueryAlias: __scalar_sq_1 05)------Projection: count(*) + Int64(2) AS cnt_plus_2, t2.t2_int, count(*), Boolean(true) AS __always_true -06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 07)----------TableScan: t2 projection=[t2_int] query II rowsort @@ -951,7 +951,7 @@ logical_plan 06)----------TableScan: t1 projection=[t1_int] 07)--------SubqueryAlias: __scalar_sq_1 08)----------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true -09)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +09)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 10)--------------TableScan: t2 projection=[t2_int] query I rowsort @@ -972,7 +972,7 @@ logical_plan 05)--------TableScan: t1 projection=[t1_int] 06)--------SubqueryAlias: __scalar_sq_1 07)----------Projection: count(*) AS cnt, t2.t2_int, Boolean(true) AS __always_true -08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 09)--------------TableScan: t2 projection=[t2_int] @@ -1002,7 +1002,7 @@ logical_plan 05)--------TableScan: t1 projection=[t1_int] 06)--------SubqueryAlias: __scalar_sq_1 
07)----------Projection: count(*) + Int64(1) + Int64(1) AS cnt_plus_two, t2.t2_int, count(*), Boolean(true) AS __always_true -08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 09)--------------TableScan: t2 projection=[t2_int] query I rowsort @@ -1031,7 +1031,7 @@ logical_plan 05)--------TableScan: t1 projection=[t1_int] 06)--------SubqueryAlias: __scalar_sq_1 07)----------Projection: CASE WHEN count(*) = Int64(1) THEN Int64(NULL) ELSE count(*) END AS cnt, t2.t2_int, Boolean(true) AS __always_true -08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS count(*)]] +08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]] 09)--------------TableScan: t2 projection=[t2_int] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part index 2616b7b75b30..6a41ecb51bf4 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part @@ -42,7 +42,7 @@ explain select logical_plan 01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST 02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, avg(lineitem.l_quantity) AS avg_qty, avg(lineitem.l_extendedprice) AS avg_price, avg(lineitem.l_discount) AS avg_disc, count(*) AS count_order -03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) 
+ lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1)) AS count(*)]] +03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]] 04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS __common_expr_1, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus 05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02") 06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part index eb41445c3c13..68532733c661 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part @@ -42,7 +42,7 @@ limit 10; logical_plan 01)Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST, fetch=10 02)--Projection: c_orders.c_count, count(*) AS custdist -03)----Aggregate: groupBy=[[c_orders.c_count]], aggr=[[count(Int64(1)) AS count(*)]] +03)----Aggregate: groupBy=[[c_orders.c_count]], aggr=[[count(*)]] 04)------SubqueryAlias: c_orders 05)--------Projection: count(orders.o_orderkey) AS c_count 06)----------Aggregate: groupBy=[[customer.c_custkey]], 
aggr=[[count(orders.o_orderkey)]] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part index 9e39732689da..eb10f4c8d195 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part @@ -60,7 +60,7 @@ order by logical_plan 01)Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST 02)--Projection: supplier.s_name, count(*) AS numwait -03)----Aggregate: groupBy=[[supplier.s_name]], aggr=[[count(Int64(1)) AS count(*)]] +03)----Aggregate: groupBy=[[supplier.s_name]], aggr=[[count(*)]] 04)------Projection: supplier.s_name 05)--------LeftAnti Join: l1.l_orderkey = __correlated_sq_2.l_orderkey Filter: __correlated_sq_2.l_suppkey != l1.l_suppkey 06)----------LeftSemi Join: l1.l_orderkey = __correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_suppkey != l1.l_suppkey diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part index 9ad99361256c..af8b7948c1cf 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part @@ -58,7 +58,7 @@ order by logical_plan 01)Sort: custsale.cntrycode ASC NULLS LAST 02)--Projection: custsale.cntrycode, count(*) AS numcust, sum(custsale.c_acctbal) AS totacctbal -03)----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[count(Int64(1)) AS count(*), sum(custsale.c_acctbal)]] +03)----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[count(*), sum(custsale.c_acctbal)]] 04)------SubqueryAlias: custsale 05)--------Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal 06)----------Inner Join: Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_2.avg(customer.c_acctbal) diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part index fb93850ab095..766b21c22f24 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part @@ -42,7 +42,7 @@ order by logical_plan 01)Sort: orders.o_orderpriority ASC NULLS LAST 02)--Projection: orders.o_orderpriority, count(*) AS order_count -03)----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[count(Int64(1)) AS count(*)]] +03)----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[count(*)]] 04)------Projection: orders.o_orderpriority 05)--------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey 06)----------Projection: orders.o_orderkey, orders.o_orderpriority diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index dfac9c031074..57207f00f7ab 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -449,7 +449,7 @@ SELECT count(*) FROM ( ---- logical_plan 01)Projection: count(*) -02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(Int64(1)) AS count(*)]] +02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(*)]] 03)----Union 04)------Aggregate: groupBy=[[t1.name]], aggr=[[]] 05)--------TableScan: t1 projection=[name] @@ -493,7 +493,7 @@ logical_plan 02)--Union 03)----Projection: count(*) AS cnt 04)------Limit: skip=0, fetch=3 -05)--------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +05)--------Aggregate: groupBy=[[]], aggr=[[count(*)]] 06)----------SubqueryAlias: a 07)------------Projection: 08)--------------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]] @@ -651,7 +651,7 @@ select x, y from (select 1 as x , max(10) as y) b logical_plan 01)Union 02)--Projection: count(*) AS count, a.n -03)----Aggregate: groupBy=[[a.n]], aggr=[[count(Int64(1)) AS count(*)]] +03)----Aggregate: groupBy=[[a.n]], aggr=[[count(*)]] 04)------SubqueryAlias: a 05)--------Projection: Int64(5) AS n 
06)----------EmptyRelation diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index ca4713e7d516..6c00af879e76 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -1305,7 +1305,7 @@ EXPLAIN SELECT ---- logical_plan 01)Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING -02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] 03)----Projection: aggregate_test_100.c1, aggregate_test_100.c2, sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 04)------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] 05)--------TableScan: aggregate_test_100 projection=[c1, c2, c4] @@ -1765,7 +1765,7 @@ EXPLAIN SELECT count(*) as global_count FROM ---- logical_plan 01)Projection: count(*) AS global_count -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +02)--Aggregate: groupBy=[[]], aggr=[[count(*)]] 03)----SubqueryAlias: a 04)------Projection: 05)--------Aggregate: 
groupBy=[[aggregate_test_100.c1]], aggr=[[]] @@ -2571,10 +2571,10 @@ logical_plan 01)Projection: sum1, sum2, sum3, min1, min2, min3, max1, max2, max3, cnt1, cnt2, sumr1, sumr2, sumr3, minr1, minr2, minr3, maxr1, maxr2, maxr3, cntr1, cntr2, sum4, cnt3 02)--Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5 03)----Projection: sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS sum1, sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS sum2, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS sum3, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS min1, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS min2, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS min3, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS max1, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS max2, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS max3, count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING AS cnt1, count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS cnt2, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING AS sumr1, sum(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING AS sumr2, sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sumr3, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS minr1, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS minr2, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS minr3, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS maxr1, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS maxr2, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS maxr3, count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS cntr1, count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS cntr2, sum(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS sum4, count(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS cnt3, annotated_data_finite.inc_col -04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, count(Int64(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS count(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]] +04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, count(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]] 05)--------Projection: __common_expr_1, annotated_data_finite.inc_col, 
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] 
RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING -06)----------WindowAggr: windowExpr=[[sum(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 
PRECEDING AND 10 FOLLOWING, count(Int64(1)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING AS count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, count(Int64(1)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]] -07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(__common_expr_1 AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, count(Int64(1)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, count(Int64(1)) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]] +06)----------WindowAggr: windowExpr=[[sum(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]] +07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, max(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, count(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]] 08)--------------Projection: CAST(annotated_data_finite.desc_col AS Int64) AS __common_expr_1, CAST(annotated_data_finite.inc_col AS Int64) AS __common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col, annotated_data_finite.desc_col 09)----------------TableScan: annotated_data_finite projection=[ts, inc_col, desc_col] physical_plan @@ -4112,7 +4112,7 @@ EXPLAIN select count(*) over (partition by a order by a) from (select * from a w ---- logical_plan 01)Projection: count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]] +02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 03)----Filter: a.a = Int64(1) 04)------TableScan: a projection=[a] physical_plan diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs b/datafusion/substrait/tests/cases/consumer_integration.rs index 219f656bb471..086c0858115b 100644 --- a/datafusion/substrait/tests/cases/consumer_integration.rs +++ b/datafusion/substrait/tests/cases/consumer_integration.rs @@ -50,9 +50,9 @@ mod tests { let plan_str = tpch_plan_to_string(1).await?; assert_eq!( plan_str, - "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE, avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\ + "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE, avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(*) AS COUNT_ORDER\ \n Sort: LINEITEM.L_RETURNFLAG ASC NULLS LAST, LINEITEM.L_LINESTATUS ASC NULLS LAST\ - \n Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY), sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY), 
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(Int64(1))]]\ + \n Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY), sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY), avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(*)]]\ \n Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, LINEITEM.L_QUANTITY, LINEITEM.L_EXTENDEDPRICE, LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT), LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT) * (CAST(Int32(1) AS Decimal128(15, 2)) + LINEITEM.L_TAX), LINEITEM.L_DISCOUNT\ \n Filter: LINEITEM.L_SHIPDATE <= Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 0, milliseconds: 10368000 }\")\ \n TableScan: LINEITEM" @@ -119,9 +119,9 @@ mod tests { let plan_str = tpch_plan_to_string(4).await?; assert_eq!( plan_str, - "Projection: ORDERS.O_ORDERPRIORITY, count(Int64(1)) AS ORDER_COUNT\ + "Projection: ORDERS.O_ORDERPRIORITY, count(*) AS ORDER_COUNT\ \n Sort: ORDERS.O_ORDERPRIORITY ASC NULLS LAST\ - \n Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]], aggr=[[count(Int64(1))]]\ + \n Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]], aggr=[[count(*)]]\ \n Projection: ORDERS.O_ORDERPRIORITY\ \n Filter: ORDERS.O_ORDERDATE >= CAST(Utf8(\"1993-07-01\") AS Date32) AND ORDERS.O_ORDERDATE < CAST(Utf8(\"1993-10-01\") AS Date32) AND EXISTS ()\ \n Subquery:\ @@ -269,10 +269,10 @@ mod tests { let plan_str = tpch_plan_to_string(13).await?; assert_eq!( plan_str, - "Projection: count(ORDERS.O_ORDERKEY) AS C_COUNT, count(Int64(1)) AS CUSTDIST\ - \n Sort: count(Int64(1)) DESC NULLS FIRST, count(ORDERS.O_ORDERKEY) DESC NULLS FIRST\ - \n Projection: count(ORDERS.O_ORDERKEY), count(Int64(1))\ - \n Aggregate: 
groupBy=[[count(ORDERS.O_ORDERKEY)]], aggr=[[count(Int64(1))]]\ + "Projection: count(ORDERS.O_ORDERKEY) AS C_COUNT, count(*) AS CUSTDIST\ + \n Sort: count(*) DESC NULLS FIRST, count(ORDERS.O_ORDERKEY) DESC NULLS FIRST\ + \n Projection: count(ORDERS.O_ORDERKEY), count(*)\ + \n Aggregate: groupBy=[[count(ORDERS.O_ORDERKEY)]], aggr=[[count(*)]]\ \n Projection: count(ORDERS.O_ORDERKEY)\ \n Aggregate: groupBy=[[CUSTOMER.C_CUSTKEY]], aggr=[[count(ORDERS.O_ORDERKEY)]]\ \n Projection: CUSTOMER.C_CUSTKEY, ORDERS.O_ORDERKEY\ @@ -410,10 +410,10 @@ mod tests { let plan_str = tpch_plan_to_string(21).await?; assert_eq!( plan_str, - "Projection: SUPPLIER.S_NAME, count(Int64(1)) AS NUMWAIT\ + "Projection: SUPPLIER.S_NAME, count(*) AS NUMWAIT\ \n Limit: skip=0, fetch=100\ - \n Sort: count(Int64(1)) DESC NULLS FIRST, SUPPLIER.S_NAME ASC NULLS LAST\ - \n Aggregate: groupBy=[[SUPPLIER.S_NAME]], aggr=[[count(Int64(1))]]\ + \n Sort: count(*) DESC NULLS FIRST, SUPPLIER.S_NAME ASC NULLS LAST\ + \n Aggregate: groupBy=[[SUPPLIER.S_NAME]], aggr=[[count(*)]]\ \n Projection: SUPPLIER.S_NAME\ \n Filter: SUPPLIER.S_SUPPKEY = LINEITEM.L_SUPPKEY AND ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY AND ORDERS.O_ORDERSTATUS = Utf8(\"F\") AND LINEITEM.L_RECEIPTDATE > LINEITEM.L_COMMITDATE AND EXISTS () AND NOT EXISTS () AND SUPPLIER.S_NATIONKEY = NATION.N_NATIONKEY AND NATION.N_NAME = Utf8(\"SAUDI ARABIA\")\ \n Subquery:\ @@ -438,9 +438,9 @@ mod tests { let plan_str = tpch_plan_to_string(22).await?; assert_eq!( plan_str, - "Projection: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) AS CNTRYCODE, count(Int64(1)) AS NUMCUST, sum(CUSTOMER.C_ACCTBAL) AS TOTACCTBAL\ + "Projection: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) AS CNTRYCODE, count(*) AS NUMCUST, sum(CUSTOMER.C_ACCTBAL) AS TOTACCTBAL\ \n Sort: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) ASC NULLS LAST\ - \n Aggregate: groupBy=[[substr(CUSTOMER.C_PHONE,Int32(1),Int32(2))]], aggr=[[count(Int64(1)), sum(CUSTOMER.C_ACCTBAL)]]\ + \n Aggregate: 
groupBy=[[substr(CUSTOMER.C_PHONE,Int32(1),Int32(2))]], aggr=[[count(*), sum(CUSTOMER.C_ACCTBAL)]]\ \n Projection: substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)), CUSTOMER.C_ACCTBAL\ \n Filter: (substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = CAST(Utf8(\"13\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = CAST(Utf8(\"31\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = CAST(Utf8(\"23\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = CAST(Utf8(\"29\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = CAST(Utf8(\"30\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = CAST(Utf8(\"18\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = CAST(Utf8(\"17\") AS Utf8)) AND CUSTOMER.C_ACCTBAL > () AND NOT EXISTS ()\ \n Subquery:\ diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 5fb357dfcd23..68856117a38c 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -687,7 +687,7 @@ async fn simple_intersect() -> Result<()> { // Substrait treats both count(*) and count(1) the same assert_expected_plan( "SELECT count(*) FROM (SELECT data.a FROM data INTERSECT SELECT data2.a FROM data2);", - "Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]\ + "Aggregate: groupBy=[[]], aggr=[[count(*)]]\ \n Projection: \ \n LeftSemi Join: data.a = data2.a\ \n Aggregate: groupBy=[[data.a]], aggr=[[]]\ @@ -822,7 +822,7 @@ async fn simple_intersect_table_reuse() -> Result<()> { // Schema check works because we set aliases to what the Substrait consumer will generate. 
assert_expected_plan( "SELECT count(1) FROM (SELECT left.a FROM data AS left INTERSECT SELECT right.a FROM data AS right);", - "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\ + "Aggregate: groupBy=[[]], aggr=[[count(*)]]\ \n Projection: \ \n LeftSemi Join: left.a = right.a\ \n SubqueryAlias: left\