diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f4139b2f654d..ba2e1f7f41d9 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -717,13 +717,7 @@ impl ArrayReader for StructArrayReader {
             .children
             .iter_mut()
             .map(|reader| reader.next_batch(batch_size))
-            .try_fold(
-                Vec::new(),
-                |mut result, child_array| -> Result<Vec<ArrayRef>> {
-                    result.push(child_array?);
-                    Ok(result)
-                },
-            )?;
+            .collect::<Result<Vec<_>>>()?;
 
         // check that array child data has same size
         let children_array_len =
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 2836c699c39e..6026492590b2 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -38,7 +38,7 @@ use crate::data_type::{
     Int96Type,
 };
 use crate::errors::ParquetError::ArrowError;
-use crate::errors::{Result};
+use crate::errors::{ParquetError, Result};
 use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
 use crate::schema::visitor::TypeVisitor;
 
@@ -129,9 +129,10 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
 
             let null_mask_only = match cur_type.get_basic_info().repetition() {
                 Repetition::REPEATED => {
-                    new_context.def_level += 1;
-                    new_context.rep_level += 1;
-                    false
+                    return Err(ArrowError(format!(
+                        "Reading repeated primitive ({:?}) is not supported yet!",
+                        cur_type.name()
+                    )));
                 }
                 Repetition::OPTIONAL => {
                     new_context.def_level += 1;
@@ -143,19 +144,12 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
 
             let reader = self.build_for_primitive_type_inner(
-                cur_type.clone(),
+                cur_type,
                 &new_context,
                 null_mask_only,
             )?;
 
-            if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
-                Err(ArrowError(format!(
-                    "Reading repeated field ({:?}) is not supported yet!",
-                    cur_type.name()
-                )))
-            } else {
-                Ok(Some(reader))
-            }
+            Ok(Some(reader))
         } else {
             Ok(None)
         }
 
@@ -173,30 +167,19 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
         if cur_type.get_basic_info().has_repetition() {
             match cur_type.get_basic_info().repetition() {
                 Repetition::REPEATED => {
-                    new_context.def_level += 1;
-                    new_context.rep_level += 1;
+                    return Err(ArrowError(format!(
+                        "Reading repeated struct ({:?}) is not supported yet!",
+                        cur_type.name(),
+                    )))
                 }
                 Repetition::OPTIONAL => {
                     new_context.def_level += 1;
                 }
-                _ => (),
+                Repetition::REQUIRED => {}
             }
         }
 
-        if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? {
-            if cur_type.get_basic_info().has_repetition()
-                && cur_type.get_basic_info().repetition() == Repetition::REPEATED
-            {
-                Err(ArrowError(format!(
-                    "Reading repeated field ({:?}) is not supported yet!",
-                    cur_type.name(),
-                )))
-            } else {
-                Ok(Some(reader))
-            }
-        } else {
-            Ok(None)
-        }
+        self.build_for_struct_type_inner(&cur_type, &new_context)
     }
 
     /// Build array reader for map type.
@@ -208,42 +191,61 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
         // Add map type to context
         let mut new_context = context.clone();
         new_context.path.append(vec![map_type.name().to_string()]);
-        if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {
-            new_context.def_level += 1;
+
+        match map_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => {}
+            Repetition::OPTIONAL => {
+                new_context.def_level += 1;
+            }
+            Repetition::REPEATED => {
+                return Err(ArrowError("Map cannot be repeated".to_string()))
+            }
+        }
+
+        if map_type.get_fields().len() != 1 {
+            return Err(ArrowError(format!(
+                "Map field must have exactly one key_value child, found {}",
+                map_type.get_fields().len()
+            )));
         }
 
         // Add map entry (key_value) to context
-        let map_key_value = map_type.get_fields().first().ok_or_else(|| {
-            ArrowError("Map field must have a key_value entry".to_string())
-        })?;
+        let map_key_value = &map_type.get_fields()[0];
+        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
+            return Err(ArrowError(
+                "Child of map field must be repeated".to_string(),
+            ));
+        }
+
         new_context
             .path
             .append(vec![map_key_value.name().to_string()]);
+        new_context.rep_level += 1;
+        new_context.def_level += 1;
+
+        if map_key_value.get_fields().len() != 2 {
+            // According to the specification the values are optional (#1642)
+            return Err(ArrowError(format!(
+                "Child of map field must have two children, found {}",
+                map_key_value.get_fields().len()
+            )));
+        }
 
         // Get key and value, and create context for each
