Skip to content

Commit

Permalink
[task apache#9511] move date_part, date_trunc, date_bin functions to …
Browse files Browse the repository at this point in the history
…datafusion-functions

Signed-off-by: tangruilin <[email protected]>
  • Loading branch information
Tangruilin committed Mar 9, 2024
1 parent b7f4772 commit 261b500
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 228 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 0 additions & 32 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ pub enum BuiltinScalarFunction {
Concat,
/// concat_ws
ConcatWithSeparator,
/// date_part
DatePart,
/// date_trunc
DateTrunc,
/// date_bin
Expand Down Expand Up @@ -391,7 +389,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Chr => Volatility::Immutable,
BuiltinScalarFunction::Concat => Volatility::Immutable,
BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
BuiltinScalarFunction::DatePart => Volatility::Immutable,
BuiltinScalarFunction::DateTrunc => Volatility::Immutable,
BuiltinScalarFunction::DateBin => Volatility::Immutable,
BuiltinScalarFunction::EndsWith => Volatility::Immutable,
Expand Down Expand Up @@ -617,7 +614,6 @@ impl BuiltinScalarFunction {
}
BuiltinScalarFunction::Concat => Ok(Utf8),
BuiltinScalarFunction::ConcatWithSeparator => Ok(Utf8),
BuiltinScalarFunction::DatePart => Ok(Float64),
BuiltinScalarFunction::DateBin | BuiltinScalarFunction::DateTrunc => {
match &input_expr_types[1] {
Timestamp(Nanosecond, None) | Utf8 | Null => {
Expand Down Expand Up @@ -1052,33 +1048,6 @@ impl BuiltinScalarFunction {

Signature::one_of(full_sig, self.volatility())
}
BuiltinScalarFunction::DatePart => Signature::one_of(
vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Date64]),
Exact(vec![Utf8, Date32]),
],
self.volatility(),
),
BuiltinScalarFunction::SplitPart => Signature::one_of(
vec![
Exact(vec![Utf8, Utf8, Int64]),
Expand Down Expand Up @@ -1369,7 +1338,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::MakeDate => &["make_date"],
BuiltinScalarFunction::DateBin => &["date_bin"],
BuiltinScalarFunction::DateTrunc => &["date_trunc", "datetrunc"],
BuiltinScalarFunction::DatePart => &["date_part", "datepart"],
BuiltinScalarFunction::ToChar => &["to_char", "date_format"],
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],

Expand Down
2 changes: 0 additions & 2 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,6 @@ nary_scalar_expr!(
);

// date functions
scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date");
scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision");
scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval");
scalar_expr!(
Expand Down Expand Up @@ -1327,7 +1326,6 @@ mod test {
test_scalar_expr!(Trim, trim, string);
test_scalar_expr!(Upper, upper, string);

test_scalar_expr!(DatePart, date_part, part, date);
test_scalar_expr!(DateTrunc, date_trunc, part, date);
test_scalar_expr!(DateBin, date_bin, stride, source, origin);
test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);
Expand Down
269 changes: 269 additions & 0 deletions datafusion/functions/src/datetime/date_part.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use arrow::compute::cast;
use arrow::{
array::{Array, ArrayRef, Float64Array, PrimitiveArray},
datatypes::{ArrowNumericType, ArrowTemporalType, DataType},
};
use arrow::{compute::kernels::temporal, datatypes::TimeUnit};
use datafusion_common::cast::{
as_date32_array, as_date64_array, as_timestamp_microsecond_array,
as_timestamp_millisecond_array, as_timestamp_nanosecond_array,
as_timestamp_second_array,
};
use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::{
ColumnarValue, ScalarUDFImpl, Signature, TypeSignature::Exact, Volatility,
TIMEZONE_WILDCARD,
};
use DataType::{Date32, Date64, Timestamp, Utf8};
use TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};

/// Computes, for every element of `array`, the seconds field (as reported by
/// `temporal::second`) plus the sub-second fraction (`temporal::nanosecond`
/// divided by 1e9), multiplied by `frac`.
///
/// `frac` is the number of output ticks per second: `1` yields seconds,
/// `1_000` milliseconds, `1_000_000` microseconds and `1_000_000_000`
/// nanoseconds.
///
/// Slots for which the seconds kernel reports NULL stay NULL in the result.
/// Building the output from the raw value buffers would instead turn those
/// slots into ordinary numbers, silently discarding the validity bitmap.
///
/// # Errors
///
/// Returns an error if either temporal kernel fails for the array's type.
fn to_ticks<T>(array: &PrimitiveArray<T>, frac: i32) -> Result<Float64Array>
where
    T: ArrowTemporalType + ArrowNumericType,
    i64: From<T::Native>,
{
    let whole_seconds = temporal::second(array)?;
    let sub_second_nanos = temporal::nanosecond(array)?;
    let scale = f64::from(frac);

    // Iterate over `Option` values (not `.values()`) so that nulls survive.
    let ticks = whole_seconds
        .iter()
        .zip(sub_second_nanos.iter())
        .map(|(secs, nanos)| {
            secs.map(|secs| {
                // A missing nanosecond component contributes nothing; only a
                // missing seconds component makes the whole slot NULL.
                let nanos = f64::from(nanos.unwrap_or(0));
                (f64::from(secs) + nanos / 1_000_000_000.0) * scale
            })
        })
        .collect::<Float64Array>();

    Ok(ticks)
}

/// Extracts the seconds field of each element, including its sub-second
/// fraction, by delegating to `to_ticks` with a scale of one tick per second.
fn seconds<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
    T: ArrowTemporalType + ArrowNumericType,
    i64: From<T::Native>,
{
    // The value is already expressed in seconds, so no scaling is applied.
    const TICKS_PER_SECOND: i32 = 1;
    to_ticks(array, TICKS_PER_SECOND)
}

/// Extracts the seconds field of each element (fraction included) expressed
/// in milliseconds, by delegating to `to_ticks` with a scale of 1_000.
fn millis<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
    T: ArrowTemporalType + ArrowNumericType,
    i64: From<T::Native>,
{
    const MILLIS_PER_SECOND: i32 = 1_000;
    to_ticks(array, MILLIS_PER_SECOND)
}

/// Extracts the seconds field of each element (fraction included) expressed
/// in microseconds, by delegating to `to_ticks` with a scale of 1_000_000.
fn micros<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
    T: ArrowTemporalType + ArrowNumericType,
    i64: From<T::Native>,
{
    const MICROS_PER_SECOND: i32 = 1_000_000;
    to_ticks(array, MICROS_PER_SECOND)
}

/// Extracts the seconds field of each element (fraction included) expressed
/// in nanoseconds, by delegating to `to_ticks` with a scale of 1_000_000_000.
fn nanos<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
    T: ArrowTemporalType + ArrowNumericType,
    i64: From<T::Native>,
{
    const NANOS_PER_SECOND: i32 = 1_000_000_000;
    to_ticks(array, NANOS_PER_SECOND)
}

/// Converts each element of `array` to a floating-point number of seconds.
///
/// * `Timestamp` values are divided by the number of ticks per second of
///   their time unit.
/// * `Date32` values are multiplied by 86400.
/// * `Date64` values are divided by 1000.
///
/// # Errors
///
/// Returns an execution error for any other data type.
fn epoch<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
    T: ArrowTemporalType + ArrowNumericType,
    i64: From<T::Native>,
{
    // Widen the native value to `i64` first, then convert it to `f64`.
    let widen = |raw: T::Native| -> f64 {
        let as_i64: i64 = raw.into();
        as_i64 as f64
    };

    match array.data_type() {
        DataType::Timestamp(unit, _) => {
            let ticks_per_second = match unit {
                TimeUnit::Second => 1_f64,
                TimeUnit::Millisecond => 1_000_f64,
                TimeUnit::Microsecond => 1_000_000_f64,
                TimeUnit::Nanosecond => 1_000_000_000_f64,
            };
            Ok(array.unary(|raw| widen(raw) / ticks_per_second))
        }
        DataType::Date32 => {
            let seconds_per_day = 86400_f64;
            Ok(array.unary(|raw| widen(raw) * seconds_per_day))
        }
        DataType::Date64 => Ok(array.unary(|raw| widen(raw) / 1_000_f64)),
        other => exec_err!("Can not convert {:?} to epoch", other),
    }
}

/// Downcasts `$ARRAY` to the concrete primitive array matching its data type
/// (`Date32`, `Date64`, or a `Timestamp` of any time unit), applies `$FN` to
/// it, and casts the outcome to `Float64`.
///
/// The expansion evaluates to a nested result: the outer layer carries errors
/// from the downcast, from `$FN`, or from an unsupported data type; the inner
/// layer is the result of the final `cast`. It uses `?`, so it must be
/// expanded inside a function that returns a compatible `Result`.
macro_rules! extract_date_part {
    ($ARRAY: expr, $FN:expr) => {
        match $ARRAY.data_type() {
            DataType::Date32 => {
                let typed = as_date32_array($ARRAY)?;
                let part = $FN(typed)?;
                Ok(cast(&(Arc::new(part) as ArrayRef), &DataType::Float64))
            }
            DataType::Date64 => {
                let typed = as_date64_array($ARRAY)?;
                let part = $FN(typed)?;
                Ok(cast(&(Arc::new(part) as ArrayRef), &DataType::Float64))
            }
            DataType::Timestamp(TimeUnit::Second, _) => {
                let typed = as_timestamp_second_array($ARRAY)?;
                let part = $FN(typed)?;
                Ok(cast(&(Arc::new(part) as ArrayRef), &DataType::Float64))
            }
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                let typed = as_timestamp_millisecond_array($ARRAY)?;
                let part = $FN(typed)?;
                Ok(cast(&(Arc::new(part) as ArrayRef), &DataType::Float64))
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                let typed = as_timestamp_microsecond_array($ARRAY)?;
                let part = $FN(typed)?;
                Ok(cast(&(Arc::new(part) as ArrayRef), &DataType::Float64))
            }
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                let typed = as_timestamp_nanosecond_array($ARRAY)?;
                let part = $FN(typed)?;
                Ok(cast(&(Arc::new(part) as ArrayRef), &DataType::Float64))
            }
            unsupported => {
                exec_err!("Extract does not support datatype {:?}", unsupported)
            }
        }
    };
}

/// Scalar UDF implementation of `date_part` (alias `datepart`): extracts a
/// named component from a date or timestamp and returns it as `Float64`.
#[derive(Debug)]
pub struct DatePartFunc {
/// Accepted argument type combinations and volatility; populated in `new`.
signature: Signature,
/// Names under which the function is registered in addition to `name()`;
/// populated in `new`.
aliases: Vec<String>,
}

impl DatePartFunc {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Date64]),
Exact(vec![Utf8, Date32]),
],
Volatility::Immutable,
),
aliases: vec![String::from("date_part"), String::from("datepart")],
}
}
}

