Skip to content

Commit

Permalink
Convert ntile builtIn function to UDWF (#13040)
Browse files Browse the repository at this point in the history
* converting to ntile udwf

* updated the window functions documentation file

* wip: update the ntile udwf function

* fix the roundtrip_logical_plan.rs

* removed builtIn ntile function

* fixed field name issue

* fixing the return type of ntile udwf

* error if UInt64 conversion fails

* handling if null is found

* handling if value is zero or less than zero

* removed unused import

* updated prost.rs file

* removed dead code

* fixed clippy error

* added inner doc comment

* minor fixes and added roundtrip logical plan test

* removed parse_expr in ntile
  • Loading branch information
jatin510 authored Oct 25, 2024
1 parent 13a4225 commit 02b9693
Show file tree
Hide file tree
Showing 19 changed files with 221 additions and 276 deletions.
19 changes: 0 additions & 19 deletions datafusion/expr/src/built_in_window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [Window Function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// Integer ranging from 1 to the argument value, dividing the partition as equally as possible
Ntile,
/// returns value evaluated at the row that is the first row of the window frame
FirstValue,
/// Returns value evaluated at the row that is the last row of the window frame
Expand All @@ -54,7 +52,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
Ntile => "NTILE",
FirstValue => "first_value",
LastValue => "last_value",
NthValue => "NTH_VALUE",
Expand All @@ -66,7 +63,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
"NTILE" => BuiltInWindowFunction::Ntile,
"FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
"LAST_VALUE" => BuiltInWindowFunction::LastValue,
"NTH_VALUE" => BuiltInWindowFunction::NthValue,
Expand Down Expand Up @@ -97,7 +93,6 @@ impl BuiltInWindowFunction {
})?;

match self {
BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue => Ok(input_expr_types[0].clone()),
Expand All @@ -111,20 +106,6 @@ impl BuiltInWindowFunction {
BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => {
Signature::any(1, Volatility::Immutable)
}
BuiltInWindowFunction::Ntile => Signature::uniform(
1,
vec![
DataType::UInt64,
DataType::UInt32,
DataType::UInt16,
DataType::UInt8,
DataType::Int64,
DataType::Int32,
DataType::Int16,
DataType::Int8,
],
Volatility::Immutable,
),
BuiltInWindowFunction::NthValue => Signature::any(2, Volatility::Immutable),
}
}
Expand Down
11 changes: 1 addition & 10 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2567,18 +2567,9 @@ mod test {
Ok(())
}

#[test]
fn test_ntile_return_type() -> Result<()> {
let fun = find_df_window_func("ntile").unwrap();
let observed = fun.return_type(&[DataType::Int16], &[true], "")?;
assert_eq!(DataType::UInt64, observed);

Ok(())
}

#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec!["ntile", "first_value", "last_value", "nth_value"];
let names = vec!["first_value", "last_value", "nth_value"];
for name in names {
let fun = find_df_window_func(name).unwrap();
let fun2 = find_df_window_func(name.to_uppercase().as_str()).unwrap();
Expand Down
5 changes: 0 additions & 5 deletions datafusion/expr/src/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};

/// Create an expression to represent the `ntile` window function
pub fn ntile(arg: Expr) -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Ntile, vec![arg]))
}

