Skip to content

Commit

Permalink
Convert built-in row_number to user-defined window function (#12030)
Browse files Browse the repository at this point in the history
* Adds new crate for window functions

* Moves `row_number` to window functions crate

* Fixes build errors

* Regenerates protobuf

* Makes `row_number` no-op temporarily

* Minor: fixes formatting

* Implements `WindowUDF` for `row_number`

* Minor: fixes formatting

* Adds singleton instance of UDWF: `row_number`

* Adds partition evaluator

* Registers default window functions

* Implements `evaluate_all`

* Fixes: allow non-uppercase globals

* Minor: prefix underscore for unused variable

* Minor: fixes formatting

* Uses `row_number_udwf`

* Fixes: unparser test for `row_number`

* Uses row number to represent functional dependency

* Minor: fixes formatting

* Removes `row_number` from case-insensitive name test

* Deletes wrapper for `row_number` window expression

* Fixes: lowercase name in error statement

* Fixes: `row_number` fields are not nullable

* Fixes: lowercase name in explain output

* Updates Cargo.lock

* Fixes: lowercase name in explain output

* Adds support for result ordering

* Minor: add newline between methods

* Fixes: re-export crate name in doc comments

* Adds doc comment for `WindowUDFImpl::nullable`

* Minor: renames variable

* Minor: update doc comments

* Deletes code

* Minor: update doc comments

* Minor: adds period

* Adds doc comment for `row_number` window UDF

* Adds fluent API for creating `row_number` expression

* Minor: removes unnecessary path prefix

* Adds roundtrip logical plan test case

* Updates unit tests for `row_number`

* Deletes code

* Minor: copy edit doc comments

* Minor: deletes comment

* Minor: copy edits udwf doc comments
  • Loading branch information
jcsherin authored Aug 17, 2024
1 parent e4bc622 commit bd2d4ee
Show file tree
Hide file tree
Showing 34 changed files with 519 additions and 326 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
"datafusion/functions-nested",
"datafusion/functions-window",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
Expand Down Expand Up @@ -102,6 +103,7 @@ datafusion-functions = { path = "datafusion/functions", version = "41.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "41.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "41.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "41.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "41.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false }
Expand Down
11 changes: 11 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true, optional = true }
datafusion-functions-window = { workspace = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,7 @@ impl SessionStateBuilder {
self.scalar_functions = Some(SessionStateDefaults::default_scalar_functions());
self.aggregate_functions =
Some(SessionStateDefaults::default_aggregate_functions());
self.window_functions = Some(SessionStateDefaults::default_window_functions());
self
}

Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/execution/session_state_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use crate::datasource::provider::DefaultTableFactory;
use crate::execution::context::SessionState;
#[cfg(feature = "nested_expressions")]
use crate::functions_nested;
use crate::{functions, functions_aggregate};
use crate::{functions, functions_aggregate, functions_window};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::{AggregateUDF, ScalarUDF};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
Expand Down Expand Up @@ -112,6 +112,11 @@ impl SessionStateDefaults {
functions_aggregate::all_default_aggregate_functions()
}

/// Returns the list of default [`WindowUDF`]s.
///
/// Delegates to `functions_window::all_default_window_functions()`.
pub fn default_window_functions() -> Vec<Arc<WindowUDF>> {
functions_window::all_default_window_functions()
}

/// returns the list of default [`FileFormatFactory`]s
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@ pub mod functions_aggregate {
pub use datafusion_functions_aggregate::*;
}

/// re-export of [`datafusion_functions_window`] crate
pub mod functions_window {
pub use datafusion_functions_window::*;
}

#[cfg(test)]
pub mod test;
pub mod test_util;
Expand Down
13 changes: 5 additions & 8 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion_physical_expr::expressions::{cast, col, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

use datafusion::functions_window::row_number::row_number_udwf;
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
Expand Down Expand Up @@ -180,12 +181,10 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// ROWS BETWEEN UNBOUNDED PRECEDING AND <end_bound> PRECEDING/FOLLOWING
// )
(
// Window function
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
// user-defined window function
WindowFunctionDefinition::WindowUDF(row_number_udwf()),
// its name
"ROW_NUMBER",
"row_number",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
Expand Down Expand Up @@ -377,9 +376,7 @@ fn get_random_function(
window_fn_map.insert(
"row_number",
(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
WindowFunctionDefinition::WindowUDF(row_number_udwf()),
vec![],
),
);
Expand Down
10 changes: 2 additions & 8 deletions datafusion/expr/src/built_in_window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// number of the current row within its partition, counting from 1
RowNumber,
/// rank of the current row with gaps; same as row_number of its first peer
Rank,
/// rank of the current row without gaps; this function counts peer groups
Expand Down Expand Up @@ -74,7 +72,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
RowNumber => "ROW_NUMBER",
Rank => "RANK",
DenseRank => "DENSE_RANK",
PercentRank => "PERCENT_RANK",
Expand All @@ -93,7 +90,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
"ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
"RANK" => BuiltInWindowFunction::Rank,
"DENSE_RANK" => BuiltInWindowFunction::DenseRank,
"PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
Expand Down Expand Up @@ -131,8 +127,7 @@ impl BuiltInWindowFunction {
})?;

match self {
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Expand All @@ -150,8 +145,7 @@ impl BuiltInWindowFunction {
pub fn signature(&self) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
match self {
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
Expand Down
1 change: 0 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2896,7 +2896,6 @@ mod test {
#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec![
"row_number",
"rank",
"dense_rank",
"percent_rank",
Expand Down
14 changes: 5 additions & 9 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use crate::utils::{
split_conjunction,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction,
CreateMemoryTable, CreateView, Expr, ExprSchemable, LogicalPlanBuilder, Operator,
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown,
TableSource, WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand Down Expand Up @@ -2214,18 +2214,14 @@ impl Window {
.enumerate()
.filter_map(|(idx, expr)| {
if let Expr::WindowFunction(WindowFunction {
// Function is ROW_NUMBER
fun:
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
fun: WindowFunctionDefinition::WindowUDF(udwf),
partition_by,
..
}) = expr
{
// When there is no PARTITION BY, row number will be unique
// across the entire table.
if partition_by.is_empty() {
if udwf.name() == "row_number" && partition_by.is_empty() {
return Some(idx + input_len);
}
}
Expand Down
34 changes: 34 additions & 0 deletions datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! [`WindowUDF`]: User Defined Window Functions
use arrow::compute::SortOptions;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{
any::Any,
Expand Down Expand Up @@ -176,6 +177,21 @@ impl WindowUDF {
pub fn partition_evaluator_factory(&self) -> Result<Box<dyn PartitionEvaluator>> {
self.inner.partition_evaluator()
}

/// Returns whether the column values produced by this window function
/// can be null.
///
/// Delegates to the inner implementation; see
/// [`WindowUDFImpl::nullable`] for more details.
pub fn nullable(&self) -> bool {
self.inner.nullable()
}

/// Returns the custom result ordering introduced by this window function,
/// if any, which is used to update ordering equivalences.
///
/// Delegates to the inner implementation; see
/// [`WindowUDFImpl::sort_options`] for more details.
pub fn sort_options(&self) -> Option<SortOptions> {
self.inner.sort_options()
}
}

impl<F> From<F> for WindowUDF
Expand Down Expand Up @@ -319,6 +335,24 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
self.signature().hash(hasher);
hasher.finish()
}

/// Allows customizing the nullability of the output column for this
/// window UDF.
///
/// By default, the final result of evaluating the window UDF is
/// allowed to have null values (this method returns `true`). If that
/// is not the case, override this method in the window UDF
/// implementation.
fn nullable(&self) -> bool {
true
}

/// Allows the window UDF to define a custom result ordering.
///
/// By default, a window UDF doesn't introduce an ordering, so this
/// returns `None`. When a window UDF does specify sort options, they
/// are used to update ordering equivalences.
fn sort_options(&self) -> Option<SortOptions> {
None
}
}

/// WindowUDF that adds an alias to the underlying function. It is better to
Expand Down
8 changes: 0 additions & 8 deletions datafusion/expr/src/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ use datafusion_common::ScalarValue;

use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};

/// Create an expression to represent the `row_number` window function
pub fn row_number() -> Expr {
Expr::WindowFunction(WindowFunction::new(
BuiltInWindowFunction::RowNumber,
vec![],
))
}

/// Create an expression to represent the `rank` window function
pub fn rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank, vec![]))
Expand Down
47 changes: 47 additions & 0 deletions datafusion/functions-window/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-functions-window"
description = "Window function packages for the DataFusion query engine"
keywords = ["datafusion", "logical", "plan", "expressions"]
readme = "README.md"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

[lints]
workspace = true

[lib]
name = "datafusion_functions_window"
path = "src/lib.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
log = { workspace = true }

[dev-dependencies]
arrow = { workspace = true }
26 changes: 26 additions & 0 deletions datafusion/functions-window/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Window Function Library

[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

This crate contains user-defined window functions.

[df]: https://crates.io/crates/datafusion
Loading

0 comments on commit bd2d4ee

Please sign in to comment.