Skip to content

Commit

Permalink
ARROW-9887: [Rust] [DataFusion] Added support for complex return type…
Browse files Browse the repository at this point in the history
…s for built-in functions

@alamb and @andygrove , I was able to split #8032 in two, so that they address different problems. This PR is specific to the problem that we have been discussing in #7967. It offers a solution that covers the three main cases:

* single return type, such as `sqrt -> f64`
* finite set of return types, such as `concat` (utf8 and largeUTF8)
* potentially infinite set of return types, such as `array` (Array of any primitive or non-primitive type)

I believe that this implementation is closer to option 1 that @alamb enumerated here. It is so because so far I was unable to offer an implementation for option 3, because functions such as `array` have an arbitrary return type (it can be any valid type, primitive or non-primitive), and thus we can't write them as `array_TYPE` as the number of cases is potentially large.

---------------

This PR is exclusive to *built-in functions* of variable return type and it does not care about UDFs. It addresses a limitation of our current logical planning, that has been thoroughly discussed in #8032 and #7967, that logical planning needs to specify a specific return type when planning usage of UDFs and built-in functions (details below).

Notation: `return type function`: a function mapping the functions' argument types to its return type. E.g. `(utf8) -> utf8; (LargeUtf8) -> LargeUtf8;` is an example of the signature of a typical one argument string function.

The primary difference between built-ins and UDFs is that built-in's return type function is always known (hard-coded), while the return type function of a UDF is known by accessing the registry where it is registered on (it is a non-static closure).

This PR is required to address an incompatibility of the following requirements that I gathered from discussions between @alamb, @andygrove and @jorgecarleitao:

