Commit
Fix index out of bounds for stats on nested fields
andrei-ionescu committed Dec 2, 2021
1 parent 0338b73 commit e3b4d6a
Showing 2 changed files with 40 additions and 9 deletions.
11 changes: 8 additions & 3 deletions datafusion/src/datasource/file_format/parquet.rs
@@ -37,6 +37,7 @@ use parquet::file::statistics::Statistics as ParquetStatistics;
 use super::FileFormat;
 use super::PhysicalPlanConfig;
 use crate::arrow::datatypes::{DataType, Field};
+use crate::datasource::flatten_schema;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::DataFusionError;
@@ -123,7 +124,7 @@ impl FileFormat for ParquetFormat {
 fn summarize_min_max(
     max_values: &mut Vec<Option<MaxAccumulator>>,
     min_values: &mut Vec<Option<MinAccumulator>>,
-    fields: &[Field],
+    fields: &Vec<&Field>,
     i: usize,
     stat: &ParquetStatistics,
 ) {
@@ -258,8 +259,9 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
     let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
     let schema = arrow_reader.get_schema()?;
-    let num_fields = schema.fields().len();
-    let fields = schema.fields().to_vec();
+    let flat_schema = flatten_schema(&schema);
+    let num_fields = flat_schema.len();
+    let fields = flat_schema.clone();
     let meta_data = arrow_reader.get_metadata();
 
     let mut num_rows = 0;
@@ -278,6 +280,9 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
             .iter()
             .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
 
+        let cols_vec: Vec<u64> = columns_null_counts.clone().collect();
+        let cols_vec_num = cols_vec.len();
+
         for (i, cnt) in columns_null_counts.enumerate() {
             null_counts[i] += cnt as usize
         }
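
For context (not part of the commit): Parquet row-group statistics are reported per leaf column, while `null_counts`, `max_values`, and `min_values` used to be sized by the number of top-level schema fields. With nested types the two counts diverge, which is what produced the index out of bounds this commit fixes. A minimal sketch of the mismatch, assuming the arrow ~6.x API DataFusion depended on at the time (where `DataType::Struct` takes a `Vec<Field>`):

```rust
// Illustrative only: one top-level field, but two Parquet leaf columns.
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let nested = Field::new(
        "point",
        DataType::Struct(vec![
            Field::new("x", DataType::Float64, false),
            Field::new("y", DataType::Float64, false),
        ]),
        false,
    );
    let schema = Schema::new(vec![nested]);

    // The statistics vectors were sized from the top-level field count (1),
    // while the per-column statistics iterator yields one entry per leaf
    // column (2 here), so indexing with the leaf position went out of bounds.
    let leaf_count = match schema.field(0).data_type() {
        DataType::Struct(children) => children.len(),
        _ => 1,
    };
    assert_eq!(schema.fields().len(), 1);
    assert_eq!(leaf_count, 2);
}
```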
38 changes: 32 additions & 6 deletions datafusion/src/datasource/mod.rs
@@ -24,6 +24,7 @@ pub mod listing;
 pub mod memory;
 pub mod object_store;
 
+use arrow::datatypes::{DataType, Field};
 use futures::Stream;
 
 pub use self::datasource::{TableProvider, TableType};
@@ -48,8 +49,9 @@ pub async fn get_statistics_with_limit(
 ) -> Result<(Vec<PartitionedFile>, Statistics)> {
     let mut result_files = vec![];
 
+    let flat_schema = flatten_schema(&file_schema);
     let mut total_byte_size = 0;
-    let mut null_counts = vec![0; file_schema.fields().len()];
+    let mut null_counts = vec![0; flat_schema.len()];
     let mut has_statistics = false;
     let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
@@ -160,13 +162,12 @@ impl std::fmt::Display for PartitionedFile {
 fn create_max_min_accs(
     schema: &Schema,
 ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
+    let flat_schema = flatten_schema(schema);
+    let max_values: Vec<Option<MaxAccumulator>> = flat_schema
         .iter()
         .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
         .collect::<Vec<_>>();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
+    let min_values: Vec<Option<MinAccumulator>> = flat_schema
         .iter()
         .map(|field| MinAccumulator::try_new(field.data_type()).ok())
         .collect::<Vec<_>>();
@@ -179,7 +180,8 @@ fn get_col_stats(
     max_values: &mut Vec<Option<MaxAccumulator>>,
     min_values: &mut Vec<Option<MinAccumulator>>,
 ) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
+    let flat_schema = flatten_schema(schema);
+    (0..flat_schema.len())
         .map(|i| {
             let max_value = match &max_values[i] {
                 Some(max_value) => max_value.evaluate().ok(),
@@ -198,3 +200,27 @@
         })
         .collect()
 }
+
+fn flatten_schema(schema: &Schema) -> Vec<&Field> {
+    fn fetch_children(field: &Field) -> Vec<&Field> {
+        let mut collected_fields: Vec<&Field> = vec![];
+        let data_type = field.data_type();
+        match data_type {
+            DataType::Struct(fields) | DataType::Union(fields) => collected_fields
+                .extend(fields.iter().map(|f| fetch_children(f)).flatten()),
+            DataType::List(f)
+            | DataType::LargeList(f)
+            | DataType::FixedSizeList(f, _)
+            | DataType::Map(f, _) => collected_fields.extend(fetch_children(f)),
+            _ => collected_fields.push(field),
+        }
+        collected_fields
+    }
+    let top_level_fields = schema.fields();
+    let flatten = top_level_fields
+        .iter()
+        .map(|f| fetch_children(f))
+        .flatten()
+        .collect();
+    flatten
+}