/// Create an expression to represent the `nth_value` window function
pub fn nth_value(arg: Expr, n: i64) -> Expr {
Expr::WindowFunction(WindowFunction::new(
Expand Down
4 changes: 3 additions & 1 deletion datafusion/functions-window/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub mod macros;

pub mod cume_dist;
pub mod lead_lag;

pub mod ntile;
pub mod rank;
pub mod row_number;
mod utils;
Expand All @@ -44,6 +44,7 @@ pub mod expr_fn {
pub use super::cume_dist::cume_dist;
pub use super::lead_lag::lag;
pub use super::lead_lag::lead;
pub use super::ntile::ntile;
pub use super::rank::{dense_rank, percent_rank, rank};
pub use super::row_number::row_number;
}
Expand All @@ -58,6 +59,7 @@ pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
rank::rank_udwf(),
rank::dense_rank_udwf(),
rank::percent_rank_udwf(),
ntile::ntile_udwf(),
]
}
/// Registers all enabled packages with a [`FunctionRegistry`]
Expand Down
168 changes: 168 additions & 0 deletions datafusion/functions-window/src/ntile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! `ntile` window function implementation
use std::any::Any;
use std::fmt::Debug;
use std::sync::{Arc, OnceLock};

use crate::utils::{
get_scalar_value_from_args, get_signed_integer, get_unsigned_integer,
};
use datafusion_common::arrow::array::{ArrayRef, UInt64Array};
use datafusion_common::arrow::datatypes::{DataType, Field};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
use datafusion_expr::{
Documentation, Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
};
use datafusion_functions_window_common::field;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use field::WindowUDFFieldArgs;

get_or_init_udwf!(
Ntile,
ntile,
"integer ranging from 1 to the argument value, dividing the partition as equally as possible"
);

pub fn ntile(arg: Expr) -> Expr {
ntile_udwf().call(vec![arg])
}

#[derive(Debug)]
pub struct Ntile {
signature: Signature,
}

impl Ntile {
/// Create a new `ntile` function
pub fn new() -> Self {
Self {
signature: Signature::uniform(
1,
vec![
DataType::UInt64,
DataType::UInt32,
DataType::UInt16,
DataType::UInt8,
DataType::Int64,
DataType::Int32,
DataType::Int16,
DataType::Int8,
],
Volatility::Immutable,
),
}
}
}

impl Default for Ntile {
fn default() -> Self {
Self::new()
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_ntile_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_RANKING)
.with_description(
"Integer ranging from 1 to the argument value, dividing the partition as equally as possible",
)
.with_syntax_example("ntile(expression)")
.with_argument("expression","An integer describing the number groups the partition should be split into")
.build()
.unwrap()
})
}

impl WindowUDFImpl for Ntile {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ntile"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn partition_evaluator(
&self,
partition_evaluator_args: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
let scalar_n =
get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 0)?
.ok_or_else(|| {
DataFusionError::Execution(
"NTILE requires a positive integer".to_string(),
)
})?;

if scalar_n.is_null() {
return exec_err!("NTILE requires a positive integer, but finds NULL");
}

if scalar_n.is_unsigned() {
let n = get_unsigned_integer(scalar_n)?;
Ok(Box::new(NtileEvaluator { n }))
} else {
let n: i64 = get_signed_integer(scalar_n)?;
if n <= 0 {
return exec_err!("NTILE requires a positive integer");
}
Ok(Box::new(NtileEvaluator { n: n as u64 }))
}
}
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
let nullable = false;

Ok(Field::new(field_args.name(), DataType::UInt64, nullable))
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_ntile_doc())
}
}

#[derive(Debug)]
struct NtileEvaluator {
n: u64,
}

impl PartitionEvaluator for NtileEvaluator {
fn evaluate_all(
&mut self,
_values: &[ArrayRef],
num_rows: usize,
) -> Result<ArrayRef> {
let num_rows = num_rows as u64;
let mut vec: Vec<u64> = Vec::new();
let n = u64::min(self.n, num_rows);
for i in 0..num_rows {
let res = i * n / num_rows;
vec.push(res + 1)
}
Ok(Arc::new(UInt64Array::from(vec)))
}
}
12 changes: 12 additions & 0 deletions datafusion/functions-window/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,15 @@ pub(crate) fn get_scalar_value_from_args(
None
})
}

pub(crate) fn get_unsigned_integer(value: ScalarValue) -> Result<u64> {
if value.is_null() {
return Ok(0);
}

if !value.data_type().is_integer() {
return exec_err!("Expected an integer value");
}

value.cast_to(&DataType::UInt64)?.try_into()
}
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ mod unknown_column;
/// Module with some convenient methods used in expression building
pub use crate::aggregate::stats::StatsType;
pub use crate::window::nth_value::NthValue;
pub use crate::window::ntile::Ntile;
pub use crate::PhysicalSortExpr;

pub use binary::{binary, similar_to, BinaryExpr};
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod aggregate;
mod built_in;
mod built_in_window_function_expr;
pub(crate) mod nth_value;
pub(crate) mod ntile;
mod sliding_aggregate;
mod window_expr;

Expand Down
Loading

0 comments on commit 02b9693

Please sign in to comment.