Skip to content

Commit

Permalink
Rename array() function to make_array(), extend array[]
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 15, 2022
1 parent e57dead commit 65ab517
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 132 deletions.
22 changes: 11 additions & 11 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ pub use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema,
};
pub use datafusion_expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
atan2, avg, bit_length, btrim, call_fn, case, cast, ceil, character_length, chr,
coalesce, col, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
exists, exp, expr_rewriter,
abs, acos, and, approx_distinct, approx_percentile_cont, ascii, asin, atan, atan2,
avg, bit_length, btrim, call_fn, case, cast, ceil, character_length, chr, coalesce,
col, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
expr_rewriter,
expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
Expand All @@ -50,11 +50,11 @@ pub use datafusion_expr::{
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr,
nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
lower, lpad, ltrim, make_array, max, md5, min, not_exists, not_in_subquery, now,
now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace,
repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
};
pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo};
8 changes: 4 additions & 4 deletions datafusion/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ pub use crate::execution::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, cast, character_length,
chr, coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc,
digest, exists, from_unixtime, in_list, in_subquery, initcap, left, length, lit,
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, octet_length,
approx_percentile_cont, ascii, avg, bit_length, btrim, cast, character_length, chr,
coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
exists, from_unixtime, in_list, in_subquery, initcap, left, length, lit, lower, lpad,
ltrim, make_array, max, md5, min, not_exists, not_in_subquery, now, octet_length,
random, regexp_match, regexp_replace, repeat, replace, reverse, right, rpad, rtrim,
scalar_subquery, sha224, sha256, sha384, sha512, split_part, starts_with, strpos,
substr, sum, to_hex, translate, trim, upper, Column, Expr, JoinType, Partitioning,
Expand Down
113 changes: 90 additions & 23 deletions datafusion/core/tests/sql/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ async fn query_concat() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn query_array() -> Result<()> {
// Return a session context with table "test" registered with 2 columns
fn array_context() -> SessionContext {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::Int32, true),
Expand All @@ -124,43 +124,110 @@ async fn query_array() -> Result<()> {
Arc::new(StringArray::from_slice(&["", "a", "aa", "aaa"])),
Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])),
],
)?;
)
.unwrap();

let table = MemTable::try_new(schema, vec![vec![data]])?;
let table = MemTable::try_new(schema, vec![vec![data]]).unwrap();

let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(table))?;
let sql = "SELECT array(c1, cast(c2 as varchar)) FROM test";
ctx.register_table("test", Arc::new(table)).unwrap();
ctx
}

#[tokio::test]
async fn query_array() {
let ctx = array_context();
let sql = "SELECT array[c1, cast(c2 as varchar)] FROM test";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+--------------------------------------+",
"| array(test.c1,CAST(test.c2 AS Utf8)) |",
"+--------------------------------------+",
"| [, 0] |",
"| [a, 1] |",
"| [aa, ] |",
"| [aaa, 3] |",
"+--------------------------------------+",
"+----------+",
"| array |",
"+----------+",
"| [, 0] |",
"| [a, 1] |",
"| [aa, ] |",
"| [aaa, 3] |",
"+----------+",
];
assert_batches_eq!(expected, &actual);
}

#[tokio::test]
async fn query_make_array() {
let ctx = array_context();
let sql = "SELECT make_array(c1, cast(c2 as varchar)) FROM test";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+------------------------------------------+",
"| makearray(test.c1,CAST(test.c2 AS Utf8)) |",
"+------------------------------------------+",
"| [, 0] |",
"| [a, 1] |",
"| [aa, ] |",
"| [aaa, 3] |",
"+------------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn query_array_scalar() -> Result<()> {
async fn query_array_scalar() {
let ctx = SessionContext::new();

let sql = "SELECT array(1, 2, 3);";
let sql = "SELECT array[1, 2, 3];";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-----------------------------------+",
"| array(Int64(1),Int64(2),Int64(3)) |",
"+-----------------------------------+",
"| [1, 2, 3] |",
"+-----------------------------------+",
"+-----------+",
"| array |",
"+-----------+",
"| [1, 2, 3] |",
"+-----------+",
];
assert_batches_eq!(expected, &actual);

// alternate syntax format
let sql = "SELECT [1, 2, 3];";
let actual = execute_to_batches(&ctx, sql).await;
assert_batches_eq!(expected, &actual);
}

