Skip to content

Commit ab48e69

Browse files
tustvoldalamb
andauthored
Extract method to drive PageIterator -> RecordReader (#1031)
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 07660c6 commit ab48e69

File tree

1 file changed

+37
-51
lines changed

1 file changed

+37
-51
lines changed

parquet/src/arrow/array_reader.rs

+37-51
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,36 @@ pub trait ArrayReader {
100100
fn get_rep_levels(&self) -> Option<&[i16]>;
101101
}
102102

103+
/// Uses `record_reader` to read up to `batch_size` records from `pages`
104+
///
105+
/// Returns the number of records read, which can be less than batch_size if
106+
/// pages is exhausted.
107+
fn read_records<T: DataType>(
108+
record_reader: &mut RecordReader<T>,
109+
pages: &mut dyn PageIterator,
110+
batch_size: usize,
111+
) -> Result<usize> {
112+
let mut records_read = 0usize;
113+
while records_read < batch_size {
114+
let records_to_read = batch_size - records_read;
115+
116+
let records_read_once = record_reader.read_records(records_to_read)?;
117+
records_read += records_read_once;
118+
119+
// Record reader exhausted
120+
if records_read_once < records_to_read {
121+
if let Some(page_reader) = pages.next() {
122+
// Read from new page reader (i.e. column chunk)
123+
record_reader.set_page_reader(page_reader?)?;
124+
} else {
125+
// Page reader also exhausted
126+
break;
127+
}
128+
}
129+
}
130+
Ok(records_read)
131+
}
132+
103133
/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
104134
/// NullArray type.
105135
pub struct NullArrayReader<T: DataType> {
@@ -114,14 +144,8 @@ pub struct NullArrayReader<T: DataType> {
114144

115145
impl<T: DataType> NullArrayReader<T> {
116146
/// Construct null array reader.
117-
pub fn new(
118-
mut pages: Box<dyn PageIterator>,
119-
column_desc: ColumnDescPtr,
120-
) -> Result<Self> {
121-
let mut record_reader = RecordReader::<T>::new(column_desc.clone());
122-
if let Some(page_reader) = pages.next() {
123-
record_reader.set_page_reader(page_reader?)?;
124-
}
147+
pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
148+
let record_reader = RecordReader::<T>::new(column_desc.clone());
125149

126150
Ok(Self {
127151
data_type: ArrowType::Null,
@@ -148,25 +172,8 @@ impl<T: DataType> ArrayReader for NullArrayReader<T> {
148172

149173
/// Reads at most `batch_size` records into array.
150174
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
151-
let mut records_read = 0usize;
152-
while records_read < batch_size {
153-
let records_to_read = batch_size - records_read;
154-
155-
// NB can be 0 if at end of page
156-
let records_read_once = self.record_reader.read_records(records_to_read)?;
157-
records_read += records_read_once;
158-
159-
// Record reader exhausted
160-
if records_read_once < records_to_read {
161-
if let Some(page_reader) = self.pages.next() {
162-
// Read from new page reader
163-
self.record_reader.set_page_reader(page_reader?)?;
164-
} else {
165-
// Page reader also exhausted
166-
break;
167-
}
168-
}
169-
}
175+
let records_read =
176+
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
170177

171178
// convert to arrays
172179
let array = arrow::array::NullArray::new(records_read);
@@ -206,7 +213,7 @@ pub struct PrimitiveArrayReader<T: DataType> {
206213
impl<T: DataType> PrimitiveArrayReader<T> {
207214
/// Construct primitive array reader.
208215
pub fn new(
209-
mut pages: Box<dyn PageIterator>,
216+
pages: Box<dyn PageIterator>,
210217
column_desc: ColumnDescPtr,
211218
arrow_type: Option<ArrowType>,
212219
) -> Result<Self> {
@@ -218,10 +225,7 @@ impl<T: DataType> PrimitiveArrayReader<T> {
218225
.clone(),
219226
};
220227

221-
let mut record_reader = RecordReader::<T>::new(column_desc.clone());
222-
if let Some(page_reader) = pages.next() {
223-
record_reader.set_page_reader(page_reader?)?;
224-
}
228+
let record_reader = RecordReader::<T>::new(column_desc.clone());
225229

226230
Ok(Self {
227231
data_type,
@@ -248,25 +252,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
248252

249253
/// Reads at most `batch_size` records into array.
250254
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
251-
let mut records_read = 0usize;
252-
while records_read < batch_size {
253-
let records_to_read = batch_size - records_read;
254-
255-
// NB can be 0 if at end of page
256-
let records_read_once = self.record_reader.read_records(records_to_read)?;
257-
records_read += records_read_once;
258-
259-
// Record reader exhausted
260-
if records_read_once < records_to_read {
261-
if let Some(page_reader) = self.pages.next() {
262-
// Read from new page reader
263-
self.record_reader.set_page_reader(page_reader?)?;
264-
} else {
265-
// Page reader also exhausted
266-
break;
267-
}
268-
}
269-
}
255+
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
270256

271257
let target_type = self.get_data_type().clone();
272258
let arrow_data_type = match T::get_physical_type() {

0 commit comments

Comments
 (0)