-        let map_key = map_key_value
-            .get_fields()
-            .first()
-            .ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?;
-        let map_value = map_key_value
-            .get_fields()
-            .get(1)
-            .ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?;
-
-        let key_reader = {
-            let mut key_context = new_context.clone();
-            key_context.def_level += 1;
-            key_context.path.append(vec![map_key.name().to_string()]);
-            self.dispatch(map_key.clone(), &key_context)?.unwrap()
-        };
-        let value_reader = {
-            let mut value_context = new_context.clone();
-            if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() {
-                value_context.def_level += 1;
-            }
-            self.dispatch(map_value.clone(), &value_context)?.unwrap()
-        };
+        let map_key = &map_key_value.get_fields()[0];
+        let map_value = &map_key_value.get_fields()[1];
+
+        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
+            return Err(ArrowError("Map keys must be required".to_string()));
+        }
+
+        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
+            return Err(ArrowError("Map values cannot be repeated".to_string()));
+        }
+
+        let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
+        let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
 
         let arrow_type = self
             .arrow_schema
@@ -295,96 +297,80 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext>
             .first()
             .ok_or_else(|| ArrowError("List field must have a child.".to_string()))?
             .clone();
-        let mut new_context = context.clone();
+        // If the list can contain nulls
+        let nullable = match list_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => false,
+            Repetition::OPTIONAL => true,
+            Repetition::REPEATED => {
+                return Err(general_err!("List type cannot be repeated"))
+            }
+        };
+
+        let mut new_context = context.clone();
         new_context.path.append(vec![list_type.name().to_string()]);
-        // We need to know at what definition a list or its child is null
-        let list_null_def = new_context.def_level;
-        let mut list_empty_def = new_context.def_level;
+
+        // The repeated field
+        new_context.rep_level += 1;
+        new_context.def_level += 1;
 
         // If the list's root is nullable
-        if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
+        if nullable {
             new_context.def_level += 1;
-            // current level is nullable, increment to get level for empty list slot
-            list_empty_def += 1;
         }
 
-        match list_child.get_basic_info().repetition() {
-            Repetition::REPEATED => {
-                new_context.def_level += 1;
-                new_context.rep_level += 1;
-            }
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            _ => (),
-        }
+        match self.dispatch(item_type, &new_context) {
+            Ok(Some(item_reader)) => {
+                let item_type = item_reader.get_data_type().clone();
+
+                // a list is a group type with a single child. The list child's
+                // name comes from the child's field name.
+                // if the child's name is "list" and it has a child, then use this child
+                if list_child.name() == "list" && !list_child.get_fields().is_empty() {
+                    list_child = list_child.get_fields().first().unwrap();
+                }
 
-        let reader = self.dispatch(item_type.clone(), &new_context);
-        if let Ok(Some(item_reader)) = reader {
-            let item_reader_type = item_reader.get_data_type().clone();
-
-            match item_reader_type {
-                ArrowType::List(_)
-                | ArrowType::FixedSizeList(_, _)
-                | ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
-                    "reading List({:?}) into arrow not supported yet",
-                    item_type
-                ))),
-                _ => {
-                    // a list is a group type with a single child. The list child's
-                    // name comes from the child's field name.
-                    // if the child's name is "list" and it has a child, then use this child
-                    if list_child.name() == "list" && !list_child.get_fields().is_empty()
-                    {
-                        list_child = list_child.get_fields().first().unwrap();
+                let arrow_type = self
+                    .arrow_schema
+                    .field_with_name(list_type.name())
+                    .ok()
+                    .map(|f| f.data_type().to_owned())
+                    .unwrap_or_else(|| {
+                        ArrowType::List(Box::new(Field::new(
+                            list_child.name(),
+                            item_type.clone(),
+                            list_child.is_optional(),
+                        )))
+                    });
+
+                let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
+                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
+                        item_reader,
+                        arrow_type,
+                        item_type,
+                        new_context.def_level,
+                        new_context.rep_level,
+                        nullable,
+                    )),
+                    ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
+                        item_reader,
+                        arrow_type,
+                        item_type,
+                        new_context.def_level,
+                        new_context.rep_level,
+                        nullable,
+                    )),
+                    _ => {
+                        return Err(ArrowError(format!(
+                            "creating ListArrayReader with type {:?} should be unreachable",
+                            arrow_type
+                        )))
                     }