1. we want to have typing information during logical planning (see [here](https://docs.google.com/document/d/1Kzz642ScizeKXmVE1bBlbLvR663BKQaGqVIyy9cAscY/edit?disco=AAAAJ4XOjHk))
2. we want to have functions that require their return type to depend on their input. Examples include `array` (any type to any other type) and `concatenate` (`utf8 -> utf8`, `largeutf8 -> largeutf8`), and many others (see [here](#7967 (comment)))
3. we would like users to plan built-in functions without accessing the registry (see [here](#8032 (comment)) and mailing list)
4. a UDFs return type function needs to be retrieved from the registry (`ExecutionContextState`).
5. Currently, all our built-in functions are declared as UDFs and registered on the registry when the context is initialized.

These points are incompatible because:

* 1. and 2. requires access to built-in function's return type function during planning
* 4. and 5. requires access the registry to know the built-in's return type
* 3. forbids us from accessing the registry during planning

This PR solves this incompatibility by leveraging the following:

* builtin functions have a well declared return type during planning, since they are part of the source code
* builtin functions do not need to be in our function's registry

The first commit in this PR makes the existing logical node `Expr::ScalarFunction` to be exclusive for built-in functions, and moves our UDF planning logic to a new node named `Expr::ScalarUDF`. It also makes the planning of built-in functions to no longer require access the registry.

The second commit in this PR introduces the necessary functionality for built-in functions to support all types of complex signatures. Examples of usage of this functionality are in the following PRs:

1. add support for math functions that accept f32: https://github.com/jorgecarleitao/arrow/pull/4/files
2. add `concat`, of an arbitrary number of arguments of type utf8: https://github.com/jorgecarleitao/arrow/pull/5/files
3. add `array` function, supporting an arbitrary number of arguments with uniform types: https://github.com/jorgecarleitao/arrow/pull/6/files

Closes #8080 from jorgecarleitao/functions

Authored-by: Jorge C. Leitao <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
  • Loading branch information
jorgecarleitao authored and andygrove committed Sep 1, 2020
1 parent 1790751 commit a898ee1
Show file tree
Hide file tree
Showing 13 changed files with 706 additions and 152 deletions.
6 changes: 1 addition & 5 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::execution::physical_plan::common;
use crate::execution::physical_plan::csv::CsvReadOptions;
use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::planner::DefaultPhysicalPlanner;
use crate::execution::physical_plan::scalar_functions;
use crate::execution::physical_plan::udf::ScalarFunction;
use crate::execution::physical_plan::ExecutionPlan;
use crate::execution::physical_plan::PhysicalPlanner;
Expand Down Expand Up @@ -105,16 +104,13 @@ impl ExecutionContext {

/// Create a new execution context using the provided configuration
pub fn with_config(config: ExecutionConfig) -> Self {
let mut ctx = Self {
let ctx = Self {
state: Arc::new(Mutex::new(ExecutionContextState {
datasources: HashMap::new(),
scalar_functions: HashMap::new(),
config,
})),
};
for udf in scalar_functions() {
ctx.register_udf(udf);
}
ctx
}

Expand Down
2 changes: 1 addition & 1 deletion rust/datafusion/src/execution/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,7 @@ pub struct CastExpr {
}

/// Determine if a DataType is numeric or not
fn is_numeric(dt: &DataType) -> bool {
pub fn is_numeric(dt: &DataType) -> bool {
match dt {
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => true,
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => true,
Expand Down
261 changes: 261 additions & 0 deletions rust/datafusion/src/execution/physical_plan/functions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Declaration of built-in (scalar) functions.
//! This module contains built-in functions' enumeration and metadata.
//!
//! Generally, a function has:
//! * a signature
//! * a return type, that is a function of the incoming argument's types
//! * the computation, that must accept each valid signature
//!
//! * Signature: see `Signature`
//! * Return type: a function `(arg_types) -> return_type`. E.g. for sqrt, ([f32]) -> f32, ([f64]) -> f64.
//!
//! This module also has a set of coercion rules to improve user experience: if an argument i32 is passed
//! to a function that supports f64, it is coerced to f64.
use super::{
type_coercion::{coerce, data_types},
PhysicalExpr,
};
use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::math_expressions;
use crate::execution::physical_plan::udf;
use arrow::{
compute::kernels::length::length,
datatypes::{DataType, Schema},
};
use std::{fmt, str::FromStr, sync::Arc};
use udf::ScalarUdf;

/// A function's signature, which defines the function's supported argument types.
#[derive(Debug)]
pub enum Signature {
/// arbitrary number of arguments of an common type out of a list of valid types
// A function such as `concat` is `Variadic(vec![DataType::Utf8, DataType::LargeUtf8])`
Variadic(Vec<DataType>),
/// arbitrary number of arguments of an arbitrary but equal type
// A function such as `array` is `VariadicEqual`
// The first argument decides the type used for coercion
VariadicEqual,
/// fixed number of arguments of an arbitrary but equal type out of a list of valid types
// A function of one argument of f64 is `Uniform(1, vec![DataType::Float64])`
// A function of two arguments of f64 or f32 is `Uniform(1, vec![DataType::Float32, DataType::Float64])`
Uniform(usize, Vec<DataType>),
}

/// Enum of all built-in scalar functions
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarFunction {
/// sqrt
Sqrt,
/// sin
Sin,
/// cos
Cos,
/// tan
Tan,
/// asin
Asin,
/// acos
Acos,
/// atan
Atan,
/// exp
Exp,
/// log, also known as ln
Log,
/// log2
Log2,
/// log10
Log10,
/// floor
Floor,
/// ceil
Ceil,
/// round
Round,
/// trunc
Trunc,
/// abs
Abs,
/// signum
Signum,
/// length
Length,
}

impl fmt::Display for ScalarFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// lowercase of the debug.
write!(f, "{}", format!("{:?}", self).to_lowercase())
}
}

impl FromStr for ScalarFunction {
type Err = ExecutionError;
fn from_str(name: &str) -> Result<ScalarFunction> {
Ok(match name {
"sqrt" => ScalarFunction::Sqrt,
"sin" => ScalarFunction::Sin,
"cos" => ScalarFunction::Cos,
"tan" => ScalarFunction::Tan,
"asin" => ScalarFunction::Asin,
"acos" => ScalarFunction::Acos,
"atan" => ScalarFunction::Atan,
"exp" => ScalarFunction::Exp,
"log" => ScalarFunction::Log,
"log2" => ScalarFunction::Log2,
"log10" => ScalarFunction::Log10,
"floor" => ScalarFunction::Floor,
"ceil" => ScalarFunction::Ceil,
"round" => ScalarFunction::Round,
"truc" => ScalarFunction::Trunc,
"abs" => ScalarFunction::Abs,
"signum" => ScalarFunction::Signum,
"length" => ScalarFunction::Length,
_ => {
return Err(ExecutionError::General(format!(
"There is no built-in function named {}",
name
)))
}
})
}
}

/// Returns the datatype of the scalar function
pub fn return_type(fun: &ScalarFunction, arg_types: &Vec<DataType>) -> Result<DataType> {
// Note that this function *must* return the same type that the respective physical expression returns
// or the execution panics.

// verify that this is a valid set of data types for this function
data_types(&arg_types, &signature(fun))?;

// the return type after coercion.
// for now, this is type-independent, but there will be built-in functions whose return type
// depends on the incoming type.
match fun {
ScalarFunction::Length => Ok(DataType::UInt32),
_ => Ok(DataType::Float64),
}
}

/// Create a physical (function) expression.
/// This function errors when `args`' can't be coerced to a valid argument type of the function.
pub fn create_physical_expr(
fun: &ScalarFunction,
args: &Vec<Arc<dyn PhysicalExpr>>,
input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
let fun_expr: ScalarUdf = Arc::new(match fun {
ScalarFunction::Sqrt => math_expressions::sqrt,
ScalarFunction::Sin => math_expressions::sin,
ScalarFunction::Cos => math_expressions::cos,
ScalarFunction::Tan => math_expressions::tan,
ScalarFunction::Asin => math_expressions::asin,
ScalarFunction::Acos => math_expressions::acos,
ScalarFunction::Atan => math_expressions::atan,
ScalarFunction::Exp => math_expressions::exp,
ScalarFunction::Log => math_expressions::ln,
ScalarFunction::Log2 => math_expressions::log2,
ScalarFunction::Log10 => math_expressions::log10,
ScalarFunction::Floor => math_expressions::floor,
ScalarFunction::Ceil => math_expressions::ceil,
ScalarFunction::Round => math_expressions::round,
ScalarFunction::Trunc => math_expressions::trunc,
ScalarFunction::Abs => math_expressions::abs,
ScalarFunction::Signum => math_expressions::signum,
ScalarFunction::Length => |args| Ok(Arc::new(length(args[0].as_ref())?)),
});
// coerce
let args = coerce(args, input_schema, &signature(fun))?;

let arg_types = args
.iter()
.map(|e| e.data_type(input_schema))
.collect::<Result<Vec<_>>>()?;

Ok(Arc::new(udf::ScalarFunctionExpr::new(
&format!("{}", fun),
fun_expr,
args,
&return_type(&fun, &arg_types)?,
)))
}

/// the signatures supported by the function `fun`.
fn signature(fun: &ScalarFunction) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.

// for now, the list is small, as we do not have many built-in functions.
match fun {
ScalarFunction::Length => Signature::Uniform(1, vec![DataType::Utf8]),
// math expressions expect 1 argument of type f64
_ => Signature::Uniform(1, vec![DataType::Float64]),
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
error::Result, execution::physical_plan::expressions::lit,
logicalplan::ScalarValue,
};
use arrow::{
array::{ArrayRef, Float64Array, Int32Array},
datatypes::Field,
record_batch::RecordBatch,
};

fn generic_test_math(value: ScalarValue, expected: &str) -> Result<()> {
// any type works here: we evaluate against a literal of `value`
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1]))];

let arg = lit(value);

let expr = create_physical_expr(&ScalarFunction::Exp, &vec![arg], &schema)?;

// type is correct
assert_eq!(expr.data_type(&schema)?, DataType::Float64);

// evaluate works
let result =
expr.evaluate(&RecordBatch::try_new(Arc::new(schema.clone()), columns)?)?;

// downcast works
let result = result.as_any().downcast_ref::<Float64Array>().unwrap();

// value is correct
assert_eq!(format!("{}", result.value(0)), expected); // = exp(1)

Ok(())
}

#[test]
fn test_math_function() -> Result<()> {
let exp_f64 = "2.718281828459045";
generic_test_math(ScalarValue::Int32(1i32), exp_f64)?;
generic_test_math(ScalarValue::UInt32(1u32), exp_f64)?;
generic_test_math(ScalarValue::Float64(1f64), exp_f64)?;
generic_test_math(ScalarValue::Float32(1f32), exp_f64)?;
Ok(())
}
}
Loading

0 comments on commit a898ee1

Please sign in to comment.