Skip to content

Commit

Permalink
[task apache#8987]add_to_date_function
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <[email protected]>
  • Loading branch information
Tangruilin committed Jan 31, 2024
1 parent d6d35f7 commit da55c5d
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 36 deletions.
7 changes: 7 additions & 0 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use arrow::{
},
};
use arrow_array::cast::as_list_array;
use arrow_array::types::Date32Type;

/// A dynamically typed, nullable single value, (the single-valued counter-part
/// to arrow's [`Array`])
Expand Down Expand Up @@ -3239,6 +3240,12 @@ impl ScalarType<i64> for TimestampNanosecondType {
}
}

/// Allows [`Date32Type`] values (native type `i32`) to be turned into a
/// [`ScalarValue`].
impl ScalarType<i32> for Date32Type {
    /// Wraps the optional native value in [`ScalarValue::Date32`]; a `None`
    /// input is passed through as `ScalarValue::Date32(None)`.
    fn scalar(value: Option<i32>) -> ScalarValue {
        ScalarValue::Date32(value)
    }
}

#[cfg(test)]
mod tests {
use std::cmp::Ordering;
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ pub enum BuiltinScalarFunction {
ToTimestampSeconds,
/// from_unixtime
FromUnixtime,
/// to_date
ToDate,
///now
Now,
///current_date
Expand Down Expand Up @@ -487,6 +489,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
BuiltinScalarFunction::Struct => Volatility::Immutable,
BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
BuiltinScalarFunction::ToDate => Volatility::Immutable,
BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable,
BuiltinScalarFunction::OverLay => Volatility::Immutable,
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
Expand Down Expand Up @@ -802,6 +805,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ToTimestampMicros => Ok(Timestamp(Microsecond, None)),
BuiltinScalarFunction::ToTimestampSeconds => Ok(Timestamp(Second, None)),
BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)),
BuiltinScalarFunction::ToDate => Ok(Date32),
BuiltinScalarFunction::Now => {
Ok(Timestamp(Nanosecond, Some("+00:00".into())))
}
Expand Down Expand Up @@ -1050,6 +1054,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::FromUnixtime => {
Signature::uniform(1, vec![Int64], self.volatility())
}
BuiltinScalarFunction::ToDate => Signature::variadic_any(self.volatility()),
BuiltinScalarFunction::Digest => Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Expand Down Expand Up @@ -1494,6 +1499,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ToTimestampSeconds => &["to_timestamp_seconds"],
BuiltinScalarFunction::ToTimestampNanos => &["to_timestamp_nanos"],
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],
BuiltinScalarFunction::ToDate => &["to_date"],

