Skip to content

Commit

Permalink
Add parquet predicate pushdown tests with smaller pages (#4131)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Nov 8, 2022
1 parent 47f0e5a commit 175adbd
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 58 deletions.
36 changes: 19 additions & 17 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionConfig, SessionContext};
use parquet::file::properties::WriterProperties;
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;
use std::time::Instant;
Expand Down Expand Up @@ -73,7 +74,19 @@ async fn main() -> Result<()> {

let path = opt.path.join("logs.parquet");

let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
let mut props_builder = WriterProperties::builder();

if let Some(s) = opt.page_size {
props_builder = props_builder
.set_data_pagesize_limit(s)
.set_write_batch_size(s);
}

if let Some(s) = opt.row_group_size {
props_builder = props_builder.set_max_row_group_size(s);
}

let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;

run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;

Expand Down Expand Up @@ -137,14 +150,9 @@ async fn run_benchmarks(
println!("Using scan options {:?}", scan_options);
for i in 0..iterations {
let start = Instant::now();
let rows = exec_scan(
ctx,
test_file,
filter_expr.clone(),
scan_options.clone(),
debug,
)
.await?;
let rows =
exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
.await?;
println!(
"Iteration {} returned {} rows in {} ms",
i,
Expand Down Expand Up @@ -179,17 +187,11 @@ async fn exec_scan(
/// Generates the access-log benchmark data file at `path`.
///
/// The number of generated record batches scales linearly with
/// `scale_factor` (100 batches at a scale factor of 1.0).
///
/// # Arguments
/// * `path` - destination for the generated parquet file
/// * `scale_factor` - multiplier on the base batch count of 100
/// * `props` - parquet [`WriterProperties`] controlling how the file is
///   written (data page size, write batch size, row group size, ...);
///   built by the caller so this function stays agnostic of CLI options
///
/// # Errors
/// Returns any error produced while writing the parquet file.
fn gen_data(
    path: PathBuf,
    scale_factor: f32,
    props: WriterProperties,
) -> Result<TestParquetFile> {
    let generator = AccessLogGenerator::new();

    // Truncation via `as usize` is intentional: fractional scale factors
    // round the batch count down.
    let num_batches = 100_f32 * scale_factor;

    TestParquetFile::try_new(path, props, generator.take(num_batches as usize))
}
7 changes: 7 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
use log::trace;

/// Interface to pass statistics information to [`PruningPredicate`]
///
Expand Down Expand Up @@ -415,6 +416,12 @@ fn build_statistics_record_batch<S: PruningStatistics>(
let mut options = RecordBatchOptions::default();
options.row_count = Some(statistics.num_containers());

trace!(
"Creating statistics batch for {:#?} with {:#?}",
required_columns,
arrays
);

RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
DataFusionError::Plan(format!("Can not create statistics record batch: {}", err))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::utils::expr_to_columns;
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, error};
use log::{debug, error, trace};
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
Expand Down Expand Up @@ -143,15 +143,19 @@ pub(crate) fn build_page_filter(
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate:{:?}",
&selectors, predicate
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
Expand Down Expand Up @@ -321,7 +325,7 @@ fn prune_pages_in_one_row_group(
assert_eq!(row_vec.len(), values.len());
let mut sum_row = *row_vec.first().unwrap();
let mut selected = *values.first().unwrap();

trace!("Pruned to to {:?} using {:?}", values, pruning_stats);
for (i, &f) in values.iter().skip(1).enumerate() {
if f == selected {
sum_row += *row_vec.get(i).unwrap();
Expand Down Expand Up @@ -376,6 +380,7 @@ fn create_row_count_in_each_page(

/// Wraps one col page_index in one rowGroup statistics in a way
/// that implements [`PruningStatistics`]
#[derive(Debug)]
struct PagesPruningStatistics<'a> {
col_page_indexes: &'a Index,
col_offset_indexes: &'a Vec<PageLocation>,
Expand Down
Loading

0 comments on commit 175adbd

Please sign in to comment.