#[tokio::test]
async fn query_array_scalar_bad_types() {
let ctx = SessionContext::new();

// no common type to coerce to, should error
let err = plan_and_collect(&ctx, "SELECT [1, true, null]")
.await
.unwrap_err();
assert_eq!(err.to_string(), "Error during planning: Coercion from [Int64, Boolean, Null] to the signature VariadicEqual failed.",);
}

#[tokio::test]
async fn query_array_scalar_coerce() {
let ctx = SessionContext::new();

// The planner should be able to coerce this to all integers
// https://github.com/apache/arrow-datafusion/issues/3170
let err = plan_and_collect(&ctx, "SELECT [1, 2, '3']")
.await
.unwrap_err();
assert_eq!(err.to_string(), "Error during planning: Coercion from [Int64, Int64, Utf8] to the signature VariadicEqual failed.",);
}

#[tokio::test]
async fn query_make_array_scalar() {
let ctx = SessionContext::new();

let sql = "SELECT make_array(1, 2, 3);";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------------------------+",
"| makearray(Int64(1),Int64(2),Int64(3)) |",
"+---------------------------------------+",
"| [1, 2, 3] |",
"+---------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
Expand Down
12 changes: 8 additions & 4 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ pub enum BuiltinScalarFunction {
/// trunc
Trunc,

// string functions
// array functions
/// construct an array from columns
Array,
MakeArray,

// string functions
/// ascii
Ascii,
/// bit_length
Expand Down Expand Up @@ -204,7 +206,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Sqrt => Volatility::Immutable,
BuiltinScalarFunction::Tan => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
BuiltinScalarFunction::Array => Volatility::Immutable,
BuiltinScalarFunction::MakeArray => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
BuiltinScalarFunction::BitLength => Volatility::Immutable,
BuiltinScalarFunction::Btrim => Volatility::Immutable,
Expand Down Expand Up @@ -297,8 +299,10 @@ impl FromStr for BuiltinScalarFunction {
// conditional functions
"coalesce" => BuiltinScalarFunction::Coalesce,

// array functions
"make_array" => BuiltinScalarFunction::MakeArray,

// string functions
"array" => BuiltinScalarFunction::Array,
"ascii" => BuiltinScalarFunction::Ascii,
"bit_length" => BuiltinScalarFunction::BitLength,
"btrim" => BuiltinScalarFunction::Btrim,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ scalar_expr!(FromUnixtime, from_unixtime, unixtime);
unary_scalar_expr!(ArrowTypeof, arrow_typeof, "data type");

/// Returns an array of fixed size with each argument on it.
pub fn array(args: Vec<Expr>) -> Expr {
pub fn make_array(args: Vec<Expr>) -> Expr {
Expr::ScalarFunction {
fun: built_in_function::BuiltinScalarFunction::Array,
fun: built_in_function::BuiltinScalarFunction::MakeArray,
args,
}
}
Expand Down
12 changes: 4 additions & 8 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::nullif::SUPPORTED_NULLIF_TYPES;
use crate::type_coercion::data_types;
use crate::ColumnarValue;
use crate::{
array_expressions, conditional_expressions, struct_expressions, Accumulator,
BuiltinScalarFunction, Signature, TypeSignature,
conditional_expressions, struct_expressions, Accumulator, BuiltinScalarFunction,
Signature, TypeSignature,
};
use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
use datafusion_common::{DataFusionError, Result};
Expand Down Expand Up @@ -96,7 +96,7 @@ pub fn return_type(
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match fun {
BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
BuiltinScalarFunction::MakeArray => Ok(DataType::FixedSizeList(
Box::new(Field::new("item", input_expr_types[0].clone(), true)),
input_expr_types.len() as i32,
)),
Expand Down Expand Up @@ -267,12 +267,8 @@ pub fn return_type(
pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.

// for now, the list is small, as we do not have many built-in functions.
match fun {
BuiltinScalarFunction::Array => Signature::variadic(
array_expressions::SUPPORTED_ARRAY_TYPES.to_vec(),
fun.volatility(),
),
BuiltinScalarFunction::MakeArray => Signature::variadic_equal(fun.volatility()),
BuiltinScalarFunction::Struct => Signature::variadic(
struct_expressions::SUPPORTED_STRUCT_TYPES.to_vec(),
fun.volatility(),
Expand Down
41 changes: 24 additions & 17 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ macro_rules! downcast_vec {
}};
}

macro_rules! array {
/// Create an array of FixedSizeList from a set of individual Arrays
/// where each element in the output FixedSizeList is the result of
/// concatenating the corresponding values in the input Arrays
macro_rules! make_fixed_size_list {
($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident) => {{
// downcast all arguments to their common format
let args =
Expand All @@ -59,7 +62,7 @@ macro_rules! array {
}};
}

fn array_array(args: &[ArrayRef]) -> Result<ArrayRef> {
fn arrays_to_fixed_size_list_array(args: &[ArrayRef]) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
return Err(DataFusionError::Internal(
Expand All @@ -68,19 +71,21 @@ fn array_array(args: &[ArrayRef]) -> Result<ArrayRef> {
}

let res = match args[0].data_type() {
DataType::Utf8 => array!(args, StringArray, StringBuilder),
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder),
DataType::Boolean => array!(args, BooleanArray, BooleanBuilder),
DataType::Float32 => array!(args, Float32Array, Float32Builder),
DataType::Float64 => array!(args, Float64Array, Float64Builder),
DataType::Int8 => array!(args, Int8Array, Int8Builder),
DataType::Int16 => array!(args, Int16Array, Int16Builder),
DataType::Int32 => array!(args, Int32Array, Int32Builder),
DataType::Int64 => array!(args, Int64Array, Int64Builder),
DataType::UInt8 => array!(args, UInt8Array, UInt8Builder),
DataType::UInt16 => array!(args, UInt16Array, UInt16Builder),
DataType::UInt32 => array!(args, UInt32Array, UInt32Builder),
DataType::UInt64 => array!(args, UInt64Array, UInt64Builder),
DataType::Utf8 => make_fixed_size_list!(args, StringArray, StringBuilder),
DataType::LargeUtf8 => {
make_fixed_size_list!(args, LargeStringArray, LargeStringBuilder)
}
DataType::Boolean => make_fixed_size_list!(args, BooleanArray, BooleanBuilder),
DataType::Float32 => make_fixed_size_list!(args, Float32Array, Float32Builder),
DataType::Float64 => make_fixed_size_list!(args, Float64Array, Float64Builder),
DataType::Int8 => make_fixed_size_list!(args, Int8Array, Int8Builder),
DataType::Int16 => make_fixed_size_list!(args, Int16Array, Int16Builder),
DataType::Int32 => make_fixed_size_list!(args, Int32Array, Int32Builder),
DataType::Int64 => make_fixed_size_list!(args, Int64Array, Int64Builder),
DataType::UInt8 => make_fixed_size_list!(args, UInt8Array, UInt8Builder),
DataType::UInt16 => make_fixed_size_list!(args, UInt16Array, UInt16Builder),
DataType::UInt32 => make_fixed_size_list!(args, UInt32Array, UInt32Builder),
DataType::UInt64 => make_fixed_size_list!(args, UInt64Array, UInt64Builder),
data_type => {
return Err(DataFusionError::NotImplemented(format!(
"Array is not implemented for type '{:?}'.",
Expand All @@ -92,13 +97,15 @@ fn array_array(args: &[ArrayRef]) -> Result<ArrayRef> {
}

/// put values in an array.
pub fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn make_array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays: Vec<ArrayRef> = values
.iter()
.map(|x| match x {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
})
.collect();
Ok(ColumnarValue::Array(array_array(arrays.as_slice())?))
Ok(ColumnarValue::Array(arrays_to_fixed_size_list_array(
arrays.as_slice(),
)?))
}
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ pub fn create_physical_fun(
}

// string functions
BuiltinScalarFunction::Array => Arc::new(array_expressions::array),
BuiltinScalarFunction::MakeArray => Arc::new(array_expressions::make_array),
BuiltinScalarFunction::Struct => Arc::new(struct_expressions::struct_expr),
BuiltinScalarFunction::Ascii => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
Expand Down
Loading

0 comments on commit 65ab517

Please sign in to comment.