// hashing functions
BuiltinScalarFunction::Digest => &["digest"],
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,11 @@ nary_scalar_expr!(
scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date");
scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision");
scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval");
// Expression-builder entry for `BuiltinScalarFunction::ToDate`, exposed under
// the name `to_date`. The string below is handed to the macro as the
// function's description.
// NOTE(review): presumably `nary_scalar_expr!` generates a variadic
// `to_date(args)` helper like the other invocations here — confirm against
// the macro definition.
nary_scalar_expr!(
ToDate,
to_date,
"converts a string and optional formats to a `Date32`"
);
nary_scalar_expr!(
ToTimestamp,
to_timestamp,
Expand Down
71 changes: 71 additions & 0 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,42 @@ fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
string_to_timestamp_nanos(s).map_err(|e| e.into())
}

/// Converts a count of nanoseconds since the Unix epoch into a count of whole
/// days since the epoch, narrowed to the `i32` native type of `Date32Type`.
///
/// The division floors (`div_euclid`) rather than truncating toward zero, so
/// an instant before the epoch that is not exactly on a day boundary maps to
/// the day it falls in (e.g. -1 ns is day -1, not day 0).
///
/// # Errors
/// Returns `DataFusionError::Execution` if the day count does not fit in `i32`.
fn nanos_to_date32_days(nanos: i64) -> Result<i32> {
    // 24 h * 60 min * 60 s * 1_000_000_000 ns
    const NANOS_PER_DAY: i64 = 24 * 60 * 60 * 1_000_000_000;

    i32::try_from(nanos.div_euclid(NANOS_PER_DAY)).map_err(|_| {
        DataFusionError::Execution(format!(
            "timestamp of {nanos} nanoseconds is out of range for Date32"
        ))
    })
}

/// Shared implementation behind the `to_date` function for string input.
///
/// * With exactly one argument, each string is parsed with
///   `string_to_timestamp_nanos_shim`.
/// * With two or more arguments, the string and a format are passed to
///   `string_to_timestamp_nanos_formatted`.
///   NOTE(review): how `handle_multiple` iterates over several format
///   arguments is not visible here — confirm against its definition.
///
/// In both cases the parsed nanosecond timestamp is reduced to days since the
/// epoch via `nanos_to_date32_days`.
///
/// `name` is the function name forwarded to the helpers and used in the error
/// message.
///
/// # Errors
/// Returns an internal error when `args` is empty, and propagates any parse or
/// range error produced for an individual value.
fn to_date_impl(args: &[ColumnarValue], name: &str) -> Result<ColumnarValue> {
    match args.len() {
        1 => handle::<Date32Type, _, Date32Type>(
            args,
            |s| string_to_timestamp_nanos_shim(s).and_then(nanos_to_date32_days),
            name,
        ),
        n if n >= 2 => handle_multiple::<Date32Type, _, Date32Type, _>(
            args,
            |s, format| {
                string_to_timestamp_nanos_formatted(s, format)
                    .and_then(nanos_to_date32_days)
            },
            // The parsed value is already a day count; no further scaling.
            |n| n,
            name,
        ),
        _ => internal_err!("Unsupported 0 argument count for function {name}"),
    }
}

fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
args: &[ColumnarValue],
name: &str,
Expand Down Expand Up @@ -423,6 +459,11 @@ fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
}
}

/// `to_date` SQL function.
///
/// Thin wrapper that forwards `args` to `to_date_impl`, supplying `"to_date"`
/// as the function name.
pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let function_name = "to_date";
    to_date_impl(args, function_name)
}

/// to_timestamp SQL function
///
/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
Expand Down Expand Up @@ -1308,6 +1349,36 @@ fn validate_to_timestamp_data_types(
None
}

/// Entry point for the `to_date()` SQL function.
///
/// Dispatches on the data type of the first argument:
/// * `Utf8` input is handed to [`to_date`] together with any further arguments.
/// * `Int32`, `Int64`, `Null`, `Float64`, `Date32` and `Date64` input is cast
///   to `Date32` with `cast_column`; further arguments are not used.
/// * Any other type yields an internal error.
///
/// # Errors
/// Returns an execution error when called with no arguments, and whatever
/// `validate_to_timestamp_data_types` reports when more than one argument is
/// supplied and that check fails.
pub fn to_date_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let arg_count = args.len();
    if arg_count == 0 {
        return exec_err!(
            "to_date function requires 1 or more arguments, got {}",
            arg_count
        );
    }

    // validate that any args after the first one are Utf8
    // (only performed when there is more than one argument)
    let format_check = if arg_count > 1 {
        validate_to_timestamp_data_types(args, "to_date")
    } else {
        None
    };
    if let Some(failure) = format_check {
        return failure;
    }

    let input = &args[0];
    match input.data_type() {
        DataType::Utf8 => to_date(args),
        DataType::Int32
        | DataType::Int64
        | DataType::Null
        | DataType::Float64
        | DataType::Date32
        | DataType::Date64 => cast_column(input, &DataType::Date32, None),
        unsupported => {
            internal_err!(
                "Unsupported data type {:?} for function to_date",
                unsupported
            )
        }
    }
}

/// to_timestamp() SQL function implementation
pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ pub fn create_physical_fun(
BuiltinScalarFunction::FromUnixtime => {
Arc::new(datetime_expressions::from_unixtime_invoke)
}
BuiltinScalarFunction::ToDate => Arc::new(datetime_expressions::to_date_invoke),
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function_inner(string_expressions::initcap::<i32>)(args)
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ enum ScalarFunction {
EndsWith = 131;
InStr = 132;
MakeDate = 133;
ToDate = 134;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit da55c5d

Please sign in to comment.