impl ScalarUDFImpl for DatePartFunc {
    /// Exposes the implementation as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Primary name of the function.
    fn name(&self) -> &str {
        "date_part"
    }

    /// Additional names the function is known by.
    fn aliases(&self) -> &[String] {
        self.aliases.as_slice()
    }

    /// Accepted argument type combinations.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result is always `Float64`, whatever the argument types are.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Float64)
    }

    /// Evaluates `date_part(part, value)`.
    ///
    /// `part` must be a non-null scalar `Utf8` naming the component to
    /// extract; it is matched case-insensitively. `value` may be a scalar or
    /// an array; a scalar input produces a scalar output and an array input
    /// produces an array output.
    ///
    /// # Errors
    ///
    /// Returns an execution error when the argument count is not two, when
    /// `part` is not a non-null scalar `Utf8`, when the part name is not
    /// supported, or when extraction fails for the value's data type.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        let (part_arg, value_arg) = match args {
            [part, value] => (part, value),
            _ => return exec_err!("Expected two arguments in DATE_PART"),
        };

        let date_part = match part_arg {
            ColumnarValue::Scalar(ScalarValue::Utf8(Some(name))) => name,
            _ => {
                return exec_err!(
                    "First argument of `DATE_PART` must be non-null scalar Utf8"
                )
            }
        };

        // Always operate on an array; remember whether the input was a scalar
        // so the result can be turned back into one at the end.
        let (array, is_scalar) = match value_arg {
            ColumnarValue::Array(values) => (values.clone(), false),
            ColumnarValue::Scalar(scalar) => (scalar.to_array()?, true),
        };

        // The macro yields a nested result: the first `?` surfaces extraction
        // errors, the second surfaces errors from the final cast to Float64.
        let extracted = match date_part.to_lowercase().as_str() {
            "year" => extract_date_part!(&array, temporal::year),
            "quarter" => extract_date_part!(&array, temporal::quarter),
            "month" => extract_date_part!(&array, temporal::month),
            "week" => extract_date_part!(&array, temporal::week),
            "day" => extract_date_part!(&array, temporal::day),
            "doy" => extract_date_part!(&array, temporal::doy),
            "dow" => extract_date_part!(&array, temporal::num_days_from_sunday),
            "hour" => extract_date_part!(&array, temporal::hour),
            "minute" => extract_date_part!(&array, temporal::minute),
            "second" => extract_date_part!(&array, seconds),
            "millisecond" => extract_date_part!(&array, millis),
            "microsecond" => extract_date_part!(&array, micros),
            "nanosecond" => extract_date_part!(&array, nanos),
            "epoch" => extract_date_part!(&array, epoch),
            _ => exec_err!("Date part '{date_part}' not supported"),
        }?;
        let values = extracted?;

        if is_scalar {
            let scalar = ScalarValue::try_from_array(&values, 0)?;
            Ok(ColumnarValue::Scalar(scalar))
        } else {
            Ok(ColumnarValue::Array(values))
        }
    }
}
7 changes: 7 additions & 0 deletions datafusion/functions/src/datetime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use std::sync::Arc;
use datafusion_expr::ScalarUDF;

mod common;
pub mod date_part;
mod to_date;
mod to_timestamp;
mod to_unixtime;

// create UDFs
make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date);
make_udf_function!(date_part::DatePartFunc, DATE_PART, date_part);
make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime);
make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp);
make_udf_function!(
Expand Down Expand Up @@ -107,6 +109,10 @@ pub mod expr_fn {
super::to_date().call(args)
}

/// Builds an expression that calls the `date_part` scalar UDF with `args`.
pub fn date_part(args: Vec<Expr>) -> Expr {
super::date_part().call(args)
}

#[doc = "converts a string and optional formats to a Unixtime"]
pub fn to_unixtime(args: Vec<Expr>) -> Expr {
super::to_unixtime().call(args)
Expand Down Expand Up @@ -142,6 +148,7 @@ pub mod expr_fn {
pub fn functions() -> Vec<Arc<ScalarUDF>> {
vec![
to_date(),
date_part(),
to_unixtime(),
to_timestamp(),
to_timestamp_seconds(),
Expand Down
Loading

0 comments on commit 261b500

Please sign in to comment.