From 175adbda616ca2f339cfba375c1f86535f6e2ad1 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 8 Nov 2022 06:12:09 -0500
Subject: [PATCH] Add parquet predicate pushdown tests with smaller pages (#4131)

---
 benchmarks/src/bin/parquet_filter_pushdown.rs |  36 ++--
 .../core/src/physical_optimizer/pruning.rs    |   7 +
 .../file_format/parquet/page_filter.rs        |  13 +-
 .../core/tests/parquet_filter_pushdown.rs     | 158 ++++++++++++++++--
 parquet-test-utils/src/lib.rs                 |  26 +--
 5 files changed, 182 insertions(+), 58 deletions(-)

diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index 4ec2dc90c37b..a3ff7bee529e 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
 use datafusion::physical_plan::collect;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
+use parquet::file::properties::WriterProperties;
 use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
 use std::time::Instant;
@@ -73,7 +74,19 @@ async fn main() -> Result<()> {
 
     let path = opt.path.join("logs.parquet");
 
-    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let mut props_builder = WriterProperties::builder();
+
+    if let Some(s) = opt.page_size {
+        props_builder = props_builder
+            .set_data_pagesize_limit(s)
+            .set_write_batch_size(s);
+    }
+
+    if let Some(s) = opt.row_group_size {
+        props_builder = props_builder.set_max_row_group_size(s);
+    }
+
+    let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;
 
     run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
 
@@ -137,14 +150,9 @@ async fn run_benchmarks(
         println!("Using scan options {:?}", scan_options);
         for i in 0..iterations {
             let start = Instant::now();
-            let rows = exec_scan(
-                ctx,
-                test_file,
-                filter_expr.clone(),
-                scan_options.clone(),
-                debug,
-            )
-            .await?;
+            let rows =
+                exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
+                    .await?;
             println!(
                 "Iteration {} returned {} rows in {} ms",
                 i,
@@ -179,17 +187,11 @@ async fn exec_scan(
 fn gen_data(
     path: PathBuf,
     scale_factor: f32,
-    page_size: Option<usize>,
-    row_group_size: Option<usize>,
+    props: WriterProperties,
 ) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();
 
     let num_batches = 100_f32 * scale_factor;
 
-    TestParquetFile::try_new(
-        path,
-        generator.take(num_batches as usize),
-        page_size,
-        row_group_size,
-    )
+    TestParquetFile::try_new(path, props, generator.take(num_batches as usize))
 }
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 3adf0ad7bd5f..a47815377609 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -51,6 +51,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
 use datafusion_physical_expr::create_physical_expr;
+use log::trace;
 
 /// Interface to pass statistics information to [`PruningPredicate`]
 ///
@@ -415,6 +416,12 @@ fn build_statistics_record_batch(
     let mut options = RecordBatchOptions::default();
     options.row_count = Some(statistics.num_containers());
 
+    trace!(
+        "Creating statistics batch for {:#?} with {:#?}",
+        required_columns,
+        arrays
+    );
+
     RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
        DataFusionError::Plan(format!("Can not create statistics record batch: {}", err))
    })
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 828d213758fd..95c93151af02 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -22,7 +22,7 @@ use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
 use datafusion_common::{Column, DataFusionError, Result};
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_optimizer::utils::split_conjunction;
-use log::{debug, error};
+use log::{debug, error, trace};
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
     errors::ParquetError,
@@ -143,6 +143,9 @@ pub(crate) fn build_page_filter(
                 }),
             );
         } else {
+            trace!(
+                "Did not have enough metadata to prune with page indexes, falling back to all rows",
+            );
             // fallback select all rows
             let all_selected =
                 vec![RowSelector::select(groups[*r].num_rows() as usize)];
@@ -150,8 +153,9 @@ pub(crate) fn build_page_filter(
         }
     }
     debug!(
-        "Use filter and page index create RowSelection {:?} from predicate:{:?}",
-        &selectors, predicate
+        "Use filter and page index to create RowSelection {:?} from predicate: {:?}",
+        &selectors,
+        predicate.predicate_expr(),
     );
     row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
 }
