diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs index 4ec2dc90c37b..a3ff7bee529e 100644 --- a/benchmarks/src/bin/parquet_filter_pushdown.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr}; use datafusion::optimizer::utils::disjunction; use datafusion::physical_plan::collect; use datafusion::prelude::{col, SessionConfig, SessionContext}; +use parquet::file::properties::WriterProperties; use parquet_test_utils::{ParquetScanOptions, TestParquetFile}; use std::path::PathBuf; use std::time::Instant; @@ -73,7 +74,19 @@ async fn main() -> Result<()> { let path = opt.path.join("logs.parquet"); - let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?; + let mut props_builder = WriterProperties::builder(); + + if let Some(s) = opt.page_size { + props_builder = props_builder + .set_data_pagesize_limit(s) + .set_write_batch_size(s); + } + + if let Some(s) = opt.row_group_size { + props_builder = props_builder.set_max_row_group_size(s); + } + + let test_file = gen_data(path, opt.scale_factor, props_builder.build())?; run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?; @@ -137,14 +150,9 @@ async fn run_benchmarks( println!("Using scan options {:?}", scan_options); for i in 0..iterations { let start = Instant::now(); - let rows = exec_scan( - ctx, - test_file, - filter_expr.clone(), - scan_options.clone(), - debug, - ) - .await?; + let rows = + exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug) + .await?; println!( "Iteration {} returned {} rows in {} ms", i, @@ -179,17 +187,11 @@ async fn exec_scan( fn gen_data( path: PathBuf, scale_factor: f32, - page_size: Option<usize>, - row_group_size: Option<usize>, + props: WriterProperties, ) -> Result<TestParquetFile> { let generator = AccessLogGenerator::new(); let num_batches = 100_f32 * scale_factor; - TestParquetFile::try_new( - path, - generator.take(num_batches as usize), - page_size, - row_group_size, - ) + TestParquetFile::try_new(path, props, generator.take(num_batches as usize)) } diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 3adf0ad7bd5f..a47815377609 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -51,6 +51,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable}; use datafusion_physical_expr::create_physical_expr; +use log::trace; /// Interface to pass statistics information to [`PruningPredicate`] /// @@ -415,6 +416,12 @@ fn build_statistics_record_batch( let mut options = RecordBatchOptions::default(); options.row_count = Some(statistics.num_containers()); + trace!( + "Creating statistics batch for {:#?} with {:#?}", + required_columns, + arrays + ); + RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| { DataFusionError::Plan(format!("Can not create statistics record batch: {}", err)) }) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 61d2e5badb6b..b41ca46d05d1 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -643,7 +643,7 @@ struct RowGroupPruningStatistics<'a> { // Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian. // Copy from the arrow-rs -fn from_bytes_to_i128(b: &[u8]) -> i128 { +pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); let first_bit = b[0] & 128u8 == 128u8; let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; @@ -773,7 +773,9 @@ macro_rules! get_null_count_values { // Convert parquet column schema to arrow data type, and just consider the // decimal data type. -fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> { +pub(crate) fn parquet_to_arrow_decimal_type( + parquet_column: &ColumnDescriptor, +) -> Option<DataType> { let type_ptr = parquet_column.self_type_ptr(); match type_ptr.get_basic_info().logical_type() { Some(LogicalType::Decimal { scale, precision }) => { diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 828d213758fd..3867c41edd34 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -17,12 +17,17 @@ //! Contains code to filter entire pages -use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array}; +use arrow::array::{ + BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array, + StringArray, +}; +use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; use datafusion_common::{Column, DataFusionError, Result}; use datafusion_expr::utils::expr_to_columns; use datafusion_optimizer::utils::split_conjunction; -use log::{debug, error}; +use log::{debug, error, trace}; +use parquet::schema::types::ColumnDescriptor; use parquet::{ arrow::arrow_reader::{RowSelection, RowSelector}, errors::ParquetError, @@ -36,6 +41,9 @@ use std::collections::{HashSet, VecDeque}; use std::sync::Arc; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use crate::physical_plan::file_format::parquet::{ + from_bytes_to_i128, parquet_to_arrow_decimal_type, +}; use super::metrics::ParquetFileMetrics; @@ -133,6 +141,7 @@ pub(crate) fn build_page_filter( &predicate, rg_offset_indexes.get(col_id), rg_page_indexes.get(col_id), + groups[*r].column(col_id).column_descr(), file_metrics, ) .map_err(|e| { @@ -143,6 +152,9 @@ pub(crate) fn build_page_filter( }), ); } else { + trace!( + "Did not have enough metadata to prune with page indexes, falling back to all rows", + ); // fallback select all rows let all_selected = vec![RowSelector::select(groups[*r].num_rows() as usize)]; @@ -150,8 +162,9 @@ pub(crate) fn build_page_filter( } } debug!( - "Use filter and page index create RowSelection {:?} from predicate:{:?}", - &selectors, predicate + "Use filter and page index create RowSelection {:?} from predicate: {:?}", + &selectors, + predicate.predicate_expr(), ); row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<RowSelector>>()); } @@ -303,15 +316,18 @@ fn prune_pages_in_one_row_group( predicate: &PruningPredicate, col_offset_indexes: Option<&Vec<PageLocation>>, col_page_indexes: Option<&Index>, + col_desc: &ColumnDescriptor, metrics: &ParquetFileMetrics, ) -> Result<Vec<RowSelector>> { let num_rows = group.num_rows() as usize; if let (Some(col_offset_indexes), Some(col_page_indexes)) = (col_offset_indexes, col_page_indexes) { + let target_type = parquet_to_arrow_decimal_type(col_desc); let pruning_stats = PagesPruningStatistics {
col_page_indexes, col_offset_indexes, + target_type: &target_type, }; match predicate.prune(&pruning_stats) { @@ -321,7 +337,7 @@ fn prune_pages_in_one_row_group( assert_eq!(row_vec.len(), values.len()); let mut sum_row = *row_vec.first().unwrap(); let mut selected = *values.first().unwrap(); - + trace!("Pruned to {:?} using {:?}", values, pruning_stats); for (i, &f) in values.iter().skip(1).enumerate() { if f == selected { sum_row += *row_vec.get(i).unwrap(); @@ -376,9 +392,13 @@ fn create_row_count_in_each_page( /// Wraps one col page_index in one rowGroup statistics in a way /// that implements [`PruningStatistics`] +#[derive(Debug)] struct PagesPruningStatistics<'a> { col_page_indexes: &'a Index, col_offset_indexes: &'a Vec<PageLocation>, + // target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the + // real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY` + target_type: &'a Option<DataType>, } // Extract the min or max value calling `func` from page idex @@ -387,16 +407,50 @@ macro_rules! get_min_max_values_for_page_index { match $self.col_page_indexes { Index::NONE => None, Index::INT32(index) => { - let vec = &index.indexes; - Some(Arc::new(Int32Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) + match $self.target_type { + // int32 to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + let vec = &index.indexes; + if let Ok(arr) = Decimal128Array::from_iter_values( + vec.iter().map(|x| *x.$func().unwrap() as i128), + ) + .with_precision_and_scale(*precision, *scale) + { + return Some(Arc::new(arr)); + } else { + return None; + } + } + _ => { + let vec = &index.indexes; + Some(Arc::new(Int32Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + } } Index::INT64(index) => { - let vec = &index.indexes; - Some(Arc::new(Int64Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) + match $self.target_type { + // int64 to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + let vec = &index.indexes; + if let Ok(arr) = Decimal128Array::from_iter_values( + vec.iter().map(|x| *x.$func().unwrap() as i128), + ) + .with_precision_and_scale(*precision, *scale) + { + return Some(Arc::new(arr)); + } else { + return None; + } + } + _ => { + let vec = &index.indexes; + Some(Arc::new(Int64Array::from_iter( + vec.iter().map(|x| x.$func().cloned()), + ))) + } + } } Index::FLOAT(index) => { let vec = &index.indexes; @@ -416,10 +470,37 @@ macro_rules!
get_min_max_values_for_page_index { vec.iter().map(|x| x.$func().cloned()), ))) } - Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { + Index::BYTE_ARRAY(index) => { + let vec = &index.indexes; + let array: StringArray = vec + .iter() + .map(|x| x.$func()) + .map(|x| x.and_then(|x| std::str::from_utf8(x).ok())) + .collect(); + Some(Arc::new(array)) + } + Index::INT96(_) => { //Todo support these type None } + Index::FIXED_LEN_BYTE_ARRAY(index) => { + match $self.target_type { + // fixed_len_byte_array to decimal with the precision and scale + Some(DataType::Decimal128(precision, scale)) => { + let vec = &index.indexes; + if let Ok(array) = Decimal128Array::from_iter_values( + vec.iter().map(|x| from_bytes_to_i128(x.$func().unwrap())), + ) + .with_precision_and_scale(*precision, *scale) + { + return Some(Arc::new(array)); + } else { + return None; + } + } + _ => None, + } + } } }}; } diff --git a/datafusion/core/tests/parquet_filter_pushdown.rs b/datafusion/core/tests/parquet_filter_pushdown.rs index 2c0d75cc20be..4aa7725cbf24 100644 --- a/datafusion/core/tests/parquet_filter_pushdown.rs +++ b/datafusion/core/tests/parquet_filter_pushdown.rs @@ -30,12 +30,12 @@ use std::time::Instant; use arrow::compute::concat_batches; use arrow::record_batch::RecordBatch; -use datafusion::logical_expr::{lit, Expr}; use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::MetricsSet; -use datafusion::prelude::{col, SessionContext}; +use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext}; use datafusion_optimizer::utils::{conjunction, disjunction, split_conjunction}; use itertools::Itertools; +use parquet::file::properties::WriterProperties; use parquet_test_utils::{ParquetScanOptions, TestParquetFile}; use tempfile::TempDir; use test_utils::AccessLogGenerator; @@ -43,24 +43,29 @@ use test_utils::AccessLogGenerator; /// how many rows of generated data to write to our parquet file (arbitrary) const NUM_ROWS: usize = 53819; +#[cfg(test)] +#[ctor::ctor] +fn init() { + // enable logging so RUST_LOG works + let _ = env_logger::try_init(); +} + #[cfg(not(target_family = "windows"))] #[tokio::test] async fn single_file() { // Only create the parquet file once as it is fairly large + let tempdir = TempDir::new().unwrap(); let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS)); - // TODO: set the max page rows with some various / arbitrary sizes 8311 - // (using https://github.com/apache/arrow-rs/issues/2941) to ensure we get multiple pages - let page_size = None; - let row_group_size = None; + // default properties + let props = WriterProperties::builder().build(); let file = tempdir.path().join("data.parquet"); let start = Instant::now(); println!("Writing test data to {:?}", file); - let test_parquet_file = - TestParquetFile::try_new(file, generator, page_size, row_group_size).unwrap(); + let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap(); println!( "Completed generating test data in {:?}", Instant::now() - start @@ -225,6 +230,102 @@ async fn single_file() { .run() .await; } + +#[cfg(not(target_family = "windows"))] +#[tokio::test] +async fn single_file_small_data_pages() { + let tempdir = TempDir::new().unwrap(); + + let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS)); + + // set the max page rows with arbitrary sizes 8311 to increase + // effectiveness of page filtering + let props = WriterProperties::builder() + .set_data_page_row_count_limit(8311) + .build(); + let file =
tempdir.path().join("data_8311.parquet"); + + let start = Instant::now(); + println!("Writing test data to {:?}", file); + let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap(); + println!( + "Completed generating test data in {:?}", + Instant::now() - start + ); + + // The statistics on the 'pod' column are as follows: + // + // parquet-tools dump -d ~/Downloads/data_8311.parquet + // + // ... + // pod TV=53819 RL=0 DL=0 DS: 8 DE:PLAIN + // --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: aqcathnxqsphdhgjtgvxsfyiwbmhlmg, max: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, num_nulls not defined] CRC:[none] SZ:7 VC:9216 + // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, max: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, num_nulls not defined] CRC:[none] SZ:7 VC:9216 + // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, max: djzdyiecnumrsrcbizwlqzdhnpoiqdh, num_nulls not defined] CRC:[none] SZ:10 VC:9216 + // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216 + // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216 + // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739 + + TestCase::new(&test_parquet_file) + .with_name("selective") + // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4 + // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin' + .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin"))) + .with_pushdown_expected(PushdownExpected::Some) + .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) + .with_expected_rows(2574) + .run() + .await; + + // time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN + // -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004133888, num_nulls not defined] CRC:[none] SZ:13844 VC:9216 + // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.006397952, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 + // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005650432, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 + // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 + // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 + // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739 + TestCase::new(&test_parquet_file) + 
.with_name("selective") + // predicate is chosen carefully to prune pages + // time > 1970-01-01T00:00:00.004300000 + .with_filter(col("time").gt(lit_timestamp_nano(4300000))) + .with_pushdown_expected(PushdownExpected::Some) + .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) + .with_expected_rows(9745) + .run() + .await; + + // decimal_price TV=53819 RL=0 DL=0 + // ---------------------------------------------------------------------------- + // row group 0: + // column index for column decimal_price: + // Boudary order: UNORDERED + // null count min max + // page-0 0 1 9216 + // page-1 0 9217 18432 + // page-2 0 18433 27648 + // page-3 0 27649 36864 + // page-4 0 36865 46080 + // page-5 0 46081 53819 + // + // offset index for column decimal_price: + // offset compressed size first row index + // page-0 5581636 147517 0 + // page-1 5729153 147517 9216 + TestCase::new(&test_parquet_file) + .with_name("selective_on_decimal") + // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5 + // decimal_price <= 9200 + .with_filter(col("decimal_price").lt_eq(lit(9200))) + .with_pushdown_expected(PushdownExpected::Some) + .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) + .with_expected_rows(9200) + .run() + .await; +} + /// Expected pushdown behavior #[derive(Debug, Clone, Copy)] enum PushdownExpected { @@ -234,6 +335,15 @@ enum PushdownExpected { Some, } +/// Expected page index filtering behavior +#[derive(Debug, Clone, Copy)] +enum PageIndexFilteringExpected { + /// Expected that no pages were filtered + None, + /// Expected that more than 0 were pruned + Some, +} + /// parameters for running a test struct TestCase<'a> { test_parquet_file: &'a TestParquetFile, @@ -243,6 +353,10 @@ struct TestCase<'a> { filter: Expr, /// Did we expect the pushdown filtering to have filtered any rows? pushdown_expected: PushdownExpected, + + /// Did we expect page filtering to filter out pages + page_index_filtering_expected: PageIndexFilteringExpected, + /// How many rows are expected to pass the predicate overall?
expected_rows: usize, } @@ -255,6 +369,7 @@ impl<'a> TestCase<'a> { // default to a filter that passes everything filter: lit(true), pushdown_expected: PushdownExpected::None, + page_index_filtering_expected: PageIndexFilteringExpected::None, expected_rows: 0, } } @@ -271,8 +386,17 @@ impl<'a> TestCase<'a> { } /// Set the expected predicate pushdown - fn with_pushdown_expected(mut self, pushdown_expected: PushdownExpected) -> Self { - self.pushdown_expected = pushdown_expected; + fn with_pushdown_expected(mut self, v: PushdownExpected) -> Self { + self.pushdown_expected = v; + self + } + + /// Set the expected page filtering + fn with_page_index_filtering_expected( + mut self, + v: PageIndexFilteringExpected, + ) -> Self { + self.page_index_filtering_expected = v; self } @@ -307,8 +431,6 @@ impl<'a> TestCase<'a> { reorder_filters: false, enable_page_index: false, }, - // always expect no pushdown - PushdownExpected::None, filter, ) .await; @@ -320,7 +442,6 @@ impl<'a> TestCase<'a> { reorder_filters: false, enable_page_index: false, }, - self.pushdown_expected, filter, ) .await; @@ -334,7 +455,6 @@ impl<'a> TestCase<'a> { reorder_filters: true, enable_page_index: false, }, - self.pushdown_expected, filter, ) .await; @@ -348,8 +468,6 @@ impl<'a> TestCase<'a> { reorder_filters: false, enable_page_index: true, }, - // pushdown is off so no pushdown is expected - PushdownExpected::None, filter, ) .await; @@ -362,7 +480,6 @@ impl<'a> TestCase<'a> { reorder_filters: true, enable_page_index: true, }, - self.pushdown_expected, filter, ) .await; @@ -374,7 +491,6 @@ impl<'a> TestCase<'a> { async fn read_with_options( &self, scan_options: ParquetScanOptions, - pushdown_expected: PushdownExpected, filter: &Expr, ) -> RecordBatch { println!(" scan options: {scan_options:?}"); @@ -404,6 +520,14 @@ impl<'a> TestCase<'a> { // verify expected pushdown let metrics = TestParquetFile::parquet_metrics(exec).expect("found parquet metrics"); + + let pushdown_expected = if scan_options.pushdown_filters { + self.pushdown_expected + } else { + // if filter pushdown is not enabled we don't expect it to filter rows + PushdownExpected::None + }; + let pushdown_rows_filtered = get_value(&metrics, "pushdown_rows_filtered"); println!(" pushdown_rows_filtered: {}", pushdown_rows_filtered); @@ -419,6 +543,29 @@ impl<'a> TestCase<'a> { } }; + let page_index_rows_filtered = get_value(&metrics, "page_index_rows_filtered"); + println!(" page_index_rows_filtered: {}", page_index_rows_filtered); + + let page_index_filtering_expected = if scan_options.enable_page_index { + self.page_index_filtering_expected + } else { + // if page index filtering is not enabled, don't expect it + // to filter rows + PageIndexFilteringExpected::None + }; + + match page_index_filtering_expected { + PageIndexFilteringExpected::None => { + assert_eq!(page_index_rows_filtered, 0); + } + PageIndexFilteringExpected::Some => { + assert!( + page_index_rows_filtered > 0, + "Expected to filter rows via page index but none were", + ); + } + }; + batch } } diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs index 6a3e6e0ec1c2..41066584e988 100644 --- a/parquet-test-utils/src/lib.rs +++ b/parquet-test-utils/src/lib.rs @@ -51,7 +51,7 @@ pub struct TestParquetFile { object_meta: ObjectMeta, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct ParquetScanOptions { pub pushdown_filters: bool, pub reorder_filters: bool, @@ -59,34 +59,20 @@ pub struct ParquetScanOptions { } impl TestParquetFile { - /// Creates a new parquet file 
at the specified location + /// Creates a new parquet file at the specified location with the + /// given properties pub fn try_new( path: PathBuf, + props: WriterProperties, batches: impl IntoIterator<Item = RecordBatch>, - page_size: Option<usize>, - row_group_size: Option<usize>, - ) -> Result<Self> { let file = File::create(&path).unwrap(); - let mut props_builder = WriterProperties::builder(); - - if let Some(s) = page_size { - props_builder = props_builder - .set_data_pagesize_limit(s) - .set_write_batch_size(s); - } - - if let Some(s) = row_group_size { - props_builder = props_builder.set_max_row_group_size(s); - } - let mut batches = batches.into_iter(); let first_batch = batches.next().expect("need at least one record batch"); let schema = first_batch.schema(); - let mut writer = - ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())) - .unwrap(); + let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap(); writer.write(&first_batch).unwrap(); writer.flush()?; @@ -122,7 +108,9 @@ impl TestParquetFile { object_meta, }) } +} +impl TestParquetFile { /// return a `ParquetExec` and `FilterExec` with the specified options to scan this parquet file. /// /// This returns the same plan that DataFusion will make with a pushed down predicate followed by a filter: diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs index adff4a514615..c82d56ef21f9 100644 --- a/test-utils/src/data_gen.rs +++ b/test-utils/src/data_gen.rs @@ -19,8 +19,8 @@ use std::ops::Range; use std::sync::Arc; use arrow::array::{ - Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, - UInt16Builder, + Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder, + TimestampNanosecondBuilder, UInt16Builder, }; use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit}; use arrow::record_batch::RecordBatch; @@ -43,6 +43,7 @@ struct BatchBuilder { request_bytes: Int32Builder, response_bytes: Int32Builder, response_status: UInt16Builder, + prices_status: Decimal128Builder, /// optional number of rows produced row_limit: Option<usize>, @@ -73,6 +74,7 @@ impl BatchBuilder { Field::new("request_bytes", DataType::Int32, true), Field::new("response_bytes", DataType::Int32, true), Field::new("response_status", DataType::UInt16, false), + Field::new("decimal_price", DataType::Decimal128(38, 0), false), ])) } @@ -146,6 +148,7 @@ impl BatchBuilder { .append_option(rng.gen_bool(0.9).then(|| rng.gen())); self.response_status .append_value(status[rng.gen_range(0..status.len())]); + self.prices_status.append_value(self.row_count as i128); } fn finish(mut self, schema: SchemaRef) -> RecordBatch { @@ -166,6 +169,12 @@ impl BatchBuilder { Arc::new(self.request_bytes.finish()), Arc::new(self.response_bytes.finish()), Arc::new(self.response_status.finish()), + Arc::new( + self.prices_status + .finish() + .with_precision_and_scale(38, 0) + .unwrap(), + ), ], ) .unwrap()