-                    let arrow_type = self
-                        .arrow_schema
-                        .field_with_name(list_type.name())
-                        .ok()
-                        .map(|f| f.data_type().to_owned())
-                        .unwrap_or_else(|| {
-                            ArrowType::List(Box::new(Field::new(
-                                list_child.name(),
-                                item_reader_type.clone(),
-                                list_child.is_optional(),
-                            )))
-                        });
-
-                    let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
-                        ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                            item_reader,
-                            arrow_type,
-                            item_reader_type,
-                            new_context.def_level,
-                            new_context.rep_level,
-                            list_null_def,
-                            list_empty_def,
-                        )),
-                        ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
-                            item_reader,
-                            arrow_type,
-                            item_reader_type,
-                            new_context.def_level,
-                            new_context.rep_level,
-                            list_null_def,
-                            list_empty_def,
-                        )),
-
-                        _ => {
-                            return Err(ArrowError(format!(
-                                "creating ListArrayReader with type {:?} should be unreachable",
-                                arrow_type
-                            )))
-                        }
-                    };
+                };
 
-                    Ok(Some(list_array_reader))
-                }
+                Ok(Some(list_array_reader))
             }
-        } else {
-            reader
+            result => result,
         }
     }
 }
@@ -637,10 +623,10 @@ impl<'a> ArrayReaderBuilder {
         let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
 
         for child in cur_type.get_fields() {
-            let mut struct_context = context.clone();
             if let Some(child_reader) = self.dispatch(child.clone(), context)? {
                 // TODO: this results in calling get_arrow_field twice, it could be reused
                 // from child_reader above, by making child_reader carry its `Field`
+                let mut struct_context = context.clone();
                 struct_context.path.append(vec![child.name().to_string()]);
                 let field = match self.get_arrow_field(child, &struct_context) {
                     Some(f) => f.clone(),
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 31c52c9c5bf9..611c2cbea8e8 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -16,17 +16,17 @@
 // under the License.
 
 use crate::arrow::array_reader::ArrayReader;
-use crate::errors::ParquetError::ArrowError;
+use crate::errors::ParquetError;
 use crate::errors::Result;
 use arrow::array::{
-    new_empty_array, ArrayData, ArrayRef, GenericListArray,
-    OffsetSizeTrait, UInt32Array,
+    new_empty_array, Array, ArrayData, ArrayRef, BooleanBufferBuilder, GenericListArray,
+    MutableArrayData, OffsetSizeTrait,
 };
-use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::buffer::Buffer;
 use arrow::datatypes::DataType as ArrowType;
 use arrow::datatypes::ToByteSlice;
-use arrow::util::bit_util;
 use std::any::Any;
+use std::cmp::Ordering;
 use std::marker::PhantomData;
 use std::sync::Arc;
 
@@ -35,12 +35,12 @@ pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
     item_reader: Box<dyn ArrayReader>,
     data_type: ArrowType,
     item_type: ArrowType,
-    list_def_level: i16,
-    list_rep_level: i16,
-    list_empty_def_level: i16,
-    list_null_def_level: i16,
-    def_level_buffer: Option<Buffer>,
-    rep_level_buffer: Option<Buffer>,
+    // The definition level at which this list is not null
+    def_level: i16,
+    // The repetition level that corresponds to a new value in this array
+    rep_level: i16,
+    // If this list is nullable
+    nullable: bool,
     _marker: PhantomData<OffsetSize>,
 }
 
@@ -52,19 +52,15 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
         item_type: ArrowType,
         def_level: i16,
         rep_level: i16,
-        list_null_def_level: i16,
-        list_empty_def_level: i16,
+        nullable: bool,
     ) -> Self {
         Self {
             item_reader,
             data_type,
            item_type,
-            list_def_level: def_level,
-            list_rep_level: rep_level,
-            list_null_def_level,
-            list_empty_def_level,
-            def_level_buffer: None,
-            rep_level_buffer: None,
+            def_level,
+            rep_level,
+            nullable,
            _marker: PhantomData,
         }
     }
@@ -88,97 +84,147 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
         if next_batch_array.len() == 0 {
             return Ok(new_empty_array(&self.data_type));
         }
+
         let def_levels = self
             .item_reader
             .get_def_levels()
-            .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?;
+            .ok_or_else(|| general_err!("item_reader def levels are None."))?;
+
         let rep_levels = self
             .item_reader
             .get_rep_levels()
-            .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?;
+            .ok_or_else(|| general_err!("item_reader rep levels are None."))?;
 
-        if !((def_levels.len() == rep_levels.len())
-            && (rep_levels.len() == next_batch_array.len()))
-        {
-            return Err(ArrowError(
-                format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()),
+        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
+            return Err(general_err!(
+                "offset of {} would overflow list array",
+                next_batch_array.len()
             ));
         }
 
-        // List definitions can be encoded as 4 values:
-        // - n + 0: the list slot is null
-        // - n + 1: the list slot is not null, but is empty (i.e. [])
-        // - n + 2: the list slot is not null, but its child is empty (i.e. [ null ])
-        // - n + 3: the list slot is not null, and its child is not empty
-        // Where n is the max definition level of the list's parent.
-        // If a Parquet schema's only leaf is the list, then n = 0.
-
-        // If the list index is at empty definition, the child slot is null
-        let non_null_list_indices =
-            def_levels.iter().enumerate().filter_map(|(index, def)| {
-                (*def > self.list_empty_def_level).then(|| index as u32)
-            });
-        let indices = UInt32Array::from_iter_values(non_null_list_indices);
-        let batch_values =
-            arrow::compute::take(&*next_batch_array.clone(), &indices, None)?;
-
-        // first item in each list has rep_level = 0, subsequent items have rep_level = 1
-        let mut offsets: Vec<OffsetSize> = Vec::new();
-        let mut cur_offset = OffsetSize::zero();
-        def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
-            if *r == 0 || d == &self.list_empty_def_level {
-                offsets.push(cur_offset);
-            }
-            if d > &self.list_empty_def_level {
-                cur_offset += OffsetSize::one();
-            }
-        });
-
-        offsets.push(cur_offset);
-
-        let num_bytes = bit_util::ceil(offsets.len(), 8);
-        // TODO: A useful optimization is to use the null count to fill with
-        // 0 or null, to reduce individual bits set in a loop.
-        // To favour dense data, set every slot to true, then unset
-        let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
-        let null_slice = null_buf.as_slice_mut();
-        let mut list_index = 0;
-        for i in 0..rep_levels.len() {
-            // If the level is lower than empty, then the slot is null.
-            // When a list is non-nullable, its empty level = null level,
-            // so this automatically factors that in.
-            if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level {
-                bit_util::unset_bit(null_slice, list_index);
+        // A non-nullable list has a single definition level indicating if the list is empty
+        //
+        // A nullable list has two definition levels associated with it:
+        //
+        // The first identifies if the list is null
+        // The second identifies if the list is empty
+
+        // Whilst nulls may have a non-zero slice in the offsets array, an empty slice
+        // must not, we must therefore filter out these empty slices
+
+        // The offset of the current element being considered
+        let mut cur_offset = 0;
+
+        // The offsets identifying the list start and end offsets
+        let mut list_offsets: Vec<OffsetSize> =
+            Vec::with_capacity(next_batch_array.len());
+
+        // The validity mask of the final list
+        let mut validity = self
+            .nullable
+            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
+
+        // The position of the current slice of child data not corresponding to empty lists
+        let mut cur_start_offset = None;
+
+        // The number of child values skipped due to empty lists
+        let mut skipped = 0;
+
+        // Builder used to construct child data, skipping empty lists
+        let mut child_data_builder = MutableArrayData::new(
+            vec![next_batch_array.data()],
+            false,
+            next_batch_array.len(),
+        );
+
+        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
+            match r.cmp(&self.rep_level) {
+                Ordering::Greater => {
+                    // Repetition level greater than current => already handled by inner array
+                    if *d < self.def_level {
+                        return Err(general_err!(
+                            "Encountered repetition level too large for definition level"
+                        ));
+                    }
+                }
+                Ordering::Equal => {
+                    // New value in the current list
+                    cur_offset += 1;
+                }
+                Ordering::Less => {
+                    // Create new array slice
+                    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());
+
+                    if *d + 1 == self.def_level {
+                        // Empty list
+                        if let Some(start) = cur_start_offset.take() {
+                            child_data_builder.extend(
+                                0,
+                                start + skipped,
+                                cur_offset + skipped,
+                            );
+                        }
+
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(true)
+                        }
+
+                        skipped += 1;
+                    } else {
+                        cur_start_offset.get_or_insert(cur_offset);
+                        cur_offset += 1;
+
+                        if let Some(validity) = validity.as_mut() {
+                            validity.append(*d >= self.def_level)
+                        }
+                    }
+                }
             }