@@ -321,7 +325,7 @@ fn prune_pages_in_one_row_group(
     assert_eq!(row_vec.len(), values.len());
     let mut sum_row = *row_vec.first().unwrap();
     let mut selected = *values.first().unwrap();
-
+    trace!("Pruned to {:?} using {:?}", values, pruning_stats);
     for (i, &f) in values.iter().skip(1).enumerate() {
         if f == selected {
             sum_row += *row_vec.get(i).unwrap();
@@ -376,6 +380,7 @@ fn create_row_count_in_each_page(
 
 /// Wraps one col page_index in one rowGroup statistics in a way
 /// that implements [`PruningStatistics`]
+#[derive(Debug)]
 struct PagesPruningStatistics<'a> {
     col_page_indexes: &'a Index,
     col_offset_indexes: &'a Vec<PageLocation>,
diff --git a/datafusion/core/tests/parquet_filter_pushdown.rs b/datafusion/core/tests/parquet_filter_pushdown.rs
index 2c0d75cc20be..54b7d8d169e4 100644
--- a/datafusion/core/tests/parquet_filter_pushdown.rs
+++ b/datafusion/core/tests/parquet_filter_pushdown.rs
@@ -30,12 +30,12 @@ use std::time::Instant;
 
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
-use datafusion::logical_expr::{lit, Expr};
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::prelude::{col, SessionContext};
+use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
 use datafusion_optimizer::utils::{conjunction, disjunction, split_conjunction};
 use itertools::Itertools;
+use parquet::file::properties::WriterProperties;
 use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use tempfile::TempDir;
 use test_utils::AccessLogGenerator;
@@ -43,24 +43,29 @@ use test_utils::AccessLogGenerator;
 /// how many rows of generated data to write to our parquet file (arbitrary)
 const NUM_ROWS: usize = 53819;
 
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+    // enable logging so RUST_LOG works
+    let _ = env_logger::try_init();
+}
+
 #[cfg(not(target_family = "windows"))]
 #[tokio::test]
 async fn single_file() {
     // Only create the parquet file once as it is fairly large
+    let tempdir = TempDir::new().unwrap();
 
     let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
-    // TODO: set the max page rows with some various / arbitrary sizes 8311
-    // (using https://github.com/apache/arrow-rs/issues/2941) to ensure we get multiple pages
-    let page_size = None;
-    let row_group_size = None;
+    // default properties
+    let props = WriterProperties::builder().build();
 
     let file = tempdir.path().join("data.parquet");
 
     let start = Instant::now();
     println!("Writing test data to {:?}", file);
-    let test_parquet_file =
-        TestParquetFile::try_new(file, generator, page_size, row_group_size).unwrap();
+    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
     println!(
         "Completed generating test data in {:?}",
         Instant::now() - start
     );
@@ -225,6 +230,77 @@ async fn single_file() {
         .run()
         .await;
 }
+
+#[cfg(not(target_family = "windows"))]
+#[tokio::test]
+async fn single_file_small_data_pages() {
+    let tempdir = TempDir::new().unwrap();
+
+    let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
+
+    // set the max page row count to an arbitrary 8311 rows to increase the
+    // effectiveness of page filtering
+    let props = WriterProperties::builder()
+        .set_data_page_row_count_limit(8311)
+        .build();
+    let file = tempdir.path().join("data_8311.parquet");
+
+    let start = Instant::now();
+    println!("Writing test data to {:?}", file);
+    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
+    println!(
+        "Completed generating test data in {:?}",
+        Instant::now() - start
+    );
+
+    // The statistics on the 'pod' column are as follows:
+    //
+    // parquet-tools dump -d ~/Downloads/data_8311.parquet
+    //
+    // ...
+    // pod TV=53819 RL=0 DL=0 DS: 8 DE:PLAIN
+    // ----------------------------------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: aqcathnxqsphdhgjtgvxsfyiwbmhlmg, max: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, max: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, max: djzdyiecnumrsrcbizwlqzdhnpoiqdh, num_nulls not defined] CRC:[none] SZ:10 VC:9216
+    // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
+    //
+    // This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833
+    // (page index pruning not implemented for byte array)
+
+    // TestCase::new(&test_parquet_file)
+    //     .with_name("selective")
+    //     // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
+    //     // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
+    //     .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
+    //     .with_pushdown_expected(PushdownExpected::Some)
+    //     .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+    //     .with_expected_rows(2574)
+    //     .run()
+    //     .await;
+
+    // time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN
+    // ----------------------------------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004133888, num_nulls not defined] CRC:[none] SZ:13844 VC:9216
+    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.006397952, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005650432, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
+    TestCase::new(&test_parquet_file)
+        .with_name("selective")
+        // predicate is chosen carefully to prune pages
+        // time > 1970-01-01T00:00:00.004300000
+        .with_filter(col("time").gt(lit_timestamp_nano(4300000)))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+        .with_expected_rows(9745)
+        .run()
+        .await;
+}
+
 /// Expected pushdown behavior
 #[derive(Debug, Clone, Copy)]
 enum PushdownExpected {
@@ -234,6 +310,15 @@ enum PushdownExpected {
     Some,
 }
 
+/// Expected page index filtering behavior
+#[derive(Debug, Clone, Copy)]
+enum PageIndexFilteringExpected {
+    /// Did not expect any pages to be pruned
+    None,
+    /// Expected that more than 0 were pruned
+    Some,
+}
+
 /// parameters for running a test
 struct TestCase<'a> {
     test_parquet_file: &'a TestParquetFile,
@@ -243,6 +328,10 @@ struct TestCase<'a> {
     filter: Expr,
     /// Did we expect the pushdown filtering to have filtered any rows?
     pushdown_expected: PushdownExpected,
+
+    /// Did we expect page filtering to filter out pages
+    page_index_filtering_expected: PageIndexFilteringExpected,
+
     /// How many rows are expected to pass the predicate overall?
     expected_rows: usize,
 }
@@ -255,6 +344,7 @@ impl<'a> TestCase<'a> {
             // default to a filter that passes everything
             filter: lit(true),
             pushdown_expected: PushdownExpected::None,
+            page_index_filtering_expected: PageIndexFilteringExpected::None,
             expected_rows: 0,
         }
     }
@@ -271,8 +361,17 @@ impl<'a> TestCase<'a> {
     }
 
     /// Set the expected predicate pushdown
-    fn with_pushdown_expected(mut self, pushdown_expected: PushdownExpected) -> Self {
-        self.pushdown_expected = pushdown_expected;
+    fn with_pushdown_expected(mut self, v: PushdownExpected) -> Self {
+        self.pushdown_expected = v;
+        self
+    }
+
+    /// Set the expected page filtering
+    fn with_page_index_filtering_expected(
+        mut self,
+        v: PageIndexFilteringExpected,
+    ) -> Self {
+        self.page_index_filtering_expected = v;
         self
     }
 
@@ -307,8 +406,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: false,
                     enable_page_index: false,
                 },
-                // always expect no pushdown
-                PushdownExpected::None,
                 filter,
             )
             .await;
@@ -320,7 +417,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: false,
                     enable_page_index: false,
                 },
-                self.pushdown_expected,
                 filter,
             )
             .await;
