Add metadata_size_hint for optimistic fetching of parquet metadata (#2946)

* Add metadata_size_hint for optimistic fetching of parquet metadata

* Formatting

* Update datafusion/core/src/datasource/file_format/parquet.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/core/src/datasource/file_format/parquet.rs

Co-authored-by: Andrew Lamb <[email protected]>

* PR comments: Guard against size_hint larger than file size and verify request counts in unit test

* Update datafusion/core/src/datasource/file_format/parquet.rs

Co-authored-by: Andrew Lamb <[email protected]>

Co-authored-by: Andrew Lamb <[email protected]>
thinkharderdev and alamb authored Jul 21, 2022
1 parent b49093c commit 834924f
Showing 3 changed files with 276 additions and 24 deletions.
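
For context, a minimal usage sketch of the option this change adds (not part of the commit; the 64 KiB value is an arbitrary illustration and the import path is assumed from the file touched below):

    use datafusion::datasource::file_format::parquet::ParquetFormat;

    // Optimistically fetch the last ~64 KiB of each parquet file, hoping it
    // covers the 8-byte footer plus the full metadata in a single request.
    let format = ParquetFormat::default().with_metadata_size_hint(64 * 1024);
    assert_eq!(format.metadata_size_hint(), Some(64 * 1024));
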
262 changes: 247 additions & 15 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
@@ -52,12 +53,14 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
#[derive(Debug)]
pub struct ParquetFormat {
enable_pruning: bool,
metadata_size_hint: Option<usize>,
}

impl Default for ParquetFormat {
fn default() -> Self {
Self {
enable_pruning: true,
metadata_size_hint: None,
}
}
}
@@ -69,10 +72,24 @@ impl ParquetFormat {
self.enable_pruning = enable;
self
}

/// Provide a hint to the size of the file metadata. If a hint is provided
/// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
/// Without a hint, two reads are required: one to fetch the 8-byte parquet footer and
/// a second to fetch the metadata, whose length is encoded in the footer.
pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
self.metadata_size_hint = Some(size_hint);
self
}
/// Return true if pruning is enabled
pub fn enable_pruning(&self) -> bool {
self.enable_pruning
}

/// Return the metadata size hint if set
pub fn metadata_size_hint(&self) -> Option<usize> {
self.metadata_size_hint
}
}

#[async_trait]
@@ -88,7 +105,8 @@ impl FileFormat for ParquetFormat {
) -> Result<SchemaRef> {
let mut schemas = Vec::with_capacity(objects.len());
for object in objects {
let schema = fetch_schema(store.as_ref(), object).await?;
let schema =
fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
schemas.push(schema)
}
let schema = Schema::try_merge(schemas)?;
@@ -101,7 +119,13 @@ impl FileFormat for ParquetFormat {
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics> {
let stats = fetch_statistics(store.as_ref(), table_schema, object).await?;
let stats = fetch_statistics(
store.as_ref(),
table_schema,
object,
self.metadata_size_hint,
)
.await?;
Ok(stats)
}

@@ -119,7 +143,11 @@ impl FileFormat for ParquetFormat {
None
};

Ok(Arc::new(ParquetExec::new(conf, predicate)))
Ok(Arc::new(ParquetExec::new(
conf,
predicate,
self.metadata_size_hint(),
)))
}
}

@@ -290,6 +318,7 @@ fn summarize_min_max(
pub(crate) async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
size_hint: Option<usize>,
) -> Result<ParquetMetaData> {
if meta.size < 8 {
return Err(DataFusionError::Execution(format!(
@@ -298,13 +327,22 @@ pub(crate) async fn fetch_parquet_metadata(
)));
}

let footer_start = meta.size - 8;
// If a size hint is provided, read more than the minimum size
// to try to avoid a second fetch.
let footer_start = if let Some(size_hint) = size_hint {
meta.size.saturating_sub(size_hint)
} else {
meta.size - 8
};

let suffix = store
.get_range(&meta.location, footer_start..meta.size)
.await?;

let suffix_len = suffix.len();

let mut footer = [0; 8];
footer.copy_from_slice(suffix.as_ref());
footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);

let length = decode_footer(&footer)?;

@@ -316,17 +354,35 @@ pub(crate) async fn fetch_parquet_metadata(
)));
}

let metadata_start = meta.size - length - 8;
let metadata = store
.get_range(&meta.location, metadata_start..footer_start)
.await?;
// The initial read did not fetch the entire file metadata, so make a second request for the rest
if length > suffix_len - 8 {
let metadata_start = meta.size - length - 8;
let remaining_metadata = store
.get_range(&meta.location, metadata_start..footer_start)
.await?;

let mut metadata = BytesMut::with_capacity(length);

Ok(decode_metadata(metadata.as_ref())?)
metadata.put(remaining_metadata.as_ref());
metadata.put(&suffix[..suffix_len - 8]);

Ok(decode_metadata(metadata.as_ref())?)
} else {
let metadata_start = meta.size - length - 8;

Ok(decode_metadata(
&suffix[metadata_start - footer_start..suffix_len - 8],
)?)
}
}

/// Read and parse the schema of the Parquet file at location `path`
async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
let metadata = fetch_parquet_metadata(store, file).await?;
async fn fetch_schema(
store: &dyn ObjectStore,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<Schema> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();
let schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
@@ -340,8 +396,9 @@ async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
let metadata = fetch_parquet_metadata(store, file).await?;
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();

let file_schema = parquet_to_arrow_schema(
@@ -458,6 +515,9 @@ pub(crate) mod test_util {
mod tests {
use super::super::test_util::scan_format;
use crate::physical_plan::collect;
use std::fmt::{Display, Formatter};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};

use super::*;

@@ -469,9 +529,14 @@ mod tests {
StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::ScalarValue;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{GetResult, ListResult};

#[tokio::test]
async fn read_merged_batches() -> Result<()> {
@@ -489,15 +554,16 @@ mod tests {
let format = ParquetFormat::default();
let schema = format.infer_schema(&store, &meta).await.unwrap();

let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?;
let stats =
fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;

assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));

let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?;
let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -509,6 +575,172 @@ mod tests {
Ok(())
}

#[derive(Debug)]
struct RequestCountingObjectStore {
inner: Arc<dyn ObjectStore>,
request_count: AtomicUsize,
}

impl Display for RequestCountingObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "RequestCounting({})", self.inner)
}
}

impl RequestCountingObjectStore {
pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
Self {
inner,
request_count: Default::default(),
}
}

pub fn request_count(&self) -> usize {
self.request_count.load(Ordering::SeqCst)
}

pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
self.clone()
}
}

#[async_trait]
impl ObjectStore for RequestCountingObjectStore {
async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
Err(object_store::Error::NotImplemented)
}

async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
self.request_count.fetch_add(1, Ordering::SeqCst);
self.inner.get_range(location, range).await
}

async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
Err(object_store::Error::NotImplemented)
}

async fn delete(&self, _location: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

async fn list(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
{
Err(object_store::Error::NotImplemented)
}

async fn list_with_delimiter(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
Err(object_store::Error::NotImplemented)
}

async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

async fn copy_if_not_exists(
&self,
_from: &Path,
_to: &Path,
) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}
}

#[tokio::test]
async fn fetch_metadata_with_size_hint() -> Result<()> {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();

let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;

// Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
// for the remaining metadata
fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
.await
.expect("error reading metadata with hint");

assert_eq!(store.request_count(), 2);

let format = ParquetFormat::default().with_metadata_size_hint(9);
let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();

let stats =
fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
.await?;

assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));

let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));

// Use the file size as the hint so we can get the full metadata from the first fetch
let size_hint = meta[0].size;

fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
.expect("error reading metadata with hint");

// ensure the requests were coalesced into a single request
assert_eq!(store.request_count(), 1);

let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
let stats = fetch_statistics(
store.upcast().as_ref(),
schema.clone(),
&meta[0],
Some(size_hint),
)
.await?;

assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));

let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));

// Use a size hint larger than the file size to make sure we don't panic
let size_hint = meta[0].size + 100;

fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
.expect("error reading metadata with hint");

assert_eq!(store.request_count(), 1);

Ok(())
}

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
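
For reference, a rough sketch (not part of this commit) of the check that drives the second request in fetch_parquet_metadata above. The last 8 bytes of a parquet file are a 4-byte little-endian metadata length followed by the "PAR1" magic, so a hint of h bytes (capped at the file size) avoids the second request whenever the metadata fits in the first h - 8 bytes fetched:

    // Sketch only: approximates the check in fetch_parquet_metadata above,
    // assuming `size_hint` has already been capped at the file size.
    fn needs_second_fetch(size_hint: usize, metadata_len: usize) -> bool {
        metadata_len > size_hint.saturating_sub(8)
    }

    // With the 9-byte hint used in fetch_metadata_with_size_hint above, only
    // 1 byte of metadata fits, so that test expects exactly two requests.
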
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -271,6 +271,7 @@ mod tests {
table_partition_cols: vec![],
},
None,
None,
))
}
