diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index f8ee54ca0837..557e04a3b66f 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -144,6 +144,26 @@ impl DataFrame {
         Ok(DataFrame::new(self.session_state, project_plan))
     }
 
+    /// Expand each list element of a column to multiple rows.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    /// let df = df.unnest_column("a")?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn unnest_column(self, column: &str) -> Result<DataFrame> {
+        let plan = LogicalPlanBuilder::from(self.plan)
+            .unnest_column(column)?
+            .build()?;
+        Ok(DataFrame::new(self.session_state, plan))
+    }
+
     /// Filter a DataFrame to only include rows that match the specified filter expression.
     ///
     /// ```
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index d0a4e556689b..c8f5eb793910 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -660,6 +660,7 @@ pub mod stream;
 pub mod streaming;
 pub mod udaf;
 pub mod union;
+pub mod unnest;
 pub mod values;
 pub mod windows;
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 5cdddc63474b..65b4a6ed462c 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -18,6 +18,7 @@
 //! Physical query planner
 
 use super::analyze::AnalyzeExec;
+use super::unnest::UnnestExec;
 use super::{
     aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
     values::ValuesExec, windows,
@@ -27,7 +28,7 @@ use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
 use crate::logical_expr::{
     Aggregate, Distinct, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan,
-    Window,
+    Unnest, Window,
 };
 use crate::logical_expr::{
     CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType,
@@ -1109,6 +1110,13 @@ impl DefaultPhysicalPlanner {
                     Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
                 }
+                LogicalPlan::Unnest(Unnest { input, column, schema }) => {
+                    let input = self.create_initial_plan(input, session_state).await?;
+                    let column_exec = schema
+                        .index_of_column(column)
+                        .map(|idx| Column::new(&column.name, idx))?;
+                    let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+                    Ok(Arc::new(UnnestExec::new(input, column_exec, schema)))
+                }
                 LogicalPlan::CreateExternalTable(_) => {
                     // There is no default plan for "CREATE EXTERNAL
                     // TABLE" -- it must be handled at a higher level (so
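Note: the planner arm above resolves the logical `Column` to a physical column index against the unnest output schema, then wraps the child plan in the new `UnnestExec` defined in the file below. For reviewers who want to kick the tires, here is a minimal sketch of driving the operator directly, outside the planner. This is not part of the diff; `MemoryExec`, the single-column schema, and the `vals` name are illustrative assumptions:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, ListArray};
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::{
    collect, expressions::Column, memory::MemoryExec, unnest::UnnestExec,
};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // One input row whose "vals" cell holds the list [1, 2, 3].
    let vals = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
        Some(1),
        Some(2),
        Some(3),
    ])]);
    let batch = RecordBatch::try_from_iter(vec![("vals", Arc::new(vals) as ArrayRef)])?;

    // The unnested schema replaces List<Int32> with plain Int32.
    let out_schema = Arc::new(Schema::new(vec![Field::new("vals", DataType::Int32, true)]));

    let input = Arc::new(MemoryExec::try_new(&[vec![batch.clone()]], batch.schema(), None)?);
    let exec = Arc::new(UnnestExec::new(input, Column::new("vals", 0), out_schema));

    // The single list row comes back as three scalar rows.
    let output = collect(exec, SessionContext::new().task_ctx()).await?;
    assert_eq!(output.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
    Ok(())
}
```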
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
new file mode 100644
index 000000000000..5ae80c8d7755
--- /dev/null
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the unnest column plan for unnesting values in a column that contains a list
+//! type; conceptually, this is like joining each row with all the values in the list column.
+use arrow::array::{
+    new_null_array, Array, ArrayAccessor, ArrayRef, FixedSizeListArray, LargeListArray,
+    ListArray,
+};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::Stream;
+use futures::StreamExt;
+use log::debug;
+use std::time::Instant;
+use std::{any::Any, sync::Arc};
+
+use crate::execution::context::TaskContext;
+use crate::physical_plan::{
+    coalesce_batches::concat_batches, expressions::Column, DisplayFormatType,
+    Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr,
+    PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+use crate::{
+    error::{DataFusionError, Result},
+    scalar::ScalarValue,
+};
+
+/// Unnest the given column by joining the row with each value in the nested type.
+#[derive(Debug)]
+pub struct UnnestExec {
+    /// Input execution plan
+    input: Arc<dyn ExecutionPlan>,
+    /// The schema once the unnest is applied
+    schema: SchemaRef,
+    /// The unnest column
+    column: Column,
+}
+
+impl UnnestExec {
+    /// Create a new [UnnestExec].
+    pub fn new(input: Arc<dyn ExecutionPlan>, column: Column, schema: SchemaRef) -> Self {
+        UnnestExec {
+            input,
+            schema,
+            column,
+        }
+    }
+}
+
+impl ExecutionPlan for UnnestExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(UnnestExec::new(
+            children[0].clone(),
+            self.column.clone(),
+            self.schema.clone(),
+        )))
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::UnspecifiedDistribution]
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input = self.input.execute(partition, context)?;
+
+        Ok(Box::pin(UnnestStream {
+            input,
+            schema: self.schema.clone(),
+            column: self.column.clone(),
+            num_input_batches: 0,
+            num_input_rows: 0,
+            num_output_batches: 0,
+            num_output_rows: 0,
+            unnest_time: 0,
+        }))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "UnnestExec")
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.input.statistics()
+    }
+}
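The stream implementation that follows funnels every input batch through `build_batch`. The core trick there for the non-unnest columns is scalar duplication: read one value from the row, then materialize it N times so it lines up with the N unnested list elements. A standalone sketch of that building block (the array contents are made up for illustration):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;

fn main() -> Result<()> {
    // A non-unnest column ("shape_id") with one row of interest.
    let shape_id: ArrayRef = Arc::new(Int32Array::from(vec![7]));

    // If that row's list column holds 3 elements, the scalar must be
    // repeated 3 times so all output columns stay the same length.
    let scalar = ScalarValue::try_from_array(&shape_id, 0)?;
    let repeated = scalar.to_array_of_size(3);
    assert_eq!(repeated.len(), 3);
    Ok(())
}
```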
+
+/// A stream that issues [RecordBatch]es with unnested column data.
+struct UnnestStream {
+    /// Input stream
+    input: SendableRecordBatchStream,
+    /// Unnested schema
+    schema: Arc<Schema>,
+    /// The unnest column
+    column: Column,
+    /// number of input batches
+    num_input_batches: usize,
+    /// number of input rows
+    num_input_rows: usize,
+    /// number of batches produced
+    num_output_batches: usize,
+    /// number of rows produced
+    num_output_rows: usize,
+    /// total time for column unnesting, in ms
+    unnest_time: usize,
+}
+
+impl RecordBatchStream for UnnestStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[async_trait]
+impl Stream for UnnestStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl UnnestStream {
+    /// Separate implementation function that unpins the [`UnnestStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Result<RecordBatch>>> {
+        self.input
+            .poll_next_unpin(cx)
+            .map(|maybe_batch| match maybe_batch {
+                Some(Ok(batch)) => {
+                    let start = Instant::now();
+                    let result = build_batch(&batch, &self.schema, &self.column);
+                    self.num_input_batches += 1;
+                    self.num_input_rows += batch.num_rows();
+                    if let Ok(ref batch) = result {
+                        self.unnest_time += start.elapsed().as_millis() as usize;
+                        self.num_output_batches += 1;
+                        self.num_output_rows += batch.num_rows();
+                    }
+
+                    Some(result)
+                }
+                other => {
+                    debug!(
+                        "Processed {} input batches containing {} rows and \
+                        produced {} output batches containing {} rows in {} ms",
+                        self.num_input_batches,
+                        self.num_input_rows,
+                        self.num_output_batches,
+                        self.num_output_rows,
+                        self.unnest_time,
+                    );
+                    other
+                }
+            })
+    }
+}
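One rule worth calling out before `build_batch`: a row whose list is empty or null is not dropped; `unnest_batch` keeps it as a single row with a null in the unnested column (hence the `max(1)` below). A standalone illustration of the rule; the three-row list array is an assumption for demonstration:

```rust
use datafusion::arrow::array::{new_null_array, Array, ListArray};
use datafusion::arrow::datatypes::Int32Type;

fn main() {
    let lists = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]), // expands to 2 rows
        Some(vec![]),                 // kept as 1 row with a null
        None,                         // likewise kept as 1 null row
    ]);
    for row in 0..lists.len() {
        let values = lists.value(row);
        let out = if values.is_empty() {
            new_null_array(values.data_type(), 1)
        } else {
            values
        };
        println!("row {row} -> {} output row(s)", out.len());
    }
}
```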
+
+fn build_batch(
+    batch: &RecordBatch,
+    schema: &SchemaRef,
+    column: &Column,
+) -> Result<RecordBatch> {
+    let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
+    match list_array.data_type() {
+        arrow::datatypes::DataType::List(_) => {
+            let list_array = list_array.as_any().downcast_ref::<ListArray>().unwrap();
+            unnest_batch(batch, schema, column, &list_array)
+        }
+        arrow::datatypes::DataType::LargeList(_) => {
+            let list_array = list_array
+                .as_any()
+                .downcast_ref::<LargeListArray>()
+                .unwrap();
+            unnest_batch(batch, schema, column, &list_array)
+        }
+        arrow::datatypes::DataType::FixedSizeList(_, _) => {
+            let list_array = list_array
+                .as_any()
+                .downcast_ref::<FixedSizeListArray>()
+                .unwrap();
+            unnest_batch(batch, schema, column, list_array)
+        }
+        _ => {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid unnest column {column}"
+            )));
+        }
+    }
+}
+
+fn unnest_batch<T>(
+    batch: &RecordBatch,
+    schema: &SchemaRef,
+    column: &Column,
+    list_array: &T,
+) -> Result<RecordBatch>
+where
+    T: ArrayAccessor<Item = ArrayRef>,
+{
+    let mut batches = Vec::new();
+    let mut num_rows = 0;
+
+    for row in 0..batch.num_rows() {
+        let arrays = batch
+            .columns()
+            .iter()
+            .enumerate()
+            .map(|(col_idx, arr)| {
+                if col_idx == column.index() {
+                    // Unnest the value at the given row.
+                    if list_array.value(row).is_empty() {
+                        // If the nested array is empty, add an array with 1 null.
+                        Ok(new_null_array(list_array.value(row).data_type(), 1))
+                    } else {
+                        Ok(list_array.value(row))
+                    }
+                } else {
+                    // Number of elements to duplicate, use max(1) to handle null.
+                    let nested_len = list_array.value(row).len().max(1);
+                    // Duplicate rows for each value in the nested array.
+                    if arr.is_null(row) {
+                        Ok(new_null_array(arr.data_type(), nested_len))
+                    } else {
+                        let scalar = ScalarValue::try_from_array(arr, row)?;
+                        Ok(scalar.to_array_of_size(nested_len))
+                    }
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let rb = RecordBatch::try_new(schema.clone(), arrays.to_vec())?;
+        num_rows += rb.num_rows();
+        batches.push(rb);
+    }
+
+    concat_batches(schema, &batches, num_rows).map_err(Into::into)
+}
diff --git a/datafusion/core/tests/dataframe.rs b/datafusion/core/tests/dataframe.rs
index 39b7739af8b1..644cc1a0bc3b 100644
--- a/datafusion/core/tests/dataframe.rs
+++ b/datafusion/core/tests/dataframe.rs
@@ -17,7 +17,10 @@
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::{
-    array::{Int32Array, StringArray, UInt32Array},
+    array::{
+        ArrayRef, Int32Array, Int32Builder, ListBuilder, StringArray, StringBuilder,
+        StructBuilder, UInt32Array, UInt32Builder,
+    },
     record_batch::RecordBatch,
 };
 use datafusion::from_slice::FromSlice;
@@ -500,6 +503,110 @@ async fn right_anti_filter_push_down() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn unnest_columns() -> Result<()> {
+    const NUM_ROWS: usize = 4;
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let results = df.collect().await?;
+    let expected = vec![
+        r#"+----------+------------------------------------------------------------+--------------------+"#,
+        r#"| shape_id | points                                                     | tags               |"#,
+        r#"+----------+------------------------------------------------------------+--------------------+"#,
+        r#"| 1        | [{"x": -3, "y": -4}, {"x": -3, "y": 6}, {"x": 2, "y": -2}] | [tag1]             |"#,
+        r#"| 2        |                                                            | [tag1, tag2]       |"#,
+        r#"| 3        | [{"x": -9, "y": 2}, {"x": -10, "y": -4}]                   |                    |"#,
+        r#"| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | [tag1, tag2, tag3] |"#,
+        r#"+----------+------------------------------------------------------------+--------------------+"#,
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    // Unnest tags
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let results = df.unnest_column("tags")?.collect().await?;
+    let expected = vec![
+        r#"+----------+------------------------------------------------------------+------+"#,
+        r#"| shape_id | points                                                     | tags |"#,
+        r#"+----------+------------------------------------------------------------+------+"#,
+        r#"| 1        | [{"x": -3, "y": -4}, {"x": -3, "y": 6}, {"x": 2, "y": -2}] | tag1 |"#,
+        r#"| 2        |                                                            | tag1 |"#,
+        r#"| 2        |                                                            | tag2 |"#,
+        r#"| 3        | [{"x": -9, "y": 2}, {"x": -10, "y": -4}]                   |      |"#,
+        r#"| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | tag1 |"#,
+        r#"| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | tag2 |"#,
+        r#"| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | tag3 |"#,
+        r#"+----------+------------------------------------------------------------+------+"#,
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    // Test aggregate results for tags.
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let count = df.unnest_column("tags")?.count().await?;
+    assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::<usize>());
+
+    // Unnest points
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let results = df.unnest_column("points")?.collect().await?;
+    let expected = vec![
+        r#"+----------+---------------------+--------------------+"#,
+        r#"| shape_id | points              | tags               |"#,
+        r#"+----------+---------------------+--------------------+"#,
+        r#"| 1        | {"x": -3, "y": -4}  | [tag1]             |"#,
+        r#"| 1        | {"x": -3, "y": 6}   | [tag1]             |"#,
+        r#"| 1        | {"x": 2, "y": -2}   | [tag1]             |"#,
+        r#"| 2        |                     | [tag1, tag2]       |"#,
+        r#"| 3        | {"x": -9, "y": 2}   |                    |"#,
+        r#"| 3        | {"x": -10, "y": -4} |                    |"#,
+        r#"| 4        | {"x": -3, "y": 5}   | [tag1, tag2, tag3] |"#,
+        r#"| 4        | {"x": 2, "y": -1}   | [tag1, tag2, tag3] |"#,
+        r#"+----------+---------------------+--------------------+"#,
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    // Test aggregate results for points.
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let count = df.unnest_column("points")?.count().await?;
+    assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::<usize>());
+
+    // Unnest both points and tags.
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let results = df
+        .unnest_column("points")?
+        .unnest_column("tags")?
+        .collect()
+        .await?;
+    let expected = vec![
+        r#"+----------+---------------------+------+"#,
+        r#"| shape_id | points              | tags |"#,
+        r#"+----------+---------------------+------+"#,
+        r#"| 1        | {"x": -3, "y": -4}  | tag1 |"#,
+        r#"| 1        | {"x": -3, "y": 6}   | tag1 |"#,
+        r#"| 1        | {"x": 2, "y": -2}   | tag1 |"#,
+        r#"| 2        |                     | tag1 |"#,
+        r#"| 2        |                     | tag2 |"#,
+        r#"| 3        | {"x": -9, "y": 2}   |      |"#,
+        r#"| 3        | {"x": -10, "y": -4} |      |"#,
+        r#"| 4        | {"x": -3, "y": 5}   | tag1 |"#,
+        r#"| 4        | {"x": -3, "y": 5}   | tag2 |"#,
+        r#"| 4        | {"x": -3, "y": 5}   | tag3 |"#,
+        r#"| 4        | {"x": 2, "y": -1}   | tag1 |"#,
+        r#"| 4        | {"x": 2, "y": -1}   | tag2 |"#,
+        r#"| 4        | {"x": 2, "y": -1}   | tag3 |"#,
+        r#"+----------+---------------------+------+"#,
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    // Test aggregate results for points and tags.
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let count = df
+        .unnest_column("points")?
+        .unnest_column("tags")?
+        .count()
+        .await?;
+    assert_eq!(count, results.iter().map(|r| r.num_rows()).sum::<usize>());
+
+    Ok(())
+}
+
 async fn create_test_table() -> Result<DataFrame> {
     let schema = Arc::new(Schema::new(vec![
         Field::new("a", DataType::Utf8, false),
@@ -575,3 +682,70 @@ fn create_join_context() -> Result<SessionContext> {
 
     Ok(ctx)
 }
+
+/// Create a data frame that contains nested types.
+///
+/// Each row contains:
+/// - shape_id: an integer primary key
+/// - points: a list of point structs {x, y}
+/// - tags: a list of string tags
+async fn table_with_nested_types(n: usize) -> Result<DataFrame> {
+    use rand::prelude::*;
+
+    let mut shape_id_builder = UInt32Builder::new();
+    let mut points_builder = ListBuilder::new(StructBuilder::from_fields(
+        vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("y", DataType::Int32, false),
+        ],
+        5,
+    ));
+    let mut tags_builder = ListBuilder::new(StringBuilder::new());
+
+    let mut rng = StdRng::seed_from_u64(197);
+
+    for idx in 0..n {
+        // Append shape id.
+        shape_id_builder.append_value(idx as u32 + 1);
+
+        // Add a random number of points
+        let num_points: usize = rng.gen_range(0..4);
+        if num_points > 0 {
+            for _ in 0..num_points.max(2) {
+                // Add x value
+                points_builder
+                    .values()
+                    .field_builder::<Int32Builder>(0)
+                    .unwrap()
+                    .append_value(rng.gen_range(-10..10));
+                // Add y value
+                points_builder
+                    .values()
+                    .field_builder::<Int32Builder>(1)
+                    .unwrap()
+                    .append_value(rng.gen_range(-10..10));
+                points_builder.values().append(true);
+            }
+        }
+
+        // Append null if num points is 0.
+        points_builder.append(num_points > 0);
+
+        // Append tags.
+        let num_tags: usize = rng.gen_range(0..5);
+        for id in 0..num_tags {
+            tags_builder.values().append_value(format!("tag{}", id + 1));
+        }
+        tags_builder.append(num_tags > 0);
+    }
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+        ("points", Arc::new(points_builder.finish()) as ArrayRef),
+        ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
+    ])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("shapes", batch)?;
+    ctx.table("shapes").await
+}
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index f901fc879669..9594e74f0796 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -75,8 +75,8 @@ pub use logical_plan::{
     DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
     JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType,
     PlanVisitor, Projection, Repartition, SetVariable, Sort, StringifiedPlan, Subquery,
-    SubqueryAlias, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
-    Window, WriteOp,
+    SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, UserDefinedLogicalNode,
+    Values, Window, WriteOp,
 };
 pub use nullif::SUPPORTED_NULLIF_TYPES;
 pub use operator::Operator;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index f6213d439987..23256662f7fd 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -29,7 +29,7 @@ use crate::{
         Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
         JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
         Projection, Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan,
-        Union, Values, Window,
+        Union, Unnest, Values, Window,
     },
     utils::{
         can_hash, expand_qualified_wildcard, expand_wildcard,
@@ -891,6 +891,11 @@ impl LogicalPlanBuilder {
             null_equals_null: false,
         })))
     }
+
+    /// Unnest the given column.
+    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
+        Ok(Self::from(unnest(self.plan, column.into())?))
+    }
 }
 
 /// Creates a schema for a join operation.
@@ -1174,6 +1179,52 @@ impl TableSource for LogicalTableSource {
     }
 }
 
+/// Create an unnest plan.
+pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
+    let unnest_field = input.schema().field_from_column(&column)?;
+
+    // Extract the type of the nested field in the list.
+    let unnested_field = match unnest_field.data_type() {
+        DataType::List(field)
+        | DataType::FixedSizeList(field, _)
+        | DataType::LargeList(field) => DFField::new(
+            unnest_field.qualifier().map(String::as_str),
+            unnest_field.name(),
+            field.data_type().clone(),
+            unnest_field.is_nullable(),
+        ),
+        _ => {
+            // If the unnest field is not a list type return the input plan.
+            return Ok(input);
+        }
+    };
+
+    // Update the schema with the unnest column type changed to contain the nested type.
+    let input_schema = input.schema();
+    let fields = input_schema
+        .fields()
+        .iter()
+        .map(|f| {
+            if f == unnest_field {
+                unnested_field.clone()
+            } else {
+                f.clone()
+            }
+        })
+        .collect::<Vec<_>>();
+
+    let schema = Arc::new(DFSchema::new_with_metadata(
+        fields,
+        input_schema.metadata().clone(),
+    )?);
+
+    Ok(LogicalPlan::Unnest(Unnest {
+        input: Arc::new(input),
+        column: unnested_field.qualified_column(),
+        schema,
+    }))
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{expr, expr_fn::exists};
@@ -1559,4 +1610,77 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn plan_builder_unnest() -> Result<()> {
+        // Unnesting a simple column should return the child plan.
+        let plan = nested_table_scan("test_table")?
+            .unnest_column("scalar")?
+            .build()?;
+
+        let expected = "TableScan: test_table";
+        assert_eq!(expected, format!("{plan:?}"));
+
+        // Unnesting the strings list.
+        let plan = nested_table_scan("test_table")?
+            .unnest_column("strings")?
+            .build()?;
+
+        let expected = "\
+        Unnest: test_table.strings\
+        \n  TableScan: test_table";
+        assert_eq!(expected, format!("{plan:?}"));
+
+        // Check unnested field is a scalar
+        let field = plan
+            .schema()
+            .field_with_name(Some("test_table"), "strings")
+            .unwrap();
+        assert_eq!(&DataType::Utf8, field.data_type());
+
+        // Unnesting multiple fields.
+        let plan = nested_table_scan("test_table")?
+            .unnest_column("strings")?
+            .unnest_column("structs")?
+            .build()?;
+
+        let expected = "\
+        Unnest: test_table.structs\
+        \n  Unnest: test_table.strings\
+        \n    TableScan: test_table";
+        assert_eq!(expected, format!("{plan:?}"));
+
+        // Check unnested struct list field should be a struct.
+        let field = plan
+            .schema()
+            .field_with_name(Some("test_table"), "structs")
+            .unwrap();
+        assert!(matches!(field.data_type(), DataType::Struct(_)));
+
+        // Unnesting missing column should fail.
+        let plan = nested_table_scan("test_table")?.unnest_column("missing");
+        assert!(plan.is_err());
+
+        Ok(())
+    }
+
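For reviewers who want to poke at the builder API without a SessionContext, here is a sketch that uses `LogicalTableSource` (defined in this file) to fabricate a scan. The exact import paths are an assumption and may need adjusting:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::logical_plan::builder::LogicalTableSource;
use datafusion_expr::LogicalPlanBuilder;

fn main() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("id", DataType::UInt32, false),
        Field::new(
            "tags",
            DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
            true,
        ),
    ]);
    let source = Arc::new(LogicalTableSource::new(Arc::new(schema)));

    let plan = LogicalPlanBuilder::scan("shapes", source, None)?
        .unnest_column("tags")?
        .build()?;

    // The plan displays as `Unnest: shapes.tags`, and its schema now
    // types "tags" as plain Utf8 instead of List<Utf8>.
    println!("{plan:?}");
    Ok(())
}
```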
+    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
+        // Create a schema with a scalar field, a list of strings, and a list of structs.
+        let struct_field = Box::new(Field::new(
+            "item",
+            DataType::Struct(vec![
+                Field::new("a", DataType::UInt32, false),
+                Field::new("b", DataType::UInt32, false),
+            ]),
+            false,
+        ));
+        let string_field = Box::new(Field::new("item", DataType::Utf8, false));
+        let schema = Schema::new(vec![
+            Field::new("scalar", DataType::UInt32, false),
+            Field::new("strings", DataType::List(string_field), false),
+            Field::new("structs", DataType::List(struct_field), false),
+        ]);
+
+        table_scan(Some(table_name), &schema, None)
+    }
 }
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index dd2ed1e42044..4ad90dc3c940 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -27,7 +27,7 @@ pub use plan::{
     DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
     JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Prepare,
     Projection, Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias,
-    TableScan, ToStringifiedPlan, Union, Values, Window, WriteOp,
+    TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, WriteOp,
 };
 
 pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index a1cf08a84ba3..96fbeb94e6ac 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -123,6 +123,8 @@ pub enum LogicalPlan {
     Dml(DmlStatement),
     /// Describe the schema of table
     DescribeTable(DescribeTable),
+    /// Unnest a column that contains a nested list type.
+    Unnest(Unnest),
 }
 
 impl LogicalPlan {
@@ -167,6 +169,7 @@ impl LogicalPlan {
                 dummy_schema
             }
             LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema,
+            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
         }
     }
 
@@ -179,7 +182,8 @@ impl LogicalPlan {
             LogicalPlan::Values(Values { schema, .. }) => vec![schema],
             LogicalPlan::Window(Window { input, schema, .. })
             | LogicalPlan::Projection(Projection { input, schema, .. })
-            | LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => {
+            | LogicalPlan::Aggregate(Aggregate { input, schema, .. })
+            | LogicalPlan::Unnest(Unnest { input, schema, .. }) => {
                 let mut schemas = input.all_schemas();
                 schemas.insert(0, schema);
                 schemas
@@ -309,6 +313,9 @@ impl LogicalPlan {
             LogicalPlan::TableScan(TableScan { filters, .. }) => {
                 filters.iter().try_for_each(f)
             }
+            LogicalPlan::Unnest(Unnest { column, .. }) => {
+                f(&Expr::Column(column.clone()))
+            }
             // plans without expressions
             LogicalPlan::EmptyRelation(_)
             | LogicalPlan::Subquery(_)
@@ -361,6 +368,7 @@ impl LogicalPlan {
             | LogicalPlan::Prepare(Prepare { input, .. }) => {
                 vec![input]
             }
+            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
@@ -560,6 +568,7 @@ impl LogicalPlan {
             LogicalPlan::Explain(explain) => explain.plan.accept(visitor)?,
             LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
             LogicalPlan::Dml(write) => write.input.accept(visitor)?,
+            LogicalPlan::Unnest(Unnest { input, .. }) => input.accept(visitor)?,
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation(_)
@@ -1174,6 +1183,9 @@ impl LogicalPlan {
                     LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                         write!(f, "DescribeTable")
                     }
+                    LogicalPlan::Unnest(Unnest { column, .. }) => {
+                        write!(f, "Unnest: {column}")
+                    }
                 }
             }
         }
@@ -1971,6 +1983,17 @@ pub trait ToStringifiedPlan {
     fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan;
 }
 
+/// Unnest a column that contains a nested list type.
+#[derive(Debug, Clone)]
+pub struct Unnest {
+    /// The incoming logical plan
+    pub input: Arc<LogicalPlan>,
+    /// The column to unnest
+    pub column: Column,
+    /// The output schema, containing the unnested field column.
+    pub schema: DFSchemaRef,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 15ecb1e5c2ba..6f64bc14f812 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -26,7 +26,7 @@ use crate::logical_plan::builder::build_join_schema;
 use crate::logical_plan::{
     Aggregate, Analyze, CreateMemoryTable, CreateView, Distinct, Extension, Filter, Join,
     Limit, Partitioning, Prepare, Projection, Repartition, Sort as SortPlan, Subquery,
-    SubqueryAlias, Union, Values, Window,
+    SubqueryAlias, Union, Unnest, Values, Window,
 };
 use crate::{
     BinaryExpr, Cast, DmlStatement, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder,
@@ -739,6 +739,35 @@ pub fn from_plan(
             Ok(plan.clone())
         }
         LogicalPlan::DescribeTable(_) => Ok(plan.clone()),
+        LogicalPlan::Unnest(Unnest { column, schema, .. }) => {
+            // Update schema with unnested column type.
+            let input = Arc::new(inputs[0].clone());
+            let nested_field = input.schema().field_from_column(column)?;
+            let unnested_field = schema.field_from_column(column)?;
+            let fields = input
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| {
+                    if f == nested_field {
+                        unnested_field.clone()
+                    } else {
+                        f.clone()
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            let schema = Arc::new(DFSchema::new_with_metadata(
+                fields,
+                input.schema().metadata().clone(),
+            )?);
+
+            Ok(LogicalPlan::Unnest(Unnest {
+                input,
+                column: column.clone(),
+                schema,
+            }))
+        }
     }
 }
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 8351813e33dd..e1830390dee9 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -241,6 +241,7 @@ impl OptimizerRule for CommonSubexprEliminate {
             | LogicalPlan::Distinct(_)
             | LogicalPlan::Extension(_)
             | LogicalPlan::Dml(_)
+            | LogicalPlan::Unnest(_)
             | LogicalPlan::Prepare(_) => {
                 // apply the optimization to all inputs of the plan
                 utils::optimize_children(self, plan, config)?
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index f3107d01b45f..6d9d342ecbba 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -412,6 +412,7 @@ fn optimize_plan(
         | LogicalPlan::DescribeTable(_)
         | LogicalPlan::CrossJoin(_)
         | LogicalPlan::Dml(_)
+        | LogicalPlan::Unnest(_)
         | LogicalPlan::Extension { .. }
         | LogicalPlan::Prepare(_) => {
             let expr = plan.expressions();
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 60fcc95e885f..6f44542b90b2 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1333,6 +1333,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                 ))),
             })
         }
+        LogicalPlan::Unnest(_) => Err(proto_error(
+            "LogicalPlan serde is not yet implemented for Unnest",
+        )),
         LogicalPlan::CreateMemoryTable(_) => Err(proto_error(
             "LogicalPlan serde is not yet implemented for CreateMemoryTable",
         )),
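End-to-end, the feature as a user sees it: a standalone sketch mirroring the new `DataFrame::unnest_column` method over a tiny in-memory table. The shapes data here is a trimmed-down variant of the test fixture above, not part of the diff:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, ListBuilder, StringBuilder, UInt32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Two shapes: one with two tags, one with none (a null list).
    let mut tags = ListBuilder::new(StringBuilder::new());
    tags.values().append_value("tag1");
    tags.values().append_value("tag2");
    tags.append(true);
    tags.append(false);

    let batch = RecordBatch::try_from_iter(vec![
        ("shape_id", Arc::new(UInt32Array::from(vec![1, 2])) as ArrayRef),
        ("tags", Arc::new(tags.finish()) as ArrayRef),
    ])?;

    let ctx = SessionContext::new();
    ctx.register_batch("shapes", batch)?;

    // Shape 1 expands to two rows (tag1, tag2); shape 2 keeps a single
    // row with a null tag.
    let df = ctx.table("shapes").await?.unnest_column("tags")?;
    df.show().await?;
    Ok(())
}
```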