diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 578f18524779..e0c7c4391b25 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -754,9 +754,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" +checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52" dependencies = [ "arrayref", "arrayvec", @@ -1275,6 +1275,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "itertools", "log", "paste", @@ -2648,9 +2649,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" dependencies = [ "unicode-ident", ] @@ -2777,9 +2778,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.25" +version = "0.11.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eea5a9eb898d3783f17c6407670e3592fd174cb81a10e51d4c37f49450b9946" +checksum = "78bf93c4af7a8bb7d879d51cebe797356ff10ae8516ace542b5182d9dcac10b2" dependencies = [ "base64 0.21.7", "bytes", @@ -3333,20 +3334,20 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "system-configuration" -version = "0.6.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ - "bitflags 2.4.2", + "bitflags 1.3.2", "core-foundation", "system-configuration-sys", ] [[package]] name = "system-configuration-sys" -version = "0.6.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" dependencies = [ "core-foundation-sys", "libc", @@ -3387,18 +3388,18 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" [[package]] name = "thiserror" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" +checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" +checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0feff860fd93..96f5e1c3ffd3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -207,10 +207,13 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { let expr = create_physical_name(expr, false)?; Ok(format!("{expr} IS NOT UNKNOWN")) } - Expr::GetIndexedField(GetIndexedField { expr, field }) => { - let expr = create_physical_name(expr, false)?; - let name = match field { - GetFieldAccess::NamedStructField { name } => format!("{expr}[{name}]"), + Expr::GetIndexedField(GetIndexedField { expr: _, field }) => { + match field { + GetFieldAccess::NamedStructField { name: _ } => { + unreachable!( + "NamedStructField should have been rewritten in OperatorToFunction" + ) + } GetFieldAccess::ListIndex { key: _ } => { unreachable!( "ListIndex should have been rewritten in OperatorToFunction" @@ -222,12 +225,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { stride: _, } => { unreachable!( - "ListIndex should have been rewritten in OperatorToFunction" + "ListRange should have been rewritten in OperatorToFunction" ) } }; - - Ok(name) } Expr::ScalarFunction(fun) => { // function should be resolved during `AnalyzerRule`s diff --git a/datafusion/functions-array/Cargo.toml b/datafusion/functions-array/Cargo.toml index ba7d9e26ecaf..99239ffb3bdc 100644 --- a/datafusion/functions-array/Cargo.toml +++ b/datafusion/functions-array/Cargo.toml @@ -44,6 +44,7 @@ arrow-schema = { workspace = true } datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions = { workspace = true } itertools = { version = "0.12", features = ["use_std"] } log = { workspace = true } paste = "1.0.14" diff --git a/datafusion/functions-array/src/rewrite.rs b/datafusion/functions-array/src/rewrite.rs index 368fad41af29..a9e79f54a52d 100644 --- a/datafusion/functions-array/src/rewrite.rs +++ b/datafusion/functions-array/src/rewrite.rs @@ -28,6 +28,7 @@ use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::{ BinaryExpr, BuiltinScalarFunction, Expr, GetFieldAccess, GetIndexedField, Operator, }; +use datafusion_functions::expr_fn::get_field; /// Rewrites expressions into function calls to array functions pub(crate) struct ArrayFunctionRewriter {} @@ -147,6 +148,15 @@ impl FunctionRewrite for ArrayFunctionRewriter { Transformed::yes(array_prepend(*left, *right)) } + Expr::GetIndexedField(GetIndexedField { + expr, + field: GetFieldAccess::NamedStructField { name }, + }) => { + let expr = *expr.clone(); + let name = Expr::Literal(name); + Transformed::yes(get_field(expr, name.clone())) + } + // expr[idx] ==> array_element(expr, idx) Expr::GetIndexedField(GetIndexedField { expr, diff --git a/datafusion/functions/src/core/getfield.rs b/datafusion/functions/src/core/getfield.rs new file mode 100644 index 000000000000..0a99cccf9e1c --- /dev/null +++ b/datafusion/functions/src/core/getfield.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::DataType; +use arrow_array::{Scalar, StringArray}; +use datafusion_common::cast::{as_map_array, as_struct_array}; +use datafusion_common::{exec_err, ExprSchema, Result, ScalarValue}; +use datafusion_expr::field_util::GetFieldAccessSchema; +use datafusion_expr::{ColumnarValue, Expr, ExprSchemable}; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; + +#[derive(Debug)] +pub(super) struct GetFieldFunc { + signature: Signature, +} + +impl GetFieldFunc { + pub fn new() -> Self { + Self { + signature: Signature::any(2, Volatility::Immutable), + } + } +} + +// get_field(struct_array, field_name) +impl ScalarUDFImpl for GetFieldFunc { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "get_field" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + todo!() + } + + fn return_type_from_exprs( + &self, + args: &[Expr], + schema: &dyn ExprSchema, + _arg_types: &[DataType], + ) -> Result { + if args.len() != 2 { + return exec_err!( + "get_field function requires 2 arguments, got {}", + args.len() + ); + } + + let name = match &args[1] { + Expr::Literal(name) => name, + _ => { + return exec_err!( + "get_field function requires the argument field_name to be a string" + ); + } + }; + let access_schema = GetFieldAccessSchema::NamedStructField { name: name.clone() }; + let arg_dt = args[0].get_type(schema)?; + access_schema + .get_accessed_field(&arg_dt) + .map(|f| f.data_type().clone()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.len() != 2 { + return exec_err!( + "get_field function requires 2 arguments, got {}", + args.len() + ); + } + + let arrays = ColumnarValue::values_to_arrays(args)?; + let array = arrays[0].clone(); + + let name = match &args[1] { + ColumnarValue::Scalar(name) => name, + _ => { + return exec_err!( + "get_field function requires the argument field_name to be a string" + ); + } + }; + match (array.data_type(), name) { + (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => { + let map_array = as_map_array(array.as_ref())?; + let key_scalar = Scalar::new(StringArray::from(vec![k.clone()])); + let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?; + let entries = arrow::compute::filter(map_array.entries(), &keys)?; + let entries_struct_array = as_struct_array(entries.as_ref())?; + Ok(ColumnarValue::Array(entries_struct_array.column(1).clone())) + } + (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => { + let as_struct_array = as_struct_array(&array)?; + match as_struct_array.column_by_name(k) { + None => exec_err!( + "get indexed field {k} not found in struct"), + Some(col) => Ok(ColumnarValue::Array(col.clone())) + } + } + (DataType::Struct(_), name) => exec_err!( + "get indexed field is only possible on struct with utf8 indexes. \ + Tried with {name:?} index"), + (dt, name) => exec_err!( + "get indexed field is only possible on lists with int64 indexes or struct \ + with utf8 indexes. Tried {dt:?} with {name:?} index"), + } + } +} diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index 3f13067a4a07..73cc4d18bf9f 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -18,10 +18,11 @@ //! "core" DataFusion functions mod arrowtypeof; +mod getfield; mod nullif; mod nvl; mod nvl2; -pub mod r#struct; +mod r#struct; // create UDFs make_udf_function!(nullif::NullIfFunc, NULLIF, nullif); @@ -29,6 +30,7 @@ make_udf_function!(nvl::NVLFunc, NVL, nvl); make_udf_function!(nvl2::NVL2Func, NVL2, nvl2); make_udf_function!(arrowtypeof::ArrowTypeOfFunc, ARROWTYPEOF, arrow_typeof); make_udf_function!(r#struct::StructFunc, STRUCT, r#struct); +make_udf_function!(getfield::GetFieldFunc, GET_FIELD, get_field); // Export the functions out of this package, both as expr_fn as well as a list of functions export_functions!( @@ -36,5 +38,6 @@ export_functions!( (nvl, arg_1 arg_2, "returns value2 if value1 is NULL; otherwise it returns value1"), (nvl2, arg_1 arg_2 arg_3, "Returns value2 if value1 is not NULL; otherwise, it returns value3."), (arrow_typeof, arg_1, "Returns the Arrow type of the input expression."), - (r#struct, args, "Returns a struct with the given arguments") + (r#struct, args, "Returns a struct with the given arguments"), + (get_field, arg_1 arg_2, "Returns the value of the field with the given name from the struct") ); diff --git a/datafusion/functions/src/core/struct.rs b/datafusion/functions/src/core/struct.rs index 6236f98794bb..406e402ccd85 100644 --- a/datafusion/functions/src/core/struct.rs +++ b/datafusion/functions/src/core/struct.rs @@ -61,7 +61,7 @@ fn struct_expr(args: &[ColumnarValue]) -> Result { Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?)) } #[derive(Debug)] -pub struct StructFunc { +pub(super) struct StructFunc { signature: Signature, } @@ -73,12 +73,6 @@ impl StructFunc { } } -impl Default for StructFunc { - fn default() -> Self { - Self::new() - } -} - impl ScalarUDFImpl for StructFunc { fn as_any(&self) -> &dyn Any { self diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs deleted file mode 100644 index 99b2279ba572..000000000000 --- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs +++ /dev/null @@ -1,245 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! get field of a `ListArray` - -use crate::PhysicalExpr; -use datafusion_common::exec_err; - -use crate::physical_expr::down_cast_any_ref; -use arrow::{ - array::{Array, Scalar, StringArray}, - datatypes::{DataType, Schema}, - record_batch::RecordBatch, -}; -use datafusion_common::{ - cast::{as_map_array, as_struct_array}, - Result, ScalarValue, -}; -use datafusion_expr::{field_util::GetFieldAccessSchema, ColumnarValue}; -use std::fmt::Debug; -use std::hash::{Hash, Hasher}; -use std::{any::Any, sync::Arc}; - -/// Access a sub field of a nested type, such as `Field` or `List` -#[derive(Clone, Hash, Debug)] -pub enum GetFieldAccessExpr { - /// Named field, For example `struct["name"]` - NamedStructField { name: ScalarValue }, -} - -impl std::fmt::Display for GetFieldAccessExpr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - GetFieldAccessExpr::NamedStructField { name } => write!(f, "[{}]", name), - } - } -} - -impl PartialEq for GetFieldAccessExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| match (self, x) { - ( - GetFieldAccessExpr::NamedStructField { name: lhs }, - GetFieldAccessExpr::NamedStructField { name: rhs }, - ) => lhs.eq(rhs), - }) - .unwrap_or(false) - } -} - -/// Expression to get a field of a struct array. -#[derive(Debug, Hash)] -pub struct GetIndexedFieldExpr { - /// The expression to find - arg: Arc, - /// The key statement - field: GetFieldAccessExpr, -} - -impl GetIndexedFieldExpr { - /// Create new [`GetIndexedFieldExpr`] - pub fn new(arg: Arc, field: GetFieldAccessExpr) -> Self { - Self { arg, field } - } - - /// Create a new [`GetIndexedFieldExpr`] for accessing the named field - pub fn new_field(arg: Arc, name: impl Into) -> Self { - Self::new( - arg, - GetFieldAccessExpr::NamedStructField { - name: ScalarValue::from(name.into()), - }, - ) - } - - /// Get the description of what field should be accessed - pub fn field(&self) -> &GetFieldAccessExpr { - &self.field - } - - /// Get the input expression - pub fn arg(&self) -> &Arc { - &self.arg - } - - fn schema_access(&self, _input_schema: &Schema) -> Result { - Ok(match &self.field { - GetFieldAccessExpr::NamedStructField { name } => { - GetFieldAccessSchema::NamedStructField { name: name.clone() } - } - }) - } -} - -impl std::fmt::Display for GetIndexedFieldExpr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "({}).{}", self.arg, self.field) - } -} - -impl PhysicalExpr for GetIndexedFieldExpr { - fn as_any(&self) -> &dyn Any { - self - } - - fn data_type(&self, input_schema: &Schema) -> Result { - let arg_dt = self.arg.data_type(input_schema)?; - self.schema_access(input_schema)? - .get_accessed_field(&arg_dt) - .map(|f| f.data_type().clone()) - } - - fn nullable(&self, input_schema: &Schema) -> Result { - let arg_dt = self.arg.data_type(input_schema)?; - self.schema_access(input_schema)? - .get_accessed_field(&arg_dt) - .map(|f| f.is_nullable()) - } - - fn evaluate(&self, batch: &RecordBatch) -> Result { - let array = self.arg.evaluate(batch)?.into_array(batch.num_rows())?; - match &self.field { - GetFieldAccessExpr::NamedStructField{name} => match (array.data_type(), name) { - (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => { - let map_array = as_map_array(array.as_ref())?; - let key_scalar = Scalar::new(StringArray::from(vec![k.clone()])); - let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?; - let entries = arrow::compute::filter(map_array.entries(), &keys)?; - let entries_struct_array = as_struct_array(entries.as_ref())?; - Ok(ColumnarValue::Array(entries_struct_array.column(1).clone())) - } - (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => { - let as_struct_array = as_struct_array(&array)?; - match as_struct_array.column_by_name(k) { - None => exec_err!( - "get indexed field {k} not found in struct"), - Some(col) => Ok(ColumnarValue::Array(col.clone())) - } - } - (DataType::Struct(_), name) => exec_err!( - "get indexed field is only possible on struct with utf8 indexes. \ - Tried with {name:?} index"), - (dt, name) => exec_err!( - "get indexed field is only possible on lists with int64 indexes or struct \ - with utf8 indexes. Tried {dt:?} with {name:?} index"), - }, - } - } - - fn children(&self) -> Vec> { - vec![self.arg.clone()] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - Ok(Arc::new(GetIndexedFieldExpr::new( - children[0].clone(), - self.field.clone(), - ))) - } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.hash(&mut s); - } -} - -impl PartialEq for GetIndexedFieldExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.arg.eq(&x.arg) && self.field.eq(&x.field)) - .unwrap_or(false) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::expressions::col; - use arrow::array::ArrayRef; - use arrow::array::{BooleanArray, Int64Array, StructArray}; - use arrow::datatypes::Field; - use arrow::datatypes::Fields; - use datafusion_common::cast::as_boolean_array; - use datafusion_common::Result; - - #[test] - fn get_indexed_field_named_struct_field() -> Result<()> { - let schema = struct_schema(); - let boolean = BooleanArray::from(vec![false, false, true, true]); - let int = Int64Array::from(vec![42, 28, 19, 31]); - let struct_array = StructArray::from(vec![ - ( - Arc::new(Field::new("a", DataType::Boolean, true)), - Arc::new(boolean.clone()) as ArrayRef, - ), - ( - Arc::new(Field::new("b", DataType::Int64, true)), - Arc::new(int) as ArrayRef, - ), - ]); - let expr = col("str", &schema).unwrap(); - // only one row should be processed - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)])?; - let expr = Arc::new(GetIndexedFieldExpr::new_field(expr, "a")); - let result = expr - .evaluate(&batch)? - .into_array(1) - .expect("Failed to convert to array"); - let result = - as_boolean_array(&result).expect("failed to downcast to BooleanArray"); - assert_eq!(boolean, result.clone()); - Ok(()) - } - - fn struct_schema() -> Schema { - Schema::new(vec![Field::new_struct( - "str", - Fields::from(vec![ - Field::new("a", DataType::Boolean, true), - Field::new("b", DataType::Int64, true), - ]), - true, - )]) - } -} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 26d649f57201..7c4ea07dfbcb 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -23,7 +23,6 @@ mod case; mod cast; mod column; mod datum; -mod get_indexed_field; mod in_list; mod is_not_null; mod is_null; @@ -82,7 +81,6 @@ pub use binary::{binary, BinaryExpr}; pub use case::{case, CaseExpr}; pub use cast::{cast, cast_with_options, CastExpr}; pub use column::{col, Column, UnKnownColumn}; -pub use get_indexed_field::{GetFieldAccessExpr, GetIndexedFieldExpr}; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index e6022d383e46..241f01a4170a 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::expressions::GetFieldAccessExpr; use crate::{ - expressions::{self, binary, like, Column, GetIndexedFieldExpr, Literal}, + expressions::{self, binary, like, Column, Literal}, functions, udf, PhysicalExpr, }; use arrow::datatypes::Schema; @@ -228,10 +227,12 @@ pub fn create_physical_expr( input_dfschema, execution_props, )?), - Expr::GetIndexedField(GetIndexedField { expr, field }) => { - let field = match field { - GetFieldAccess::NamedStructField { name } => { - GetFieldAccessExpr::NamedStructField { name: name.clone() } + Expr::GetIndexedField(GetIndexedField { expr: _, field }) => { + match field { + GetFieldAccess::NamedStructField { name: _ } => { + unreachable!( + "NamedStructField should be rewritten in OperatorToFunction" + ) } GetFieldAccess::ListIndex { key: _ } => { unreachable!("ListIndex should be rewritten in OperatorToFunction") @@ -244,10 +245,6 @@ pub fn create_physical_expr( unreachable!("ListRange should be rewritten in OperatorToFunction") } }; - Ok(Arc::new(GetIndexedFieldExpr::new( - create_physical_expr(expr, input_dfschema, execution_props)?, - field, - ))) } Expr::ScalarFunction(ScalarFunction { func_def, args }) => { diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index b4cab05a28bd..b5683dc1425e 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1452,8 +1452,6 @@ message PhysicalExprNode { PhysicalScalarUdfNode scalar_udf = 16; PhysicalLikeExprNode like_expr = 18; - - PhysicalGetIndexedFieldExprNode get_indexed_field_expr = 19; } } @@ -1868,17 +1866,4 @@ message ColumnStats { Precision max_value = 2; Precision null_count = 3; Precision distinct_count = 4; -} - -message NamedStructFieldExpr { - ScalarValue name = 1; -} - -message PhysicalGetIndexedFieldExprNode { - PhysicalExprNode arg = 1; - oneof field { - NamedStructFieldExpr named_struct_field_expr = 2; - // 3 was list_index_expr - // 4 was list_range_expr - } -} +} \ No newline at end of file diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 7d565e481070..f5be49dc9de7 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -15356,97 +15356,6 @@ impl<'de> serde::Deserialize<'de> for NamedStructField { deserializer.deserialize_struct("datafusion.NamedStructField", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for NamedStructFieldExpr { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if self.name.is_some() { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion.NamedStructFieldExpr", len)?; - if let Some(v) = self.name.as_ref() { - struct_ser.serialize_field("name", v)?; - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for NamedStructFieldExpr { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "name", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - Name, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "name" => Ok(GeneratedField::Name), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = NamedStructFieldExpr; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.NamedStructFieldExpr") - } - - fn visit_map(self, mut map_: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut name__ = None; - while let Some(k) = map_.next_key()? { - match k { - GeneratedField::Name => { - if name__.is_some() { - return Err(serde::de::Error::duplicate_field("name")); - } - name__ = map_.next_value()?; - } - } - } - Ok(NamedStructFieldExpr { - name: name__, - }) - } - } - deserializer.deserialize_struct("datafusion.NamedStructFieldExpr", FIELDS, GeneratedVisitor) - } -} impl serde::Serialize for NegativeNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -18754,9 +18663,6 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::LikeExpr(v) => { struct_ser.serialize_field("likeExpr", v)?; } - physical_expr_node::ExprType::GetIndexedFieldExpr(v) => { - struct_ser.serialize_field("getIndexedFieldExpr", v)?; - } } } struct_ser.end() @@ -18798,8 +18704,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "scalarUdf", "like_expr", "likeExpr", - "get_indexed_field_expr", - "getIndexedFieldExpr", ]; #[allow(clippy::enum_variant_names)] @@ -18821,7 +18725,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { WindowExpr, ScalarUdf, LikeExpr, - GetIndexedFieldExpr, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -18860,7 +18763,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "windowExpr" | "window_expr" => Ok(GeneratedField::WindowExpr), "scalarUdf" | "scalar_udf" => Ok(GeneratedField::ScalarUdf), "likeExpr" | "like_expr" => Ok(GeneratedField::LikeExpr), - "getIndexedFieldExpr" | "get_indexed_field_expr" => Ok(GeneratedField::GetIndexedFieldExpr), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -19000,13 +18902,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("likeExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::LikeExpr) -; - } - GeneratedField::GetIndexedFieldExpr => { - if expr_type__.is_some() { - return Err(serde::de::Error::duplicate_field("getIndexedFieldExpr")); - } - expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::GetIndexedFieldExpr) ; } } @@ -19130,120 +19025,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalExtensionNode { deserializer.deserialize_struct("datafusion.PhysicalExtensionNode", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for PhysicalGetIndexedFieldExprNode { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if self.arg.is_some() { - len += 1; - } - if self.field.is_some() { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalGetIndexedFieldExprNode", len)?; - if let Some(v) = self.arg.as_ref() { - struct_ser.serialize_field("arg", v)?; - } - if let Some(v) = self.field.as_ref() { - match v { - physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(v) => { - struct_ser.serialize_field("namedStructFieldExpr", v)?; - } - } - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for PhysicalGetIndexedFieldExprNode { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "arg", - "named_struct_field_expr", - "namedStructFieldExpr", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - Arg, - NamedStructFieldExpr, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "arg" => Ok(GeneratedField::Arg), - "namedStructFieldExpr" | "named_struct_field_expr" => Ok(GeneratedField::NamedStructFieldExpr), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = PhysicalGetIndexedFieldExprNode; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.PhysicalGetIndexedFieldExprNode") - } - - fn visit_map(self, mut map_: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut arg__ = None; - let mut field__ = None; - while let Some(k) = map_.next_key()? { - match k { - GeneratedField::Arg => { - if arg__.is_some() { - return Err(serde::de::Error::duplicate_field("arg")); - } - arg__ = map_.next_value()?; - } - GeneratedField::NamedStructFieldExpr => { - if field__.is_some() { - return Err(serde::de::Error::duplicate_field("namedStructFieldExpr")); - } - field__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr) -; - } - } - } - Ok(PhysicalGetIndexedFieldExprNode { - arg: arg__, - field: field__, - }) - } - } - deserializer.deserialize_struct("datafusion.PhysicalGetIndexedFieldExprNode", FIELDS, GeneratedVisitor) - } -} impl serde::Serialize for PhysicalHashRepartition { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 80f529196f54..e1c9af105bbd 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2035,7 +2035,7 @@ pub struct PhysicalExtensionNode { pub struct PhysicalExprNode { #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 18, 19" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 18" )] pub expr_type: ::core::option::Option, } @@ -2083,10 +2083,6 @@ pub mod physical_expr_node { ScalarUdf(super::PhysicalScalarUdfNode), #[prost(message, tag = "18")] LikeExpr(::prost::alloc::boxed::Box), - #[prost(message, tag = "19")] - GetIndexedFieldExpr( - ::prost::alloc::boxed::Box, - ), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -2743,31 +2739,6 @@ pub struct ColumnStats { #[prost(message, optional, tag = "4")] pub distinct_count: ::core::option::Option, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct NamedStructFieldExpr { - #[prost(message, optional, tag = "1")] - pub name: ::core::option::Option, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PhysicalGetIndexedFieldExprNode { - #[prost(message, optional, boxed, tag = "1")] - pub arg: ::core::option::Option<::prost::alloc::boxed::Box>, - #[prost(oneof = "physical_get_indexed_field_expr_node::Field", tags = "2")] - pub field: ::core::option::Option, -} -/// Nested message and enum types in `PhysicalGetIndexedFieldExprNode`. -pub mod physical_get_indexed_field_expr_node { - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Field { - /// 3 was list_index_expr - /// 4 was list_range_expr - #[prost(message, tag = "2")] - NamedStructFieldExpr(super::NamedStructFieldExpr), - } -} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum JoinType { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 16f0e94cad83..184c048c1bdd 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -41,9 +41,8 @@ use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::WindowFunctionDefinition; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ - in_list, BinaryExpr, CaseExpr, CastExpr, Column, GetFieldAccessExpr, - GetIndexedFieldExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, - NotExpr, TryCastExpr, + in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, + Literal, NegativeExpr, NotExpr, TryCastExpr, }; use datafusion::physical_plan::windows::create_window_expr; use datafusion::physical_plan::{ @@ -384,27 +383,6 @@ pub fn parse_physical_expr( input_schema, )?, )), - ExprType::GetIndexedFieldExpr(get_indexed_field_expr) => { - let field = match &get_indexed_field_expr.field { - Some(protobuf::physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(named_struct_field_expr)) => GetFieldAccessExpr::NamedStructField{ - name: convert_required!(named_struct_field_expr.name)?, - }, - None => - return Err(proto_error( - "Field must not be None", - )), - }; - - Arc::new(GetIndexedFieldExpr::new( - parse_required_physical_expr( - get_indexed_field_expr.arg.as_deref(), - registry, - "arg", - input_schema, - )?, - field, - )) - } }; Ok(pexpr) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index bdb6cc668708..ba77b30b7f8d 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -39,7 +39,6 @@ use datafusion::datasource::{ physical_plan::FileSinkConfig, }; use datafusion::logical_expr::BuiltinScalarFunction; -use datafusion::physical_expr::expressions::{GetFieldAccessExpr, GetIndexedFieldExpr}; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ @@ -551,25 +550,6 @@ impl TryFrom> for protobuf::PhysicalExprNode { }), )), }) - } else if let Some(expr) = expr.downcast_ref::() { - let field = match expr.field() { - GetFieldAccessExpr::NamedStructField{name} => Some( - protobuf::physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(protobuf::NamedStructFieldExpr { - name: Some(ScalarValue::try_from(name)?) - }) - ), - }; - - Ok(protobuf::PhysicalExprNode { - expr_type: Some( - protobuf::physical_expr_node::ExprType::GetIndexedFieldExpr( - Box::new(protobuf::PhysicalGetIndexedFieldExprNode { - arg: Some(Box::new(expr.arg().to_owned().try_into()?)), - field, - }), - ), - ), - }) } else { internal_err!("physical_plan::to_proto() unsupported expression {value:?}") } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3441a9f7fa11..7f0c6286a19d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -22,7 +22,7 @@ use std::vec; use arrow::csv::WriterBuilder; use datafusion::arrow::array::ArrayRef; use datafusion::arrow::compute::kernels::sort::SortOptions; -use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema}; +use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema}; use datafusion::datasource::file_format::csv::CsvSink; use datafusion::datasource::file_format::json::JsonSink; use datafusion::datasource::file_format::parquet::ParquetSink; @@ -46,8 +46,7 @@ use datafusion::physical_plan::analyze::AnalyzeExec; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{ binary, cast, col, in_list, like, lit, Avg, BinaryExpr, Column, DistinctCount, - GetFieldAccessExpr, GetIndexedFieldExpr, NotExpr, NthValue, PhysicalSortExpr, - StringAgg, Sum, + NotExpr, NthValue, PhysicalSortExpr, StringAgg, Sum, }; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::functions; @@ -712,36 +711,6 @@ fn roundtrip_like() -> Result<()> { roundtrip_test(plan) } -#[test] -fn roundtrip_get_indexed_field_named_struct_field() -> Result<()> { - let fields = vec![ - Field::new("id", DataType::Int64, true), - Field::new_struct( - "arg", - Fields::from(vec![Field::new("name", DataType::Float64, true)]), - true, - ), - ]; - - let schema = Schema::new(fields); - let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); - - let col_arg = col("arg", &schema)?; - let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new( - col_arg, - GetFieldAccessExpr::NamedStructField { - name: ScalarValue::from("name"), - }, - )); - - let plan = Arc::new(ProjectionExec::try_new( - vec![(get_indexed_field_expr, "result".to_string())], - input, - )?); - - roundtrip_test(plan) -} - #[test] fn roundtrip_analyze() -> Result<()> { let field_a = Field::new("plan_type", DataType::Utf8, false);