-            if rep_levels[i] == 0 {
-                list_index += 1;
+            Ok(())
+        })?;
+
+        list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());
+
+        let child_data = if skipped == 0 {
+            // No empty lists - can reuse original array
+            next_batch_array.data().clone()
+        } else {
+            // One or more empty lists - must build new array
+            if let Some(start) = cur_start_offset.take() {
+                child_data_builder.extend(0, start + skipped, cur_offset + skipped)
             }
+
+            child_data_builder.freeze()
+        };
+
+        if cur_offset != child_data.len() {
+            return Err(general_err!("Failed to reconstruct list from level data"));
         }
 
-        let value_offsets = Buffer::from(&offsets.to_byte_slice());
-        let list_data = ArrayData::builder(self.get_data_type().clone())
-            .len(offsets.len() - 1)
+        let value_offsets = Buffer::from(&list_offsets.to_byte_slice());
+
+        let mut data_builder = ArrayData::builder(self.get_data_type().clone())
+            .len(list_offsets.len() - 1)
             .add_buffer(value_offsets)
-            .add_child_data(batch_values.data().clone())
-            .null_bit_buffer(null_buf.into())
-            .offset(next_batch_array.offset());
+            .add_child_data(child_data);
+
+        if let Some(mut builder) = validity {
+            assert_eq!(builder.len(), list_offsets.len() - 1);
+            data_builder = data_builder.null_bit_buffer(builder.finish())
+        }
 
