Add unnest_column to DataFrame #5106

Merged Feb 6, 2023 (20 commits)
20 changes: 20 additions & 0 deletions datafusion/core/src/dataframe.rs
@@ -144,6 +144,26 @@ impl DataFrame {
Ok(DataFrame::new(self.session_state, project_plan))
}

/// Expand each list element of a column into its own row, repeating the values of the other columns.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let df = df.unnest_column("a")?;
/// # Ok(())
/// # }
/// ```
pub fn unnest_column(self, column: &str) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
.unnest_column(column)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
}

/// Filter a DataFrame to only include rows that match the specified filter expression.
///
/// ```
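For context, here is a minimal sketch of what the new API does end to end. It is untested and not part of the PR: the `MemTable` setup, column names, and the expected output in the comments are illustrative assumptions.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, ListArray};
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Two columns: `a` is a plain Int32 column, `b` is the list column to unnest.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new(
            "b",
            DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
            true,
        ),
    ]));
    let b = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]), // expands to two rows
        Some(vec![]),                 // empty list yields a single null row
    ]);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![10, 20])), Arc::new(b)],
    )?;

    let ctx = SessionContext::new();
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    let df = ctx.read_table(Arc::new(table))?.unnest_column("b")?;
    df.show().await?;
    // Expected shape (illustrative):
    // | a  | b |
    // | 10 | 1 |
    // | 10 | 2 |
    // | 20 |   |
    Ok(())
}
```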
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/mod.rs
@@ -660,6 +660,7 @@ pub mod stream;
pub mod streaming;
pub mod udaf;
pub mod union;
pub mod unnest;
pub mod values;
pub mod windows;

10 changes: 9 additions & 1 deletion datafusion/core/src/physical_plan/planner.rs
@@ -18,6 +18,7 @@
//! Physical query planner

use super::analyze::AnalyzeExec;
use super::unnest::UnnestExec;
use super::{
aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
@@ -27,7 +28,7 @@ use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_expr::{
Aggregate, Distinct, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan,
Window,
Unnest, Window,
};
use crate::logical_expr::{
CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType,
@@ -1109,6 +1110,13 @@

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Unnest(Unnest { input, column, schema }) => {
let input = self.create_initial_plan(input, session_state).await?;
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(UnnestExec::new(input, column_exec, schema)))
}
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
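This planner arm is what turns the logical `Unnest` node into the new `UnnestExec`. Continuing the `MemTable` sketch above (same `ctx` and `table`; this is a fragment, and the plan text in the comments is illustrative rather than captured output):

```rust
// `explain(verbose, analyze)` returns a DataFrame of plan rows.
let df = ctx.read_table(Arc::new(table))?.unnest_column("b")?;
df.explain(false, false)?.show().await?;
// The logical plan should contain an `Unnest` node, and the physical
// plan an `UnnestExec` directly above the scan.
```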
305 changes: 305 additions & 0 deletions datafusion/core/src/physical_plan/unnest.rs
@@ -0,0 +1,305 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the unnest column plan for unnesting values in a column that contains a list
//! type; conceptually this is like joining each row with all the values in its list column.
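//!
//! For example, unnesting list column `b` repeats `a` once per list element
//! and turns an empty list into a single null row (see `build_batch` below);
//! this diagram is illustrative:
//!
//! ```text
//! a | b            a | b
//! --+-------       --+-----
//! 1 | [x, y]   =>  1 | x
//! 2 | []           1 | y
//!                  2 | null
//! ```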
use arrow::array::{
new_null_array, Array, ArrayAccessor, ArrayRef, FixedSizeListArray, LargeListArray,
ListArray,
};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use futures::Stream;
use futures::StreamExt;
use log::debug;
use std::time::Instant;
use std::{any::Any, sync::Arc};

use crate::execution::context::TaskContext;
use crate::physical_plan::{
coalesce_batches::concat_batches, expressions::Column, DisplayFormatType,
Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr,
PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
};

/// Unnest the given column by joining each row with every value in its nested list.
#[derive(Debug)]
pub struct UnnestExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
/// The schema once the unnest is applied
schema: SchemaRef,
/// The unnest column
column: Column,
}

impl UnnestExec {
/// Create a new [UnnestExec].
pub fn new(input: Arc<dyn ExecutionPlan>, column: Column, schema: SchemaRef) -> Self {
UnnestExec {
input,
schema,
column,
}
}
}

impl ExecutionPlan for UnnestExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(UnnestExec::new(
children[0].clone(),
self.column.clone(),
self.schema.clone(),
)))
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::UnspecifiedDistribution]
}

fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn equivalence_properties(&self) -> EquivalenceProperties {
self.input.equivalence_properties()
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;

Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
column: self.column.clone(),
num_input_batches: 0,
num_input_rows: 0,
num_output_batches: 0,
num_output_rows: 0,
unnest_time: 0,
}))
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(f, "UnnestExec")
}
}
}

fn statistics(&self) -> Statistics {
self.input.statistics()
}
}

/// A stream that issues [RecordBatch]es with unnested column data.
struct UnnestStream {
/// Input stream
input: SendableRecordBatchStream,
/// Unnested schema
schema: Arc<Schema>,
/// The unnest column
column: Column,
/// number of input batches
num_input_batches: usize,
/// number of input rows
num_input_rows: usize,
/// number of batches produced
num_output_batches: usize,
/// number of rows produced
num_output_rows: usize,
/// total time for column unnesting, in ms
unnest_time: usize,
}

impl RecordBatchStream for UnnestStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for UnnestStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.poll_next_impl(cx)
}
}

impl UnnestStream {
/// Separate implementation function that unpins the [`UnnestStream`] so
/// that partial borrows work correctly
fn poll_next_impl(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<RecordBatch>>> {
self.input
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
let start = Instant::now();
let result = build_batch(&batch, &self.schema, &self.column);
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
if let Ok(ref batch) = result {
self.unnest_time += start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
}

Some(result)
}
other => {
debug!(
"Processed {} probe-side input batches containing {} rows and \
produced {} output batches containing {} rows in {} ms",
self.num_input_batches,
self.num_input_rows,
self.num_output_batches,
self.num_output_rows,
self.unnest_time,
);
other
}
})
}
}

fn build_batch(
batch: &RecordBatch,
schema: &SchemaRef,
column: &Column,
) -> Result<RecordBatch> {
let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
match list_array.data_type() {
arrow::datatypes::DataType::List(_) => {
let list_array = list_array.as_any().downcast_ref::<ListArray>().unwrap();
unnest_batch(batch, schema, column, &list_array)
}
arrow::datatypes::DataType::LargeList(_) => {
let list_array = list_array
.as_any()
.downcast_ref::<LargeListArray>()
.unwrap();
unnest_batch(batch, schema, column, &list_array)
}
arrow::datatypes::DataType::FixedSizeList(_, _) => {
let list_array = list_array
.as_any()
.downcast_ref::<FixedSizeListArray>()
.unwrap();
unnest_batch(batch, schema, column, list_array)
}
_ => {
return Err(DataFusionError::Execution(format!(
"Invalid unnest column {column}"
)));
}
}
}

fn unnest_batch<T>(
batch: &RecordBatch,
schema: &SchemaRef,
column: &Column,
list_array: &T,
) -> Result<RecordBatch>
where
T: ArrayAccessor<Item = ArrayRef>,
{
let mut batches = Vec::new();
let mut num_rows = 0;

for row in 0..batch.num_rows() {
let arrays = batch
.columns()
.iter()
.enumerate()
.map(|(col_idx, arr)| {
if col_idx == column.index() {
// Unnest the value at the given row: each list element becomes a row.
let nested = list_array.value(row);
if nested.is_empty() {
// If the nested array is empty, emit a single null instead.
Ok(new_null_array(nested.data_type(), 1))
} else {
Ok(nested)
}
} else {
// Number of elements to duplicate; max(1) handles empty or null lists.
let nested_len = list_array.value(row).len().max(1);
// Duplicate rows for each value in the nested array.
if arr.is_null(row) {
Ok(new_null_array(arr.data_type(), nested_len))
} else {
let scalar = ScalarValue::try_from_array(arr, row)?;
Ok(scalar.to_array_of_size(nested_len))
}
}
})
.collect::<Result<Vec<_>>>()?;

let rb = RecordBatch::try_new(schema.clone(), arrays)?;
num_rows += rb.num_rows();
batches.push(rb);
}

concat_batches(schema, &batches, num_rows).map_err(Into::into)
}