From b0ce95d2205e45e3921adc1201de17c6bf8f0427 Mon Sep 17 00:00:00 2001 From: Mustafa akur <106137913+mustafasrepo@users.noreply.github.com> Date: Thu, 27 Oct 2022 17:28:12 +0300 Subject: [PATCH] Informative Error Message for LAG and LEAD functions (#3963) * panic is changed to error for lag and lead functions * remove unnecessary changes * remove unnecessary changes --- .../core/src/physical_plan/windows/mod.rs | 24 ++++++++++++------- datafusion/core/tests/sql/window.rs | 20 ++++++++++++++++ 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index be9421a9de843..95582b2119de6 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -72,13 +72,19 @@ pub fn create_window_expr( fn get_scalar_value_from_args( args: &[Arc], index: usize, -) -> Option { - args.get(index).map(|v| { - v.as_any() +) -> Result> { + Ok(if let Some(field) = args.get(index) { + let tmp = field + .as_any() .downcast_ref::() - .unwrap() + .ok_or_else(|| DataFusionError::NotImplemented( + format!("There is only support Literal types for field at idx: {} in Window Function", index), + ))? .value() - .clone() + .clone(); + Some(tmp) + } else { + None }) } @@ -98,20 +104,20 @@ fn create_built_in_window_expr( let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?; let arg = coerced_args[0].clone(); let data_type = args[0].data_type(input_schema)?; - let shift_offset = get_scalar_value_from_args(&coerced_args, 1) + let shift_offset = get_scalar_value_from_args(&coerced_args, 1)? .map(|v| v.try_into()) .and_then(|v| v.ok()); - let default_value = get_scalar_value_from_args(&coerced_args, 2); + let default_value = get_scalar_value_from_args(&coerced_args, 2)?; Arc::new(lag(name, data_type, arg, shift_offset, default_value)) } BuiltInWindowFunction::Lead => { let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?; let arg = coerced_args[0].clone(); let data_type = args[0].data_type(input_schema)?; - let shift_offset = get_scalar_value_from_args(&coerced_args, 1) + let shift_offset = get_scalar_value_from_args(&coerced_args, 1)? .map(|v| v.try_into()) .and_then(|v| v.ok()); - let default_value = get_scalar_value_from_args(&coerced_args, 2); + let default_value = get_scalar_value_from_args(&coerced_args, 2)?; Arc::new(lead(name, data_type, arg, shift_offset, default_value)) } BuiltInWindowFunction::NthValue => { diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index d9ede97718585..48f5a08dd55cc 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1210,6 +1210,26 @@ async fn window_frame_groups_query() -> Result<()> { Ok(()) } +#[tokio::test] +async fn window_frame_lag() -> Result<()> { + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx).await?; + // execute the query + let df = ctx + .sql( + "SELECT c2, + lag(c2, c2, c2) OVER () as lag1 + FROM aggregate_test_100;", + ) + .await?; + let err = df.collect().await.unwrap_err(); + assert_eq!( + err.to_string(), + "This feature is not implemented: There is only support Literal types for field at idx: 1 in Window Function".to_owned() + ); + Ok(()) +} + #[tokio::test] async fn window_frame_creation() -> Result<()> { let ctx = SessionContext::new();