-        let list_data = unsafe { list_data.build_unchecked() };
+        let list_data = unsafe { data_builder.build_unchecked() };
 
         let result_array = GenericListArray::<OffsetSize>::from(list_data);
         Ok(Arc::new(result_array))
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_level_buffer
-            .as_ref()
-            .map(|buf| unsafe { buf.typed_data() })
+        self.item_reader.get_def_levels()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_level_buffer
-            .as_ref()
-            .map(|buf| unsafe { buf.typed_data() })
+        self.item_reader.get_rep_levels()
     }
 }
 
@@ -193,18 +239,18 @@ mod tests {
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::SchemaDescriptor;
-    use arrow::array::{Array, LargeListArray, ListArray, PrimitiveArray};
-    use arrow::datatypes::{Field, Int32Type as ArrowInt32};
+    use arrow::array::{Array, PrimitiveArray};
+    use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type};
     use std::sync::Arc;
 
-    #[test]
-    fn test_list_array_reader() {
-        // [[1, null, 2], null, [3, 4]]
+    fn test_list_array<OffsetSize: OffsetSizeTrait>() {
+        // [[1, null, 2], null, [], [3, 4]]
         let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
             Some(1),
             None,
             Some(2),
             None,
+            None,
             Some(3),
             Some(4),
         ]));
@@ -212,103 +258,49 @@ mod tests {
         let item_array_reader = InMemoryArrayReader::new(
             ArrowType::Int32,
             array,
-            Some(vec![3, 2, 3, 0, 3, 3]),
-            Some(vec![0, 1, 1, 0, 0, 1]),
-        );
-
-        let mut list_array_reader = ListArrayReader::<i32>::new(
-            Box::new(item_array_reader),
-            ArrowType::List(Box::new(Field::new("item", ArrowType::Int32, true))),
-            ArrowType::Int32,
-            1,
-            1,
-            0,
-            1,
-        );
-
-        let next_batch = list_array_reader.next_batch(1024).unwrap();
-        let list_array = next_batch.as_any().downcast_ref::<ListArray>().unwrap();
-
-        assert_eq!(3, list_array.len());
-        // This passes as I expect
-        assert_eq!(1, list_array.null_count());
-
-        assert_eq!(
-            list_array
-                .value(0)
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap(),
-            &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
+            Some(vec![3, 2, 3, 0, 1, 3, 3]),
+            Some(vec![0, 1, 1, 0, 0, 0, 1]),
         );
 
-        assert!(list_array.is_null(1));
+        let field = Box::new(Field::new("item", ArrowType::Int32, true));
+        let data_type = match OffsetSize::is_large() {
+            true => ArrowType::LargeList(field),
+            false => ArrowType::List(field),
+        };
 
-        assert_eq!(
-            list_array
-                .value(2)
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap(),
-            &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
-        );
-    }
-
-    #[test]
-    fn test_large_list_array_reader() {
-        // [[1, null, 2], null, [3, 4]]
-        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
-            Some(1),
-            None,
-            Some(2),
-            None,
-            Some(3),
-            Some(4),
-        ]));
-        let item_array_reader = InMemoryArrayReader::new(
-            ArrowType::Int32,
-            array,
-            Some(vec![3, 2, 3, 0, 3, 3]),
-            Some(vec![0, 1, 1, 0, 0, 1]),
-        );
-
-        let mut list_array_reader = ListArrayReader::<i64>::new(
+        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
             Box::new(item_array_reader),
-            ArrowType::LargeList(Box::new(Field::new("item", ArrowType::Int32, true))),
+            data_type,
             ArrowType::Int32,
+            2,
             1,
-            1,
-            0,
-            1,
+            true,
         );
 
         let next_batch = list_array_reader.next_batch(1024).unwrap();
         let list_array = next_batch
             .as_any()
