From 54315961191c112bff686b2cbdb51edda08955f7 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Mon, 31 Oct 2022 17:22:51 +0800 Subject: [PATCH] add uuid() function to return a unique uuid per row --- datafusion-cli/Cargo.lock | 1 + datafusion/core/tests/sql/expr.rs | 10 +++++++++ datafusion/expr/src/built_in_function.rs | 5 +++++ datafusion/expr/src/expr_fn.rs | 21 ++++++++++++++++++- datafusion/expr/src/function.rs | 2 ++ datafusion/physical-expr/Cargo.toml | 1 + datafusion/physical-expr/src/functions.rs | 7 ++++++- .../physical-expr/src/string_expressions.rs | 19 +++++++++++++++++ datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/from_proto.rs | 6 ++++-- datafusion/proto/src/generated/pbjson.rs | 3 +++ datafusion/proto/src/generated/prost.rs | 2 ++ datafusion/proto/src/to_proto.rs | 1 + 13 files changed, 75 insertions(+), 4 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index db724780e65a3..a693295e9e719 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -662,6 +662,7 @@ dependencies = [ "regex", "sha2", "unicode-segmentation", + "uuid", ] [[package]] diff --git a/datafusion/core/tests/sql/expr.rs b/datafusion/core/tests/sql/expr.rs index b7e685592e9f4..8251a554657a9 100644 --- a/datafusion/core/tests/sql/expr.rs +++ b/datafusion/core/tests/sql/expr.rs @@ -1179,6 +1179,16 @@ async fn test_random_expression() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_uuid_expression() -> Result<()> { + let ctx = create_ctx(); + let sql = "SELECT uuid()"; + let actual = execute(&ctx, sql).await; + let uuid = actual[0][0].parse::().unwrap(); + assert_eq!(uuid.get_version_num(), 4); + Ok(()) +} + #[tokio::test] async fn case_with_bool_type_result() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index c8e144718ff7f..d78d382628ac0 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -166,6 +166,8 @@ pub enum BuiltinScalarFunction { Trim, /// upper Upper, + /// uuid + Uuid, /// regexp_match RegexpMatch, /// struct @@ -184,6 +186,7 @@ impl BuiltinScalarFunction { | BuiltinScalarFunction::Now | BuiltinScalarFunction::CurrentDate | BuiltinScalarFunction::CurrentTime + | BuiltinScalarFunction::Uuid ) } /// Returns the [Volatility] of the builtin function. @@ -266,6 +269,7 @@ impl BuiltinScalarFunction { // Volatile builtin functions BuiltinScalarFunction::Random => Volatility::Volatile, + BuiltinScalarFunction::Uuid => Volatility::Volatile, } } } @@ -358,6 +362,7 @@ impl FromStr for BuiltinScalarFunction { "translate" => BuiltinScalarFunction::Translate, "trim" => BuiltinScalarFunction::Trim, "upper" => BuiltinScalarFunction::Upper, + "uuid" => BuiltinScalarFunction::Uuid, "regexp_match" => BuiltinScalarFunction::RegexpMatch, "struct" => BuiltinScalarFunction::Struct, "from_unixtime" => BuiltinScalarFunction::FromUnixtime, diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 006bcac5e9c0c..8480a9376c260 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -438,7 +438,7 @@ pub fn array(args: Vec) -> Expr { /// which is not NULL pub fn coalesce(args: Vec) -> Expr { Expr::ScalarFunction { - fun: built_in_function::BuiltinScalarFunction::Coalesce, + fun: BuiltinScalarFunction::Coalesce, args, } } @@ -460,6 +460,14 @@ pub fn current_date() -> Expr { } } +/// Returns uuid v4 as a string value +pub fn uuid() -> Expr { + Expr::ScalarFunction { + fun: BuiltinScalarFunction::Uuid, + args: vec![], + } +} + /// Returns current UTC time as a [`DataType::Time64`] value pub fn current_time() -> Expr { Expr::ScalarFunction { @@ -684,6 +692,17 @@ mod test { test_unary_scalar_expr!(ArrowTypeof, arrow_typeof); } + #[test] + fn uuid_function_definitions() { + if let Expr::ScalarFunction { fun, args } = uuid() { + let name = built_in_function::BuiltinScalarFunction::Uuid; + assert_eq!(name, fun); + assert_eq!(0, args.len()); + } else { + assert!(false, "unexpected: {:?}", uuid()); + } + } + #[test] fn digest_function_definitions() { if let Expr::ScalarFunction { fun, args } = digest(col("tableA.a"), lit("md5")) { diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index de5ab0e1a36d7..515cce980fd92 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -157,6 +157,7 @@ pub fn return_type( utf8_to_int_type(&input_expr_types[0], "octet_length") } BuiltinScalarFunction::Random => Ok(DataType::Float64), + BuiltinScalarFunction::Uuid => Ok(DataType::Utf8), BuiltinScalarFunction::RegexpReplace => { utf8_to_str_type(&input_expr_types[0], "regex_replace") } @@ -580,6 +581,7 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature { fun.volatility(), ), BuiltinScalarFunction::Random => Signature::exact(vec![], fun.volatility()), + BuiltinScalarFunction::Uuid => Signature::exact(vec![], fun.volatility()), BuiltinScalarFunction::Power => Signature::one_of( vec![ TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]), diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index d281b9543b281..a7482072ba37e 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -57,3 +57,4 @@ rand = "0.8" regex = { version = "^1.4.3", optional = true } sha2 = { version = "^0.10.1", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } +uuid = { version = "^1.2", features = ["v4"] } diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index cd1d31544371e..000991d079398 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -760,6 +760,7 @@ pub fn create_physical_fun( ))), }), BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper), + BuiltinScalarFunction::Uuid => Arc::new(string_expressions::uuid), _ => { return Err(DataFusionError::Internal(format!( "create_physical_fun: Unsupported scalar function {:?}", @@ -2760,7 +2761,11 @@ mod tests { let execution_props = ExecutionProps::new(); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let funs = [BuiltinScalarFunction::Now, BuiltinScalarFunction::Random]; + let funs = [ + BuiltinScalarFunction::Now, + BuiltinScalarFunction::Random, + BuiltinScalarFunction::Uuid, + ]; for fun in funs.iter() { create_physical_expr_with_type_coercion(fun, &[], &schema, &execution_props)?; diff --git a/datafusion/physical-expr/src/string_expressions.rs b/datafusion/physical-expr/src/string_expressions.rs index c13a853bb07b0..5733234f870b0 100644 --- a/datafusion/physical-expr/src/string_expressions.rs +++ b/datafusion/physical-expr/src/string_expressions.rs @@ -32,7 +32,9 @@ use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; use std::any::type_name; +use std::iter; use std::sync::Arc; +use uuid::Uuid; macro_rules! downcast_string_arg { ($ARG:expr, $NAME:expr, $T:ident) => {{ @@ -586,3 +588,20 @@ where pub fn upper(args: &[ColumnarValue]) -> Result { handle(args, |string| string.to_ascii_uppercase(), "upper") } + +/// Prints random (v4) uuid values per row +/// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' +pub fn uuid(args: &[ColumnarValue]) -> Result { + let len: usize = match &args[0] { + ColumnarValue::Array(array) => array.len(), + _ => { + return Err(DataFusionError::Internal( + "Expect uuid function to take no param".to_string(), + )) + } + }; + + let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len); + let array = GenericStringArray::::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) +} diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 083f24502354b..3cb9763d317fc 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -497,6 +497,7 @@ enum ScalarFunction { ArrowTypeof=69; CurrentDate=70; CurrentTime=71; + Uuid=72; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 708fb51ecbcea..775acc3a0aa8d 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -41,8 +41,8 @@ use datafusion_expr::{ regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, substring, tan, to_hex, to_timestamp_micros, to_timestamp_millis, - to_timestamp_seconds, translate, trim, trunc, upper, AggregateFunction, Between, - BuiltInWindowFunction, BuiltinScalarFunction, Case, Expr, GetIndexedField, + to_timestamp_seconds, translate, trim, trunc, upper, uuid, AggregateFunction, + Between, BuiltInWindowFunction, BuiltinScalarFunction, Case, Expr, GetIndexedField, GroupingSet, GroupingSet::GroupingSets, Like, Operator, WindowFrame, WindowFrameBound, WindowFrameUnits, @@ -431,6 +431,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Now => Self::Now, ScalarFunction::CurrentDate => Self::CurrentDate, ScalarFunction::CurrentTime => Self::CurrentTime, + ScalarFunction::Uuid => Self::Uuid, ScalarFunction::Translate => Self::Translate, ScalarFunction::RegexpMatch => Self::RegexpMatch, ScalarFunction::Coalesce => Self::Coalesce, @@ -970,6 +971,7 @@ pub fn parse_expr( parse_expr(&args[1], registry)?, )), ScalarFunction::Random => Ok(random()), + ScalarFunction::Uuid => Ok(uuid()), ScalarFunction::Repeat => Ok(repeat( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 3557dee467e46..2aee09ab37f2f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -9622,6 +9622,7 @@ impl serde::Serialize for ScalarFunction { Self::ArrowTypeof => "ArrowTypeof", Self::CurrentDate => "CurrentDate", Self::CurrentTime => "CurrentTime", + Self::Uuid => "Uuid", }; serializer.serialize_str(variant) } @@ -9705,6 +9706,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrowTypeof", "CurrentDate", "CurrentTime", + "Uuid", ]; struct GeneratedVisitor; @@ -9819,6 +9821,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrowTypeof" => Ok(ScalarFunction::ArrowTypeof), "CurrentDate" => Ok(ScalarFunction::CurrentDate), "CurrentTime" => Ok(ScalarFunction::CurrentTime), + "Uuid" => Ok(ScalarFunction::Uuid), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d404f02759e7c..4faa08fece27a 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1226,6 +1226,7 @@ pub enum ScalarFunction { ArrowTypeof = 69, CurrentDate = 70, CurrentTime = 71, + Uuid = 72, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -1306,6 +1307,7 @@ impl ScalarFunction { ScalarFunction::ArrowTypeof => "ArrowTypeof", ScalarFunction::CurrentDate => "CurrentDate", ScalarFunction::CurrentTime => "CurrentTime", + ScalarFunction::Uuid => "Uuid", } } } diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 1aea38f3a1207..44f27429afb46 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -1164,6 +1164,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::Left => Self::Left, BuiltinScalarFunction::Lpad => Self::Lpad, BuiltinScalarFunction::Random => Self::Random, + BuiltinScalarFunction::Uuid => Self::Uuid, BuiltinScalarFunction::RegexpReplace => Self::RegexpReplace, BuiltinScalarFunction::Repeat => Self::Repeat, BuiltinScalarFunction::Replace => Self::Replace,