Skip to content

Commit

Permalink
port array_empty and array_length (apache#9510)
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H authored Mar 9, 2024
1 parent b7f4772 commit b1d8082
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 193 deletions.
14 changes: 0 additions & 14 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ pub enum BuiltinScalarFunction {
ArrayDistinct,
/// array_element
ArrayElement,
/// array_empty
ArrayEmpty,
/// array_length
ArrayLength,
/// array_position
ArrayPosition,
/// array_positions
Expand Down Expand Up @@ -360,11 +356,9 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayAppend => Volatility::Immutable,
BuiltinScalarFunction::ArraySort => Volatility::Immutable,
BuiltinScalarFunction::ArrayConcat => Volatility::Immutable,
BuiltinScalarFunction::ArrayEmpty => Volatility::Immutable,
BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable,
BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
BuiltinScalarFunction::ArrayLength => Volatility::Immutable,
BuiltinScalarFunction::ArrayPopFront => Volatility::Immutable,
BuiltinScalarFunction::ArrayPopBack => Volatility::Immutable,
BuiltinScalarFunction::ArrayPosition => Volatility::Immutable,
Expand Down Expand Up @@ -527,7 +521,6 @@ impl BuiltinScalarFunction {

Ok(expr_type)
}
BuiltinScalarFunction::ArrayEmpty => Ok(Boolean),
BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
List(field)
Expand All @@ -537,7 +530,6 @@ impl BuiltinScalarFunction {
"The {self} function can only accept List, LargeList or FixedSizeList as the first argument"
),
},
BuiltinScalarFunction::ArrayLength => Ok(UInt64),
BuiltinScalarFunction::ArrayPopFront => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayPopBack => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayPosition => Ok(UInt64),
Expand Down Expand Up @@ -831,15 +823,11 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayConcat => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayEmpty => Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayElement => {
Signature::array_and_index(self.volatility())
}
BuiltinScalarFunction::ArrayExcept => Signature::any(2, self.volatility()),
BuiltinScalarFunction::Flatten => Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayLength => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayDistinct => Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayPosition => {
Signature::array_and_element_and_optional_index(self.volatility())
Expand Down Expand Up @@ -1396,7 +1384,6 @@ impl BuiltinScalarFunction {
&["array_concat", "array_cat", "list_concat", "list_cat"]
}
BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"],
BuiltinScalarFunction::ArrayEmpty => &["empty"],
BuiltinScalarFunction::ArrayElement => &[
"array_element",
"array_extract",
Expand All @@ -1405,7 +1392,6 @@ impl BuiltinScalarFunction {
],
BuiltinScalarFunction::ArrayExcept => &["array_except", "list_except"],
BuiltinScalarFunction::Flatten => &["flatten"],
BuiltinScalarFunction::ArrayLength => &["array_length", "list_length"],
BuiltinScalarFunction::ArrayPopFront => {
&["array_pop_front", "list_pop_front"]
}
Expand Down
13 changes: 0 additions & 13 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,12 +611,6 @@ scalar_expr!(
);

nary_scalar_expr!(ArrayConcat, array_concat, "concatenates arrays.");
scalar_expr!(
ArrayEmpty,
array_empty,
array,
"returns true for an empty array or false for a non-empty array."
);
scalar_expr!(
Flatten,
flatten,
Expand All @@ -635,12 +629,6 @@ scalar_expr!(
first_array second_array,
"Returns an array of the elements that appear in the first array but not in the second."
);
scalar_expr!(
ArrayLength,
array_length,
array dimension,
"returns the length of the array dimension."
);
scalar_expr!(
ArrayDistinct,
array_distinct,
Expand Down Expand Up @@ -1336,7 +1324,6 @@ mod test {
test_scalar_expr!(ArraySort, array_sort, array, desc, null_first);
test_scalar_expr!(ArrayPopFront, array_pop_front, array);
test_scalar_expr!(ArrayPopBack, array_pop_back, array);
test_scalar_expr!(ArrayLength, array_length, array, dimension);
test_scalar_expr!(ArrayPosition, array_position, array, element, index);
test_scalar_expr!(ArrayPositions, array_positions, array, element);
test_scalar_expr!(ArrayPrepend, array_prepend, array, element);
Expand Down
105 changes: 102 additions & 3 deletions datafusion/functions-array/src/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@

//! implementation kernels for array functions
use arrow::array::ListArray;
use arrow::array::{
Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array,
GenericListArray, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
OffsetSizeTrait, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::array::{LargeListArray, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::Field;
use arrow::datatypes::UInt64Type;
use arrow::datatypes::{DataType, Date32Type, IntervalMonthDayNanoType};
use datafusion_common::cast::{
as_date32_array, as_int64_array, as_interval_mdn_array, as_large_list_array,
as_list_array, as_string_array,
as_date32_array, as_generic_list_array, as_int64_array, as_interval_mdn_array,
as_large_list_array, as_list_array, as_null_array, as_string_array,
};
use datafusion_common::{exec_err, not_impl_datafusion_err, DataFusionError, Result};
use std::any::type_name;
Expand Down Expand Up @@ -517,3 +517,102 @@ pub fn gen_range_date(
)?);
Ok(arr)
}

/// Array_empty SQL function
pub fn array_empty(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 1 {
return exec_err!("array_empty expects one argument");
}

if as_null_array(&args[0]).is_ok() {
// Make sure to return Boolean type.
return Ok(Arc::new(BooleanArray::new_null(args[0].len())));
}
let array_type = args[0].data_type();

match array_type {
DataType::List(_) => general_array_empty::<i32>(&args[0]),
DataType::LargeList(_) => general_array_empty::<i64>(&args[0]),
_ => exec_err!("array_empty does not support type '{array_type:?}'."),
}
}

fn general_array_empty<O: OffsetSizeTrait>(array: &ArrayRef) -> Result<ArrayRef> {
let array = as_generic_list_array::<O>(array)?;
let builder = array
.iter()
.map(|arr| arr.map(|arr| arr.len() == arr.null_count()))
.collect::<BooleanArray>();
Ok(Arc::new(builder))
}

/// Returns the length of a concrete array dimension
fn compute_array_length(
arr: Option<ArrayRef>,
dimension: Option<i64>,
) -> Result<Option<u64>> {
let mut current_dimension: i64 = 1;
let mut value = match arr {
Some(arr) => arr,
None => return Ok(None),
};
let dimension = match dimension {
Some(value) => {
if value < 1 {
return Ok(None);
}

value
}
None => return Ok(None),
};

loop {
if current_dimension == dimension {
return Ok(Some(value.len() as u64));
}

match value.data_type() {
DataType::List(..) => {
value = downcast_arg!(value, ListArray).value(0);
current_dimension += 1;
}
DataType::LargeList(..) => {
value = downcast_arg!(value, LargeListArray).value(0);
current_dimension += 1;
}
_ => return Ok(None),
}
}
}

/// Dispatch array length computation based on the offset type.
fn general_array_length<O: OffsetSizeTrait>(array: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_generic_list_array::<O>(&array[0])?;
let dimension = if array.len() == 2 {
as_int64_array(&array[1])?.clone()
} else {
Int64Array::from_value(1, list_array.len())
};

let result = list_array
.iter()
.zip(dimension.iter())
.map(|(arr, dim)| compute_array_length(arr, dim))
.collect::<Result<UInt64Array>>()?;

Ok(Arc::new(result) as ArrayRef)
}

/// Array_length SQL function
pub fn array_length(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 1 && args.len() != 2 {
return exec_err!("array_length expects one or two arguments");
}

match &args[0].data_type() {
DataType::List(_) => general_array_length::<i32>(args),
DataType::LargeList(_) => general_array_length::<i64>(args),
array_type => exec_err!("array_length does not support type '{array_type:?}'"),
}
}
4 changes: 4 additions & 0 deletions datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub mod expr_fn {
pub use super::array_has::array_has_all;
pub use super::array_has::array_has_any;
pub use super::udf::array_dims;
pub use super::udf::array_empty;
pub use super::udf::array_length;
pub use super::udf::array_ndims;
pub use super::udf::array_to_string;
pub use super::udf::cardinality;
Expand All @@ -64,6 +66,8 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
array_has::array_has_udf(),
array_has::array_has_all_udf(),
array_has::array_has_any_udf(),
udf::array_empty_udf(),
udf::array_length_udf(),
];
functions.into_iter().try_for_each(|udf| {
let existing_udf = registry.register_udf(udf)?;
Expand Down
Loading

0 comments on commit b1d8082

Please sign in to comment.