Skip to content

Commit

Permalink
add uuid() function to return a unique uuid per row
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu authored and jimexist committed Nov 2, 2022
1 parent 97f2e4f commit 5431596
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 4 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,16 @@ async fn test_random_expression() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_uuid_expression() -> Result<()> {
let ctx = create_ctx();
let sql = "SELECT uuid()";
let actual = execute(&ctx, sql).await;
let uuid = actual[0][0].parse::<uuid::Uuid>().unwrap();
assert_eq!(uuid.get_version_num(), 4);
Ok(())
}

#[tokio::test]
async fn case_with_bool_type_result() -> Result<()> {
let ctx = SessionContext::new();
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ pub enum BuiltinScalarFunction {
Trim,
/// upper
Upper,
/// uuid
Uuid,
/// regexp_match
RegexpMatch,
/// struct
Expand All @@ -184,6 +186,7 @@ impl BuiltinScalarFunction {
| BuiltinScalarFunction::Now
| BuiltinScalarFunction::CurrentDate
| BuiltinScalarFunction::CurrentTime
| BuiltinScalarFunction::Uuid
)
}
/// Returns the [Volatility] of the builtin function.
Expand Down Expand Up @@ -266,6 +269,7 @@ impl BuiltinScalarFunction {

// Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
BuiltinScalarFunction::Uuid => Volatility::Volatile,
}
}
}
Expand Down Expand Up @@ -358,6 +362,7 @@ impl FromStr for BuiltinScalarFunction {
"translate" => BuiltinScalarFunction::Translate,
"trim" => BuiltinScalarFunction::Trim,
"upper" => BuiltinScalarFunction::Upper,
"uuid" => BuiltinScalarFunction::Uuid,
"regexp_match" => BuiltinScalarFunction::RegexpMatch,
"struct" => BuiltinScalarFunction::Struct,
"from_unixtime" => BuiltinScalarFunction::FromUnixtime,
Expand Down
21 changes: 20 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ pub fn array(args: Vec<Expr>) -> Expr {
/// which is not NULL
pub fn coalesce(args: Vec<Expr>) -> Expr {
Expr::ScalarFunction {
fun: built_in_function::BuiltinScalarFunction::Coalesce,
fun: BuiltinScalarFunction::Coalesce,
args,
}
}
Expand All @@ -460,6 +460,14 @@ pub fn current_date() -> Expr {
}
}

/// Returns uuid v4 as a string value
pub fn uuid() -> Expr {
Expr::ScalarFunction {
fun: BuiltinScalarFunction::Uuid,
args: vec![],
}
}

/// Returns current UTC time as a [`DataType::Time64`] value
pub fn current_time() -> Expr {
Expr::ScalarFunction {
Expand Down Expand Up @@ -684,6 +692,17 @@ mod test {
test_unary_scalar_expr!(ArrowTypeof, arrow_typeof);
}

#[test]
fn uuid_function_definitions() {
if let Expr::ScalarFunction { fun, args } = uuid() {
let name = built_in_function::BuiltinScalarFunction::Uuid;
assert_eq!(name, fun);
assert_eq!(0, args.len());
} else {
assert!(false, "unexpected: {:?}", uuid());
}
}

#[test]
fn digest_function_definitions() {
if let Expr::ScalarFunction { fun, args } = digest(col("tableA.a"), lit("md5")) {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ pub fn return_type(
utf8_to_int_type(&input_expr_types[0], "octet_length")
}
BuiltinScalarFunction::Random => Ok(DataType::Float64),
BuiltinScalarFunction::Uuid => Ok(DataType::Utf8),
BuiltinScalarFunction::RegexpReplace => {
utf8_to_str_type(&input_expr_types[0], "regex_replace")
}
Expand Down Expand Up @@ -580,6 +581,7 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
fun.volatility(),
),
BuiltinScalarFunction::Random => Signature::exact(vec![], fun.volatility()),
BuiltinScalarFunction::Uuid => Signature::exact(vec![], fun.volatility()),
BuiltinScalarFunction::Power => Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ rand = "0.8"
regex = { version = "^1.4.3", optional = true }
sha2 = { version = "^0.10.1", optional = true }
unicode-segmentation = { version = "^1.7.1", optional = true }
uuid = { version = "^1.2", features = ["v4"] }
7 changes: 6 additions & 1 deletion datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ pub fn create_physical_fun(
))),
}),
BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper),
BuiltinScalarFunction::Uuid => Arc::new(string_expressions::uuid),
_ => {
return Err(DataFusionError::Internal(format!(
"create_physical_fun: Unsupported scalar function {:?}",
Expand Down Expand Up @@ -2760,7 +2761,11 @@ mod tests {
let execution_props = ExecutionProps::new();
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

let funs = [BuiltinScalarFunction::Now, BuiltinScalarFunction::Random];
let funs = [
BuiltinScalarFunction::Now,
BuiltinScalarFunction::Random,
BuiltinScalarFunction::Uuid,
];

for fun in funs.iter() {
create_physical_expr_with_type_coercion(fun, &[], &schema, &execution_props)?;
Expand Down
19 changes: 19 additions & 0 deletions datafusion/physical-expr/src/string_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::any::type_name;
use std::iter;
use std::sync::Arc;
use uuid::Uuid;

macro_rules! downcast_string_arg {
($ARG:expr, $NAME:expr, $T:ident) => {{
Expand Down Expand Up @@ -586,3 +588,20 @@ where
pub fn upper(args: &[ColumnarValue]) -> Result<ColumnarValue> {
handle(args, |string| string.to_ascii_uppercase(), "upper")
}

/// Prints random (v4) uuid values per row
/// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'
pub fn uuid(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let len: usize = match &args[0] {
ColumnarValue::Array(array) => array.len(),
_ => {
return Err(DataFusionError::Internal(
"Expect uuid function to take no param".to_string(),
))
}
};

let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len);
let array = GenericStringArray::<i32>::from_iter_values(values);
Ok(ColumnarValue::Array(Arc::new(array)))
}
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ enum ScalarFunction {
ArrowTypeof=69;
CurrentDate=70;
CurrentTime=71;
Uuid=72;
}

message ScalarFunctionNode {
Expand Down
6 changes: 4 additions & 2 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use datafusion_expr::{
regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr,
substring, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
to_timestamp_seconds, translate, trim, trunc, upper, AggregateFunction, Between,
BuiltInWindowFunction, BuiltinScalarFunction, Case, Expr, GetIndexedField,
to_timestamp_seconds, translate, trim, trunc, upper, uuid, AggregateFunction,
Between, BuiltInWindowFunction, BuiltinScalarFunction, Case, Expr, GetIndexedField,
GroupingSet,
GroupingSet::GroupingSets,
Like, Operator, WindowFrame, WindowFrameBound, WindowFrameUnits,
Expand Down Expand Up @@ -431,6 +431,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Now => Self::Now,
ScalarFunction::CurrentDate => Self::CurrentDate,
ScalarFunction::CurrentTime => Self::CurrentTime,
ScalarFunction::Uuid => Self::Uuid,
ScalarFunction::Translate => Self::Translate,
ScalarFunction::RegexpMatch => Self::RegexpMatch,
ScalarFunction::Coalesce => Self::Coalesce,
Expand Down Expand Up @@ -970,6 +971,7 @@ pub fn parse_expr(
parse_expr(&args[1], registry)?,
)),
ScalarFunction::Random => Ok(random()),
ScalarFunction::Uuid => Ok(uuid()),
ScalarFunction::Repeat => Ok(repeat(
parse_expr(&args[0], registry)?,
parse_expr(&args[1], registry)?,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::Left => Self::Left,
BuiltinScalarFunction::Lpad => Self::Lpad,
BuiltinScalarFunction::Random => Self::Random,
BuiltinScalarFunction::Uuid => Self::Uuid,
BuiltinScalarFunction::RegexpReplace => Self::RegexpReplace,
BuiltinScalarFunction::Repeat => Self::Repeat,
BuiltinScalarFunction::Replace => Self::Replace,
Expand Down

0 comments on commit 5431596

Please sign in to comment.