-            .downcast_ref::<LargeListArray>()
+            .downcast_ref::<GenericListArray<OffsetSize>>()
            .unwrap();
 
-        assert_eq!(3, list_array.len());
-
-        assert_eq!(
-            list_array
-                .value(0)
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap(),
-            &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
-        );
+        let expected =
+            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
+                Some(vec![Some(1), None, Some(2)]),
+                None,
+                Some(vec![]),
+                Some(vec![Some(3), Some(4)]),
+            ]);
+        assert_eq!(&expected, list_array)
+    }
 
-        assert!(list_array.is_null(1));
+    #[test]
+    fn test_list_array_reader() {
+        test_list_array::<i32>();
+    }
 
-        assert_eq!(
-            list_array
-                .value(2)
-                .as_any()
-                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
-                .unwrap(),
-            &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
-        );
+    #[test]
+    fn test_large_list_array_reader() {
+        test_list_array::<i64>()
     }
 
     #[test]
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
index f212d05b0712..94fee8eeee8f 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::ArrayRef;
+use arrow::array::{Array, ArrayRef};
 use arrow::datatypes::DataType as ArrowType;
 use std::any::Any;
 use std::sync::Arc;
@@ -111,6 +111,16 @@ impl InMemoryArrayReader {
         def_levels: Option<Vec<i16>>,
         rep_levels: Option<Vec<i16>>,
     ) -> Self {
+        assert!(def_levels
+            .as_ref()
+            .map(|d| d.len() == array.len())
+            .unwrap_or(true));
+
+        assert!(rep_levels
+            .as_ref()
+            .map(|r| r.len() == array.len())
+            .unwrap_or(true));
+
         Self {
             data_type,
             array,
diff --git a/parquet/src/schema/visitor.rs b/parquet/src/schema/visitor.rs
index 8ed079fb4237..d1cd96fbb0a9 100644
--- a/parquet/src/schema/visitor.rs
+++ b/parquet/src/schema/visitor.rs
@@ -27,17 +27,30 @@ pub trait TypeVisitor<R, C> {
     /// Default implementation when visiting a list.
     ///
-    /// It checks list type definition and calls `visit_list_with_item` with extracted
+    /// It checks list type definition and calls [`visit_list_with_item`] with extracted
     /// item type.
     ///
     /// To fully understand this algorithm, please refer to
     /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
+    ///
+    /// For example, a standard list type looks like:
+    ///
+    /// ```ignore
+    /// required/optional group my_list (LIST) {
+    ///   repeated group list {
+    ///     required/optional binary element (UTF8);
+    ///   }
+    /// }
+    /// ```
+    ///
+    /// In such a case, [`visit_list_with_item`] will be called with `my_list` as the list
+    /// type, and `element` as the `item_type`.
+    ///
     fn visit_list(&mut self, list_type: TypePtr, context: C) -> Result<R> {
         match list_type.as_ref() {
-            Type::PrimitiveType { .. } => panic!(
-                "{:?} is a list type and can't be processed as primitive.",
-                list_type
-            ),
+            Type::PrimitiveType { .. } => {
+                panic!("{:?} is a list type and must be a group type", list_type)
+            }
             Type::GroupType {
                 basic_info: _,
                 fields,