@@ -334,7 +430,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: true,
                     enable_page_index: false,
                 },
-                self.pushdown_expected,
                 filter,
             )
             .await;
@@ -348,8 +443,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: false,
                     enable_page_index: true,
                 },
-                // pushdown is off so no pushdown is expected
-                PushdownExpected::None,
                 filter,
             )
             .await;
@@ -362,7 +455,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: true,
                     enable_page_index: true,
                 },
-                self.pushdown_expected,
                 filter,
             )
             .await;
@@ -374,7 +466,6 @@ impl<'a> TestCase<'a> {
     async fn read_with_options(
         &self,
         scan_options: ParquetScanOptions,
-        pushdown_expected: PushdownExpected,
         filter: &Expr,
     ) -> RecordBatch {
         println!("  scan options: {scan_options:?}");
@@ -404,6 +495,14 @@ impl<'a> TestCase<'a> {
         // verify expected pushdown
         let metrics =
             TestParquetFile::parquet_metrics(exec).expect("found parquet metrics");
+
+        let pushdown_expected = if scan_options.pushdown_filters {
+            self.pushdown_expected
+        } else {
+            // if filter pushdown is not enabled we don't expect it to filter rows
+            PushdownExpected::None
+        };
+
         let pushdown_rows_filtered = get_value(&metrics, "pushdown_rows_filtered");
         println!("  pushdown_rows_filtered: {}", pushdown_rows_filtered);
 
@@ -419,6 +518,29 @@ impl<'a> TestCase<'a> {
             }
         };
 
+        let page_index_rows_filtered = get_value(&metrics, "page_index_rows_filtered");
+        println!("  page_index_rows_filtered: {}", page_index_rows_filtered);
+
+        let page_index_filtering_expected = if scan_options.enable_page_index {
+            self.page_index_filtering_expected
+        } else {
+            // if page index filtering is not enabled, don't expect it
+            // to filter rows
+            PageIndexFilteringExpected::None
+        };
+
+        match page_index_filtering_expected {
+            PageIndexFilteringExpected::None => {
+                assert_eq!(page_index_rows_filtered, 0);
+            }
+            PageIndexFilteringExpected::Some => {
+                assert!(
+                    page_index_rows_filtered > 0,
+                    "Expected to filter rows via page index but none were",
+                );
+            }
+        };
+
         batch
     }
 }
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 6a3e6e0ec1c2..41066584e988 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -51,7 +51,7 @@ pub struct TestParquetFile {
     object_meta: ObjectMeta,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Copy)]
 pub struct ParquetScanOptions {
     pub pushdown_filters: bool,
     pub reorder_filters: bool,
@@ -59,34 +59,20 @@ pub struct ParquetScanOptions {
 }
 
 impl TestParquetFile {
-    /// Creates a new parquet file at the specified location
+    /// Creates a new parquet file at the specified location with the
+    /// given properties
     pub fn try_new(
         path: PathBuf,
+        props: WriterProperties,
         batches: impl IntoIterator<Item = RecordBatch>,
-        page_size: Option<usize>,
-        row_group_size: Option<usize>,
     ) -> Result<Self> {
         let file = File::create(&path).unwrap();
 
-        let mut props_builder = WriterProperties::builder();
-
-        if let Some(s) = page_size {
-            props_builder = props_builder
-                .set_data_pagesize_limit(s)
-                .set_write_batch_size(s);
-        }
-
-        if let Some(s) = row_group_size {
-            props_builder = props_builder.set_max_row_group_size(s);
-        }
-
         let mut batches = batches.into_iter();
         let first_batch = batches.next().expect("need at least one record batch");
         let schema = first_batch.schema();
 
-        let mut writer =
-            ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))
-                .unwrap();
+        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
 
         writer.write(&first_batch).unwrap();
         writer.flush()?;
@@ -122,7 +108,9 @@ impl TestParquetFile {
             object_meta,
         })
     }
+}
 
+impl TestParquetFile {
     /// return a `ParquetExec` and `FilterExec` with the specified options to scan this parquet file.
     ///
     /// This returns the same plan that DataFusion will make with a pushed down predicate followed by a filter: