diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs
index 9de8ffded5f7..b38968665015 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -165,13 +165,14 @@ impl ExecutionPlan for AvroExec {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::object_store::local::{
         local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
     };
     use crate::scalar::ScalarValue;
+    use arrow::datatypes::{DataType, Field, Schema};
     use futures::StreamExt;
 
     use super::*;
@@ -228,6 +229,68 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn avro_exec_missing_column() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+        let actual_schema = AvroFormat {}
+            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+            .await?;
+
+        let mut fields = actual_schema.fields().clone();
+        fields.push(Field::new("missing_col", DataType::Int32, true));
+
+        let file_schema = Arc::new(Schema::new(fields));
+
+        let avro_exec = AvroExec::new(FileScanConfig {
+            object_store: Arc::new(LocalFileSystem {}),
+            file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
+            file_schema,
+            statistics: Statistics::default(),
+            // Include the missing column in the projection
+            projection: Some(vec![0, 1, 2, actual_schema.fields().len()]),
+            limit: None,
+            table_partition_cols: vec![],
+        });
+        assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
+
+        let mut results = avro_exec.execute(0, runtime).await.expect("plan execution failed");
+        let batch = results
+            .next()
+            .await
+            .expect("plan iterator empty")
+            .expect("plan iterator returned an error");
+
+        let expected = vec![
+            "+----+----------+-------------+-------------+",
+            "| id | bool_col | tinyint_col | missing_col |",
+            "+----+----------+-------------+-------------+",
+            "| 4  | true     | 0           |             |",
+            "| 5  | false    | 1           |             |",
+            "| 6  | true     | 0           |             |",
+            "| 7  | false    | 1           |             |",
+            "| 2  | true     | 0           |             |",
+            "| 3  | false    | 1           |             |",
+            "| 0  | true     | 0           |             |",
+            "| 1  | false    | 1           |             |",
+            "+----+----------+-------------+-------------+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch]);
+
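+        // Once the stream is exhausted it should keep returning `None`.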
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn avro_exec_with_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index 5cff3b6c7296..4cf70f6e5cfd 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -170,6 +170,7 @@ impl ExecutionPlan for CsvExec {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{
         datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
         scalar::ScalarValue,
@@ -269,6 +270,52 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn csv_exec_with_missing_column() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        let file_schema = aggr_test_schema_with_missing_col();
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = "aggregate_test_100.csv";
+        let path = format!("{}/csv/{}", testdata, filename);
+        let csv = CsvExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_schema,
+                file_groups: vec![vec![local_unpartitioned_file(path)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: Some(5),
+                table_partition_cols: vec![],
+            },
+            true,
+            b',',
+        );
+        assert_eq!(14, csv.base_config.file_schema.fields().len());
+        assert_eq!(14, csv.projected_schema.fields().len());
+        assert_eq!(14, csv.schema().fields().len());
+
+        let mut it = csv.execute(0, runtime).await?;
+        let batch = it.next().await.unwrap()?;
+        assert_eq!(14, batch.num_columns());
+        assert_eq!(5, batch.num_rows());
+
+        let expected = vec![
+            "+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+            "| c1 | c2 | c3  | c4     | c5         | c6                   | c7  | c8    | c9         | c10                  | c11         | c12                 | c13                            | missing_col |",
+            "+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+            "| c  | 2  | 1   | 18109  | 2033001162 | -6513304855495910254 | 25  | 43062 | 1491205016 | 5863949479783605708  | 0.110830784 | 0.9294097332465232  | 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW |             |",
+            "| d  | 5  | -40 | 22614  | 706441268  | -7542719935673075327 | 155 | 14337 | 3373581039 | 11720144131976083864 | 0.69632107  | 0.3114712539863804  | C2GT5KVyOPZpgKVl110TyZO0NcJ434 |             |",
+            "| b  | 1  | 29  | -18218 | 994303988  | 5983957848665088916  | 204 | 9489  | 3275293996 | 14857091259186476033 | 0.53840446  | 0.17909035118828576 | AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz |             |",
+            "| a  | 1  | -85 | -15154 | 1171968280 | 1919439543497968449  | 77  | 52286 | 774637006  | 12101411955859039553 | 0.12285209  | 0.6864391962767343  | 0keZ5G8BffGwgF2RwQD59TFzMStxCB |             |",
+            "| b  | 5  | -82 | 22080  | 1824882165 | 7373730676428214987  | 208 | 34331 | 3342719438 | 3330177516592499461  | 0.82634634  | 0.40975383525297016 | Ig1QcuKsjHXkproePdERo2w0mYzIqd |             |",
+            "+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch]);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn csv_exec_with_partition() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs
index 0fc95d1e4933..ac413062caf8 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -137,6 +137,8 @@ impl ExecutionPlan for NdJsonExec {
 #[cfg(test)]
 mod tests {
+    use arrow::array::Array;
+    use arrow::datatypes::{Field, Schema};
     use futures::StreamExt;
 
     use crate::datasource::{
@@ -211,6 +213,47 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn nd_json_exec_file_with_missing_column() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        use arrow::datatypes::DataType;
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+
+        let actual_schema = infer_schema(path.clone()).await?;
+
+        let mut fields = actual_schema.fields().clone();
+        fields.push(Field::new("missing_col", DataType::Int32, true));
+        let missing_field_idx = fields.len() - 1;
+
+        let file_schema = Arc::new(Schema::new(fields));
+
+        let exec = NdJsonExec::new(FileScanConfig {
+            object_store: Arc::new(LocalFileSystem {}),
+            file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
+            file_schema,
+            statistics: Statistics::default(),
+            projection: None,
+            limit: Some(3),
+            table_partition_cols: vec![],
+        });
+
+        let mut it = exec.execute(0, runtime).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 3);
+        let values = batch
+            .column(missing_field_idx)
+            .as_any()
+            .downcast_ref::<arrow::array::Int32Array>()
+            .unwrap();
+        assert_eq!(values.len(), 3);
+        assert!(values.is_null(0));
+        assert!(values.is_null(1));
+        assert!(values.is_null(2));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn nd_json_exec_file_projection() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index b655cdb09dbb..7658addd3561 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -35,11 +35,15 @@ pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
 
+use crate::error::DataFusionError;
 use crate::{
     datasource::{object_store::ObjectStore, PartitionedFile},
+    error::Result,
     scalar::ScalarValue,
 };
+use arrow::array::new_null_array;
 use lazy_static::lazy_static;
+use log::info;
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter, Result as FmtResult},
@@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
     }
 }
 
+/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+/// the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected columns
+/// indexes and insert null-valued columns wherever the file schema was missing a column present
+/// in the table schema.
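+///
+/// # Example
+///
+/// An illustrative sketch of the intended call sequence (`table_schema`, `file_schema`,
+/// `projection`, and `batch` stand in for caller-supplied values):
+///
+/// ```ignore
+/// let adapter = SchemaAdapter::new(table_schema.clone());
+/// // Stage 1: map the table-level projection onto this file's schema.
+/// let file_projection = adapter.map_projections(&file_schema, &projection)?;
+/// // ...read a RecordBatch from the file using `file_projection`...
+/// // Stage 2: re-order the columns and null-fill any the file is missing.
+/// let adapted = adapter.adapt_batch(batch, &projection)?;
+/// ```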
+#[derive(Clone, Debug)]
+pub(crate) struct SchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter {
+    pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
+        Self { table_schema }
+    }
+
+    /// Map projected column indexes to the file schema. This will fail if the table schema
+    /// and the file schema contain a field with the same name and different types.
+    pub fn map_projections(
+        &self,
+        file_schema: &Schema,
+        projections: &[usize],
+    ) -> Result<Vec<usize>> {
+        let mut mapped: Vec<usize> = vec![];
+        for idx in projections {
+            let field = self.table_schema.field(*idx);
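+            // Fields that are not present in the file schema are skipped here;
+            // `adapt_batch` fills them in with null-valued columns after the read.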
+            if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+                if file_schema.field(mapped_idx).data_type() == field.data_type() {
+                    mapped.push(mapped_idx)
+                } else {
+                    let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
+                    info!("{}", msg);
+                    return Err(DataFusionError::Execution(msg));
+                }
+            }
+        }
+        Ok(mapped)
+    }
+
+    /// Re-order projected columns by index in record batch to match table schema column ordering. If the record
+    /// batch does not contain a column for an expected field, insert a null-valued column at the
+    /// required column index.
+    pub fn adapt_batch(
+        &self,
+        batch: RecordBatch,
+        projections: &[usize],
+    ) -> Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+
+        let batch_schema = batch.schema();
+
+        let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
+        let batch_cols = batch.columns().to_vec();
+
+        for field_idx in projections {
+            let table_field = &self.table_schema.fields()[*field_idx];
+            if let Some((batch_idx, _name)) =
+                batch_schema.column_with_name(table_field.name().as_str())
+            {
+                cols.push(batch_cols[batch_idx].clone());
+            } else {
+                cols.push(new_null_array(table_field.data_type(), batch_rows))
+            }
+        }
+
+        let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);
+
+        let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+
+        Ok(merged_batch)
+    }
+}
+
 /// A helper that projects partition columns into the file record batches.
 ///
 /// One interesting trick is the usage of a cache for the key buffers of the partition column
@@ -467,6 +552,61 @@ mod tests {
         crate::assert_batches_eq!(expected, &[projected_batch]);
     }
 
+    #[test]
+    fn schema_adapter_adapt_projections() {
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int8, true),
+        ]));
+
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+        ]);
+
+        let file_schema_2 = Arc::new(Schema::new(vec![
+            Field::new("c3", DataType::Int8, true),
+            Field::new("c2", DataType::Int64, true),
+        ]));
+
+        let file_schema_3 =
+            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));
+
+        let adapter = SchemaAdapter::new(table_schema);
+
+        let projections1: Vec<usize> = vec![0, 1, 2];
+        let projections2: Vec<usize> = vec![2];
+
+        let mapped = adapter
+            .map_projections(&file_schema, projections1.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![0, 1]);
+
+        let mapped = adapter
+            .map_projections(&file_schema, projections2.as_slice())
+            .expect("mapping projections");
+
+        assert!(mapped.is_empty());
+
+        let mapped = adapter
+            .map_projections(&file_schema_2, projections1.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![1, 0]);
+
+        let mapped = adapter
+            .map_projections(&file_schema_2, projections2.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![0]);
+
+        let mapped = adapter.map_projections(&file_schema_3, projections1.as_slice());
+
+        assert!(mapped.is_err());
+    }
+
     // sets default for configs that play no role in projections
     fn config_for_projection(
         file_schema: SchemaRef,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 905bb1e28f9a..40acf5a51c17 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -44,14 +44,13 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use log::{debug, info};
+use log::debug;
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
     statistics::Statistics as ParquetStatistics,
 };
 
-use arrow::array::new_null_array;
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
@@ -61,6 +60,7 @@ use tokio::{
 };
 
 use crate::execution::runtime_env::RuntimeEnv;
+use crate::physical_plan::file_format::SchemaAdapter;
 use async_trait::async_trait;
 
 use super::PartitionColumnProjector;
@@ -215,11 +215,12 @@ impl ExecutionPlan for ParquetExec {
             &self.base_config.table_partition_cols,
         );
 
-        let file_schema_ref = self.base_config().file_schema.clone();
+        let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
+
         let join_handle = task::spawn_blocking(move || {
             if let Err(e) = read_partition(
                 object_store.as_ref(),
-                file_schema_ref,
+                adapter,
                 partition_index,
                 &partition,
                 metrics,
@@ -420,33 +421,10 @@ fn build_row_group_predicate(
     }
 }
 
-// Map projections from the schema which merges all file schemas to projections on a particular
-// file
-fn map_projections(
-    merged_schema: &Schema,
-    file_schema: &Schema,
-    projections: &[usize],
-) -> Result<Vec<usize>> {
-    let mut mapped: Vec<usize> = vec![];
-    for idx in projections {
-        let field = merged_schema.field(*idx);
-        if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
-            if file_schema.field(mapped_idx).data_type() == field.data_type() {
-                mapped.push(mapped_idx)
-            } else {
-                let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
-                info!("{}", msg);
-                return Err(DataFusionError::Execution(msg));
-            }
-        }
-    }
-    Ok(mapped)
-}
-
 #[allow(clippy::too_many_arguments)]
 fn read_partition(
     object_store: &dyn ObjectStore,
-    file_schema: SchemaRef,
+    schema_adapter: SchemaAdapter,
     partition_index: usize,
     partition: &[PartitionedFile],
     metrics: ExecutionPlanMetricsSet,
@@ -480,44 +458,20 @@ fn read_partition(
         }
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mapped_projections =
-            map_projections(&file_schema, &arrow_reader.get_schema()?, projection)?;
+        let adapted_projections =
+            schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
 
         let mut batch_reader =
-            arrow_reader.get_record_reader_by_columns(mapped_projections, batch_size)?;
+            arrow_reader.get_record_reader_by_columns(adapted_projections, batch_size)?;
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
-                    let total_cols = &file_schema.fields().len();
-                    let batch_rows = batch.num_rows();
                     total_rows += batch.num_rows();
-                    let batch_schema = batch.schema();
-
-                    let mut cols: Vec<ArrayRef> = Vec::with_capacity(*total_cols);
-                    let batch_cols = batch.columns().to_vec();
-
-                    for field_idx in projection {
-                        let merged_field = &file_schema.fields()[*field_idx];
-                        if let Some((batch_idx, _name)) =
-                            batch_schema.column_with_name(merged_field.name().as_str())
-                        {
-                            cols.push(batch_cols[batch_idx].clone());
-                        } else {
-                            cols.push(new_null_array(
-                                merged_field.data_type(),
-                                batch_rows,
-                            ))
-                        }
-                    }
-
-                    let projected_schema = file_schema.clone().project(projection)?;
-
-                    let merged_batch =
-                        RecordBatch::try_new(Arc::new(projected_schema), cols)?;
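+                    // Re-order the columns to match the table schema and insert
+                    // null columns for any fields missing from this particular file.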
+                    let adapted_batch = schema_adapter.adapt_batch(batch, projection)?;
 
                     let proj_batch = partition_column_projector
-                        .project(merged_batch, &partitioned_file.partition_values);
+                        .project(adapted_batch, &partitioned_file.partition_values);
 
                     send_result(&response_tx, proj_batch)?;
                     if limit.map(|l| total_rows >= l).unwrap_or(false) {
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index af6650361123..8ee0298f72ce 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -257,6 +257,32 @@ pub fn aggr_test_schema() -> SchemaRef {
     Arc::new(schema)
 }
 
+/// Get the schema for the aggregate_test_* csv files with an additional field not present in the files.
+pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
+    let mut f1 = Field::new("c1", DataType::Utf8, false);
+    f1.set_metadata(Some(BTreeMap::from_iter(
+        vec![("testing".into(), "test".into())].into_iter(),
+    )));
+    let schema = Schema::new(vec![
+        f1,
+        Field::new("c2", DataType::UInt32, false),
+        Field::new("c3", DataType::Int8, false),
+        Field::new("c4", DataType::Int16, false),
+        Field::new("c5", DataType::Int32, false),
+        Field::new("c6", DataType::Int64, false),
+        Field::new("c7", DataType::UInt8, false),
+        Field::new("c8", DataType::UInt16, false),
+        Field::new("c9", DataType::UInt32, false),
+        Field::new("c10", DataType::UInt64, false),
+        Field::new("c11", DataType::Float32, false),
+        Field::new("c12", DataType::Float64, false),
+        Field::new("c13", DataType::Utf8, false),
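+        // Extra nullable column that is not present in the aggregate_test_* csv files.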
+        Field::new("missing_col", DataType::Int64, true),
+    ]);
+
+    Arc::new(schema)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;