From 02b969381db1f3765676029aa47ddf5e54ccdf4f Mon Sep 17 00:00:00 2001 From: Jagdish Parihar Date: Fri, 25 Oct 2024 18:42:58 +0530 Subject: [PATCH] Convert `ntile` builtIn function to UDWF (#13040) * converting to ntile udwf * updated the window functions documentation file * wip: update the ntile udwf function * fix the roundtrip_logical_plan.rs * removed builtIn ntile function * fixed field name issue * fixing the return type of ntile udwf * error if UInt64 conversion fails * handling if null is found * handling if value is zero or less than zero * removed unused import * updated prost.rs file * removed dead code * fixed clippy error * added inner doc comment * minor fixes and added roundtrip logical plan test * removed parse_expr in ntile --- .../expr/src/built_in_window_function.rs | 19 -- datafusion/expr/src/expr.rs | 11 +- datafusion/expr/src/window_function.rs | 5 - datafusion/functions-window/src/lib.rs | 4 +- datafusion/functions-window/src/ntile.rs | 168 ++++++++++++++++++ datafusion/functions-window/src/utils.rs | 12 ++ .../physical-expr/src/expressions/mod.rs | 1 - datafusion/physical-expr/src/window/mod.rs | 1 - datafusion/physical-expr/src/window/ntile.rs | 111 ------------ datafusion/physical-plan/src/windows/mod.rs | 59 +----- datafusion/proto/proto/datafusion.proto | 2 +- datafusion/proto/src/generated/pbjson.rs | 3 - datafusion/proto/src/generated/prost.rs | 4 +- .../proto/src/logical_plan/from_proto.rs | 1 - datafusion/proto/src/logical_plan/to_proto.rs | 1 - .../proto/src/physical_plan/to_proto.rs | 45 ++--- .../tests/cases/roundtrip_logical_plan.rs | 3 +- .../source/user-guide/sql/window_functions.md | 34 ---- .../user-guide/sql/window_functions_new.md | 13 ++ 19 files changed, 221 insertions(+), 276 deletions(-) create mode 100644 datafusion/functions-window/src/ntile.rs delete mode 100644 datafusion/physical-expr/src/window/ntile.rs diff --git a/datafusion/expr/src/built_in_window_function.rs b/datafusion/expr/src/built_in_window_function.rs index 36916a6b594f..ab41395ad371 100644 --- a/datafusion/expr/src/built_in_window_function.rs +++ b/datafusion/expr/src/built_in_window_function.rs @@ -40,8 +40,6 @@ impl fmt::Display for BuiltInWindowFunction { /// [Window Function]: https://en.wikipedia.org/wiki/Window_function_(SQL) #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)] pub enum BuiltInWindowFunction { - /// Integer ranging from 1 to the argument value, dividing the partition as equally as possible - Ntile, /// returns value evaluated at the row that is the first row of the window frame FirstValue, /// Returns value evaluated at the row that is the last row of the window frame @@ -54,7 +52,6 @@ impl BuiltInWindowFunction { pub fn name(&self) -> &str { use BuiltInWindowFunction::*; match self { - Ntile => "NTILE", FirstValue => "first_value", LastValue => "last_value", NthValue => "NTH_VALUE", @@ -66,7 +63,6 @@ impl FromStr for BuiltInWindowFunction { type Err = DataFusionError; fn from_str(name: &str) -> Result { Ok(match name.to_uppercase().as_str() { - "NTILE" => BuiltInWindowFunction::Ntile, "FIRST_VALUE" => BuiltInWindowFunction::FirstValue, "LAST_VALUE" => BuiltInWindowFunction::LastValue, "NTH_VALUE" => BuiltInWindowFunction::NthValue, @@ -97,7 +93,6 @@ impl BuiltInWindowFunction { })?; match self { - BuiltInWindowFunction::Ntile => Ok(DataType::UInt64), BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue | BuiltInWindowFunction::NthValue => Ok(input_expr_types[0].clone()), @@ -111,20 +106,6 @@ impl BuiltInWindowFunction { BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => { Signature::any(1, Volatility::Immutable) } - BuiltInWindowFunction::Ntile => Signature::uniform( - 1, - vec![ - DataType::UInt64, - DataType::UInt32, - DataType::UInt16, - DataType::UInt8, - DataType::Int64, - DataType::Int32, - DataType::Int16, - DataType::Int8, - ], - Volatility::Immutable, - ), BuiltInWindowFunction::NthValue => Signature::any(2, Volatility::Immutable), } } diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 7fadf6391bf3..4d73c2a04486 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2567,18 +2567,9 @@ mod test { Ok(()) } - #[test] - fn test_ntile_return_type() -> Result<()> { - let fun = find_df_window_func("ntile").unwrap(); - let observed = fun.return_type(&[DataType::Int16], &[true], "")?; - assert_eq!(DataType::UInt64, observed); - - Ok(()) - } - #[test] fn test_window_function_case_insensitive() -> Result<()> { - let names = vec!["ntile", "first_value", "last_value", "nth_value"]; + let names = vec!["first_value", "last_value", "nth_value"]; for name in names { let fun = find_df_window_func(name).unwrap(); let fun2 = find_df_window_func(name.to_uppercase().as_str()).unwrap(); diff --git a/datafusion/expr/src/window_function.rs b/datafusion/expr/src/window_function.rs index c13a028e4a30..be2b6575e2e9 100644 --- a/datafusion/expr/src/window_function.rs +++ b/datafusion/expr/src/window_function.rs @@ -17,11 +17,6 @@ use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal}; -/// Create an expression to represent the `ntile` window function -pub fn ntile(arg: Expr) -> Expr { - Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Ntile, vec![arg])) -} - /// Create an expression to represent the `nth_value` window function pub fn nth_value(arg: Expr, n: i64) -> Expr { Expr::WindowFunction(WindowFunction::new( diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs index 13a77977d579..ff8542838df9 100644 --- a/datafusion/functions-window/src/lib.rs +++ b/datafusion/functions-window/src/lib.rs @@ -34,7 +34,7 @@ pub mod macros; pub mod cume_dist; pub mod lead_lag; - +pub mod ntile; pub mod rank; pub mod row_number; mod utils; @@ -44,6 +44,7 @@ pub mod expr_fn { pub use super::cume_dist::cume_dist; pub use super::lead_lag::lag; pub use super::lead_lag::lead; + pub use super::ntile::ntile; pub use super::rank::{dense_rank, percent_rank, rank}; pub use super::row_number::row_number; } @@ -58,6 +59,7 @@ pub fn all_default_window_functions() -> Vec> { rank::rank_udwf(), rank::dense_rank_udwf(), rank::percent_rank_udwf(), + ntile::ntile_udwf(), ] } /// Registers all enabled packages with a [`FunctionRegistry`] diff --git a/datafusion/functions-window/src/ntile.rs b/datafusion/functions-window/src/ntile.rs new file mode 100644 index 000000000000..b0a7241f24cd --- /dev/null +++ b/datafusion/functions-window/src/ntile.rs @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `ntile` window function implementation + +use std::any::Any; +use std::fmt::Debug; +use std::sync::{Arc, OnceLock}; + +use crate::utils::{ + get_scalar_value_from_args, get_signed_integer, get_unsigned_integer, +}; +use datafusion_common::arrow::array::{ArrayRef, UInt64Array}; +use datafusion_common::arrow::datatypes::{DataType, Field}; +use datafusion_common::{exec_err, DataFusionError, Result}; +use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING; +use datafusion_expr::{ + Documentation, Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, +}; +use datafusion_functions_window_common::field; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +use field::WindowUDFFieldArgs; + +get_or_init_udwf!( + Ntile, + ntile, + "integer ranging from 1 to the argument value, dividing the partition as equally as possible" +); + +pub fn ntile(arg: Expr) -> Expr { + ntile_udwf().call(vec![arg]) +} + +#[derive(Debug)] +pub struct Ntile { + signature: Signature, +} + +impl Ntile { + /// Create a new `ntile` function + pub fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![ + DataType::UInt64, + DataType::UInt32, + DataType::UInt16, + DataType::UInt8, + DataType::Int64, + DataType::Int32, + DataType::Int16, + DataType::Int8, + ], + Volatility::Immutable, + ), + } + } +} + +impl Default for Ntile { + fn default() -> Self { + Self::new() + } +} + +static DOCUMENTATION: OnceLock = OnceLock::new(); + +fn get_ntile_doc() -> &'static Documentation { + DOCUMENTATION.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_RANKING) + .with_description( + "Integer ranging from 1 to the argument value, dividing the partition as equally as possible", + ) + .with_syntax_example("ntile(expression)") + .with_argument("expression","An integer describing the number groups the partition should be split into") + .build() + .unwrap() + }) +} + +impl WindowUDFImpl for Ntile { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "ntile" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn partition_evaluator( + &self, + partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { + let scalar_n = + get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 0)? + .ok_or_else(|| { + DataFusionError::Execution( + "NTILE requires a positive integer".to_string(), + ) + })?; + + if scalar_n.is_null() { + return exec_err!("NTILE requires a positive integer, but finds NULL"); + } + + if scalar_n.is_unsigned() { + let n = get_unsigned_integer(scalar_n)?; + Ok(Box::new(NtileEvaluator { n })) + } else { + let n: i64 = get_signed_integer(scalar_n)?; + if n <= 0 { + return exec_err!("NTILE requires a positive integer"); + } + Ok(Box::new(NtileEvaluator { n: n as u64 })) + } + } + fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + let nullable = false; + + Ok(Field::new(field_args.name(), DataType::UInt64, nullable)) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(get_ntile_doc()) + } +} + +#[derive(Debug)] +struct NtileEvaluator { + n: u64, +} + +impl PartitionEvaluator for NtileEvaluator { + fn evaluate_all( + &mut self, + _values: &[ArrayRef], + num_rows: usize, + ) -> Result { + let num_rows = num_rows as u64; + let mut vec: Vec = Vec::new(); + let n = u64::min(self.n, num_rows); + for i in 0..num_rows { + let res = i * n / num_rows; + vec.push(res + 1) + } + Ok(Arc::new(UInt64Array::from(vec))) + } +} diff --git a/datafusion/functions-window/src/utils.rs b/datafusion/functions-window/src/utils.rs index 69f68aa78f2c..3f8061dbea3e 100644 --- a/datafusion/functions-window/src/utils.rs +++ b/datafusion/functions-window/src/utils.rs @@ -51,3 +51,15 @@ pub(crate) fn get_scalar_value_from_args( None }) } + +pub(crate) fn get_unsigned_integer(value: ScalarValue) -> Result { + if value.is_null() { + return Ok(0); + } + + if !value.data_type().is_integer() { + return exec_err!("Expected an integer value"); + } + + value.cast_to(&DataType::UInt64)?.try_into() +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 63047f6929c1..7d71bd9ff17b 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -36,7 +36,6 @@ mod unknown_column; /// Module with some convenient methods used in expression building pub use crate::aggregate::stats::StatsType; pub use crate::window::nth_value::NthValue; -pub use crate::window::ntile::Ntile; pub use crate::PhysicalSortExpr; pub use binary::{binary, similar_to, BinaryExpr}; diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs index 7bab4dbc5af6..3c37fff7a1ba 100644 --- a/datafusion/physical-expr/src/window/mod.rs +++ b/datafusion/physical-expr/src/window/mod.rs @@ -19,7 +19,6 @@ mod aggregate; mod built_in; mod built_in_window_function_expr; pub(crate) mod nth_value; -pub(crate) mod ntile; mod sliding_aggregate; mod window_expr; diff --git a/datafusion/physical-expr/src/window/ntile.rs b/datafusion/physical-expr/src/window/ntile.rs deleted file mode 100644 index fb7a7ad84fb7..000000000000 --- a/datafusion/physical-expr/src/window/ntile.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines physical expression for `ntile` that can evaluated -//! at runtime during query execution - -use crate::expressions::Column; -use crate::window::BuiltInWindowFunctionExpr; -use crate::{PhysicalExpr, PhysicalSortExpr}; - -use arrow::array::{ArrayRef, UInt64Array}; -use arrow::datatypes::Field; -use arrow_schema::{DataType, SchemaRef, SortOptions}; -use datafusion_common::Result; -use datafusion_expr::PartitionEvaluator; - -use std::any::Any; -use std::sync::Arc; - -#[derive(Debug)] -pub struct Ntile { - name: String, - n: u64, - /// Output data type - data_type: DataType, -} - -impl Ntile { - pub fn new(name: String, n: u64, data_type: &DataType) -> Self { - Self { - name, - n, - data_type: data_type.clone(), - } - } - - pub fn get_n(&self) -> u64 { - self.n - } -} - -impl BuiltInWindowFunctionExpr for Ntile { - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - let nullable = false; - Ok(Field::new(self.name(), self.data_type.clone(), nullable)) - } - - fn expressions(&self) -> Vec> { - vec![] - } - - fn name(&self) -> &str { - &self.name - } - - fn create_evaluator(&self) -> Result> { - Ok(Box::new(NtileEvaluator { n: self.n })) - } - - fn get_result_ordering(&self, schema: &SchemaRef) -> Option { - // The built-in NTILE window function introduces a new ordering: - schema.column_with_name(self.name()).map(|(idx, field)| { - let expr = Arc::new(Column::new(field.name(), idx)); - let options = SortOptions { - descending: false, - nulls_first: false, - }; // ASC, NULLS LAST - PhysicalSortExpr { expr, options } - }) - } -} - -#[derive(Debug)] -pub(crate) struct NtileEvaluator { - n: u64, -} - -impl PartitionEvaluator for NtileEvaluator { - fn evaluate_all( - &mut self, - _values: &[ArrayRef], - num_rows: usize, - ) -> Result { - let num_rows = num_rows as u64; - let mut vec: Vec = Vec::new(); - let n = u64::min(self.n, num_rows); - for i in 0..num_rows { - let res = i * n / num_rows; - vec.push(res + 1) - } - Ok(Arc::new(UInt64Array::from(vec))) - } -} diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 39ff71496e21..7ebb7e71ec57 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -21,15 +21,13 @@ use std::borrow::Borrow; use std::sync::Arc; use crate::{ - expressions::{Literal, NthValue, Ntile, PhysicalSortExpr}, + expressions::{Literal, NthValue, PhysicalSortExpr}, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr, }; use arrow::datatypes::Schema; use arrow_schema::{DataType, Field, SchemaRef}; -use datafusion_common::{ - exec_datafusion_err, exec_err, DataFusionError, Result, ScalarValue, -}; +use datafusion_common::{exec_datafusion_err, exec_err, Result, ScalarValue}; use datafusion_expr::{ BuiltInWindowFunction, PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF, @@ -165,25 +163,6 @@ fn window_expr_from_aggregate_expr( } } -fn get_scalar_value_from_args( - args: &[Arc], - index: usize, -) -> Result> { - Ok(if let Some(field) = args.get(index) { - let tmp = field - .as_any() - .downcast_ref::() - .ok_or_else(|| DataFusionError::NotImplemented( - format!("There is only support Literal types for field at idx: {index} in Window Function"), - ))? - .value() - .clone(); - Some(tmp) - } else { - None - }) -} - fn get_signed_integer(value: ScalarValue) -> Result { if value.is_null() { return Ok(0); @@ -196,18 +175,6 @@ fn get_signed_integer(value: ScalarValue) -> Result { value.cast_to(&DataType::Int64)?.try_into() } -fn get_unsigned_integer(value: ScalarValue) -> Result { - if value.is_null() { - return Ok(0); - } - - if !value.data_type().is_integer() { - return exec_err!("Expected an integer value"); - } - - value.cast_to(&DataType::UInt64)?.try_into() -} - fn create_built_in_window_expr( fun: &BuiltInWindowFunction, args: &[Arc], @@ -219,28 +186,6 @@ fn create_built_in_window_expr( let out_data_type: &DataType = input_schema.field_with_name(&name)?.data_type(); Ok(match fun { - BuiltInWindowFunction::Ntile => { - let n = get_scalar_value_from_args(args, 0)?.ok_or_else(|| { - DataFusionError::Execution( - "NTILE requires a positive integer".to_string(), - ) - })?; - - if n.is_null() { - return exec_err!("NTILE requires a positive integer, but finds NULL"); - } - - if n.is_unsigned() { - let n = get_unsigned_integer(n)?; - Arc::new(Ntile::new(name, n, out_data_type)) - } else { - let n: i64 = get_signed_integer(n)?; - if n <= 0 { - return exec_err!("NTILE requires a positive integer"); - } - Arc::new(Ntile::new(name, n as u64, out_data_type)) - } - } BuiltInWindowFunction::NthValue => { let arg = Arc::clone(&args[0]); let n = get_signed_integer( diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c92328278e83..b68c47c57eb9 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -512,7 +512,7 @@ enum BuiltInWindowFunction { // DENSE_RANK = 2; // PERCENT_RANK = 3; // CUME_DIST = 4; - NTILE = 5; + // NTILE = 5; // LAG = 6; // LEAD = 7; FIRST_VALUE = 8; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index ca331cdaa513..e54edb718808 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -1662,7 +1662,6 @@ impl serde::Serialize for BuiltInWindowFunction { { let variant = match self { Self::Unspecified => "UNSPECIFIED", - Self::Ntile => "NTILE", Self::FirstValue => "FIRST_VALUE", Self::LastValue => "LAST_VALUE", Self::NthValue => "NTH_VALUE", @@ -1678,7 +1677,6 @@ impl<'de> serde::Deserialize<'de> for BuiltInWindowFunction { { const FIELDS: &[&str] = &[ "UNSPECIFIED", - "NTILE", "FIRST_VALUE", "LAST_VALUE", "NTH_VALUE", @@ -1723,7 +1721,6 @@ impl<'de> serde::Deserialize<'de> for BuiltInWindowFunction { { match value { "UNSPECIFIED" => Ok(BuiltInWindowFunction::Unspecified), - "NTILE" => Ok(BuiltInWindowFunction::Ntile), "FIRST_VALUE" => Ok(BuiltInWindowFunction::FirstValue), "LAST_VALUE" => Ok(BuiltInWindowFunction::LastValue), "NTH_VALUE" => Ok(BuiltInWindowFunction::NthValue), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index fb0b3bcb2c13..dfc30e809108 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1819,7 +1819,7 @@ pub enum BuiltInWindowFunction { /// DENSE_RANK = 2; /// PERCENT_RANK = 3; /// CUME_DIST = 4; - Ntile = 5, + /// NTILE = 5; /// LAG = 6; /// LEAD = 7; FirstValue = 8, @@ -1834,7 +1834,6 @@ impl BuiltInWindowFunction { pub fn as_str_name(&self) -> &'static str { match self { Self::Unspecified => "UNSPECIFIED", - Self::Ntile => "NTILE", Self::FirstValue => "FIRST_VALUE", Self::LastValue => "LAST_VALUE", Self::NthValue => "NTH_VALUE", @@ -1844,7 +1843,6 @@ impl BuiltInWindowFunction { pub fn from_str_name(value: &str) -> ::core::option::Option { match value { "UNSPECIFIED" => Some(Self::Unspecified), - "NTILE" => Some(Self::Ntile), "FIRST_VALUE" => Some(Self::FirstValue), "LAST_VALUE" => Some(Self::LastValue), "NTH_VALUE" => Some(Self::NthValue), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 4587c090c96a..27bda7dd5ace 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -152,7 +152,6 @@ impl From for BuiltInWindowFunction { match built_in_function { protobuf::BuiltInWindowFunction::Unspecified => todo!(), protobuf::BuiltInWindowFunction::FirstValue => Self::FirstValue, - protobuf::BuiltInWindowFunction::Ntile => Self::Ntile, protobuf::BuiltInWindowFunction::NthValue => Self::NthValue, protobuf::BuiltInWindowFunction::LastValue => Self::LastValue, } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index dce0cd741fd3..5a6f3a32c668 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -127,7 +127,6 @@ impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction { BuiltInWindowFunction::FirstValue => Self::FirstValue, BuiltInWindowFunction::LastValue => Self::LastValue, BuiltInWindowFunction::NthValue => Self::NthValue, - BuiltInWindowFunction::Ntile => Self::Ntile, } } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 37ea6a2b47be..89a2403922e9 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -24,7 +24,7 @@ use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, - Literal, NegativeExpr, NotExpr, NthValue, Ntile, TryCastExpr, + Literal, NegativeExpr, NotExpr, NthValue, TryCastExpr, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; @@ -108,33 +108,24 @@ pub fn serialize_physical_window_expr( let expr = built_in_window_expr.get_built_in_func_expr(); let built_in_fn_expr = expr.as_any(); - let builtin_fn = if let Some(ntile_expr) = - built_in_fn_expr.downcast_ref::() - { - args.insert( - 0, - Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some( - ntile_expr.get_n() as i64, - )))), - ); - protobuf::BuiltInWindowFunction::Ntile - } else if let Some(nth_value_expr) = built_in_fn_expr.downcast_ref::() { - match nth_value_expr.get_kind() { - NthValueKind::First => protobuf::BuiltInWindowFunction::FirstValue, - NthValueKind::Last => protobuf::BuiltInWindowFunction::LastValue, - NthValueKind::Nth(n) => { - args.insert( - 1, - Arc::new(Literal::new(datafusion_common::ScalarValue::Int64( - Some(n), - ))), - ); - protobuf::BuiltInWindowFunction::NthValue + let builtin_fn = + if let Some(nth_value_expr) = built_in_fn_expr.downcast_ref::() { + match nth_value_expr.get_kind() { + NthValueKind::First => protobuf::BuiltInWindowFunction::FirstValue, + NthValueKind::Last => protobuf::BuiltInWindowFunction::LastValue, + NthValueKind::Nth(n) => { + args.insert( + 1, + Arc::new(Literal::new( + datafusion_common::ScalarValue::Int64(Some(n)), + )), + ); + protobuf::BuiltInWindowFunction::NthValue + } } - } - } else { - return not_impl_err!("BuiltIn function not supported: {expr:?}"); - }; + } else { + return not_impl_err!("BuiltIn function not supported: {expr:?}"); + }; ( physical_window_expr_node::WindowFunction::BuiltInFunction(builtin_fn as i32), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index a8c82ff80f23..3fec7d1c6ea0 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -48,7 +48,7 @@ use datafusion::functions_aggregate::expr_fn::{ use datafusion::functions_aggregate::min_max::max_udaf; use datafusion::functions_nested::map::map; use datafusion::functions_window::expr_fn::{ - cume_dist, dense_rank, lag, lead, percent_rank, rank, row_number, + cume_dist, dense_rank, lag, lead, ntile, percent_rank, rank, row_number, }; use datafusion::functions_window::rank::rank_udwf; use datafusion::prelude::*; @@ -951,6 +951,7 @@ async fn roundtrip_expr_api() -> Result<()> { lag(col("b"), None, None), lag(col("b"), Some(2), None), lag(col("b"), Some(2), Some(ScalarValue::from(100))), + ntile(lit(3)), nth_value(col("b"), 1, vec![]), nth_value( col("b"), diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md index 0799859e4371..6bf2005dabf9 100644 --- a/docs/source/user-guide/sql/window_functions.md +++ b/docs/source/user-guide/sql/window_functions.md @@ -146,40 +146,6 @@ RANGE and GROUPS modes require an ORDER BY clause (with RANGE the ORDER BY must All [aggregate functions](aggregate_functions.md) can be used as window functions. -## Ranking functions - -- [rank](#rank) -- [dense_rank](#dense_rank) -- [ntile](#ntile) - -### `rank` - -Rank of the current row with gaps; same as row_number of its first peer. - -```sql -rank() -``` - -### `dense_rank` - -Rank of the current row without gaps; this function counts peer groups. - -```sql -dense_rank() -``` - -### `ntile` - -Integer ranging from 1 to the argument value, dividing the partition as equally as possible. - -```sql -ntile(expression) -``` - -#### Arguments - -- **expression**: An integer describing the number groups the partition should be split into - ## Analytical functions - [cume_dist](#cume_dist) diff --git a/docs/source/user-guide/sql/window_functions_new.md b/docs/source/user-guide/sql/window_functions_new.md index 267060abfdcc..ae3edb832fcb 100644 --- a/docs/source/user-guide/sql/window_functions_new.md +++ b/docs/source/user-guide/sql/window_functions_new.md @@ -159,6 +159,7 @@ All [aggregate functions](aggregate_functions.md) can be used as window function - [cume_dist](#cume_dist) - [dense_rank](#dense_rank) +- [ntile](#ntile) - [percent_rank](#percent_rank) - [rank](#rank) - [row_number](#row_number) @@ -179,6 +180,18 @@ Returns the rank of the current row without gaps. This function ranks rows in a dense_rank() ``` +### `ntile` + +Integer ranging from 1 to the argument value, dividing the partition as equally as possible + +``` +ntile(expression) +``` + +#### Arguments + +- **expression**: An integer describing the number groups the partition should be split into + ### `percent_rank` Returns the percentage rank of the current row within its partition. The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.