diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 8ebae4110f04..b2bfcf72e1d3 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -251,7 +251,7 @@ fn prune_pages_in_one_row_group(
     let mut sum_row = *row_vec.first().unwrap();
     let mut selected = *values.first().unwrap();
     trace!("Pruned to to {:?} using {:?}", values, pruning_stats);
-    for (i, &f) in values.iter().skip(1).enumerate() {
+    for (i, &f) in values.iter().enumerate().skip(1) {
         if f == selected {
             sum_row += *row_vec.get(i).unwrap();
         } else {
diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index 59350113c943..101a8e14170a 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -26,7 +26,6 @@
 //! select * from data limit 10;
 //! ```
 
-use std::sync::Arc;
 use std::time::Instant;
 
 use arrow::compute::concat_batches;
@@ -42,60 +41,64 @@ use tempfile::TempDir;
 use test_utils::AccessLogGenerator;
 
 /// how many rows of generated data to write to our parquet file (arbitrary)
-const NUM_ROWS: usize = 53819;
-const ROW_LIMIT: usize = 4096;
-
-#[cfg(test)]
-#[ctor::ctor]
-fn init() {
-    // enable logging so RUST_LOG works
-    let _ = env_logger::try_init();
-}
-
-#[cfg(not(target_family = "windows"))]
-// Use multi-threaded executor as this test consumes CPU
-#[tokio::test(flavor = "multi_thread")]
-async fn single_file() {
-    // Only create the parquet file once as it is fairly large
-
-    let tempdir = TempDir::new().unwrap();
+const NUM_ROWS: usize = 4096;
 
+fn generate_file(tempdir: &TempDir, props: WriterProperties) -> TestParquetFile {
+    // Tune down the generator for smaller files
     let generator = AccessLogGenerator::new()
         .with_row_limit(NUM_ROWS)
-        .with_max_batch_size(ROW_LIMIT);
+        .with_pods_per_host(1..4)
+        .with_containers_per_pod(1..2)
+        .with_entries_per_container(128..256);
 
-    // default properties
-    let props = WriterProperties::builder().build();
     let file = tempdir.path().join("data.parquet");
 
     let start = Instant::now();
     println!("Writing test data to {:?}", file);
-    let test_parquet_file =
-        Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
+    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
     println!(
         "Completed generating test data in {:?}",
         Instant::now() - start
     );
+    test_parquet_file
+}
+
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+    // enable logging so RUST_LOG works
+    let _ = env_logger::try_init();
+}
+
+#[cfg(not(target_family = "windows"))]
+#[tokio::test]
+async fn single_file() {
+    // Only create the parquet file once as it is fairly large
 
-    let mut set = tokio::task::JoinSet::new();
+    let tempdir = TempDir::new().unwrap();
+    // Set row group size smaller so we can test with fewer rows
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(1024)
+        .build();
+    let test_parquet_file = generate_file(&tempdir, props);
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("selective")
         // request_method = 'GET'
         .with_filter(col("request_method").eq(lit("GET")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(8875);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(688);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("non_selective")
         // request_method != 'GET'
         .with_filter(col("request_method").not_eq(lit("GET")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(44944);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(3408);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("basic_conjunction")
         // request_method = 'POST' AND
         // response_status = 503
@@ -107,109 +110,108 @@ async fn single_file() {
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(1731);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(135);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("everything")
         // filter filters everything (no row has this status)
         // response_status = 429
         .with_filter(col("response_status").eq(lit(429_u16)))
         .with_pushdown_expected(PushdownExpected::Some)
         .with_expected_rows(0);
-    set.spawn(async move { case.run().await });
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("nothing")
         // No rows are filtered out -- all are returned
         // response_status > 0
         .with_filter(col("response_status").gt(lit(0_u16)))
         .with_pushdown_expected(PushdownExpected::None)
         .with_expected_rows(NUM_ROWS);
-    set.spawn(async move { case.run().await });
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("dict_selective")
         // container = 'backend_container_0'
         .with_filter(col("container").eq(lit("backend_container_0")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(15911);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(802);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("not eq")
         // container != 'backend_container_0'
         .with_filter(col("container").not_eq(lit("backend_container_0")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(37908);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(3294);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("dict_conjunction")
         // container == 'backend_container_0' AND
-        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        // pod = 'cvcjfhwtjttxhiugepoojxrplihywu'
         .with_filter(
             conjunction([
                 col("container").eq(lit("backend_container_0")),
-                col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg")),
+                col("pod").eq(lit("cvcjfhwtjttxhiugepoojxrplihywu")),
             ])
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(3052);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(134);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("dict_very_selective")
         // request_bytes > 2B AND
         // container == 'backend_container_0' AND
-        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        // pod = 'cvcjfhwtjttxhiugepoojxrplihywu'
        .with_filter(
            conjunction([
                col("request_bytes").gt(lit(2000000000)),
                col("container").eq(lit("backend_container_0")),
-                col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg")),
+                col("pod").eq(lit("cvcjfhwtjttxhiugepoojxrplihywu")),
            ])
            .unwrap(),
        )
        .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(88);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(2);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("dict_very_selective2")
-        // picks only 2 rows
-        // client_addr = '204.47.29.82' AND
+        // client_addr = '58.242.143.99' AND
         // container == 'backend_container_0' AND
-        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        // pod = 'cvcjfhwtjttxhiugepoojxrplihywu'
         .with_filter(
             conjunction(vec![
-                col("request_bytes").gt(lit(2000000000)),
+                col("client_addr").eq(lit("58.242.143.99")),
                 col("container").eq(lit("backend_container_0")),
-                col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg")),
+                col("pod").eq(lit("cvcjfhwtjttxhiugepoojxrplihywu")),
             ])
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(88);
-    set.spawn(async move { case.run().await });
+        .with_expected_rows(1);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("dict_disjunction")
         // container = 'backend_container_0' OR
-        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        // pod = 'cvcjfhwtjttxhiugepoojxrplihywu'
         .with_filter(
             disjunction([
                 col("container").eq(lit("backend_container_0")),
-                col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg")),
+                col("pod").eq(lit("cvcjfhwtjttxhiugepoojxrplihywu")),
             ])
             .unwrap(),
         )
-        .with_pushdown_expected(PushdownExpected::None)
-        .with_expected_rows(16955);
-    set.spawn(async move { case.run().await });
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(802);
+    case.run().await;
 
-    let case = TestCase::new(test_parquet_file.clone())
+    let case = TestCase::new(&test_parquet_file)
         .with_name("dict_disjunction3")
         // request_method != 'GET' OR
         // response_status = 400 OR
@@ -223,13 +225,8 @@ async fn single_file() {
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(48919);
-    set.spawn(async move { case.run().await });
-
-    // Join all the cases.
-    while let Some(result) = set.join_next().await {
-        result.unwrap()
-    }
+        .with_expected_rows(3672);
+    case.run().await;
 }
 
 #[cfg(not(target_family = "windows"))]
@@ -237,93 +234,113 @@ async fn single_file() {
 async fn single_file_small_data_pages() {
     let tempdir = TempDir::new().unwrap();
 
-    let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
-
-    // set the max page rows with arbitrary sizes 8311 to increase
-    // effectiveness of page filtering
+    // Set low row count limit to improve page filtering
     let props = WriterProperties::builder()
-        .set_data_page_row_count_limit(8311)
+        .set_max_row_group_size(2048)
+        .set_data_page_row_count_limit(512)
+        .set_write_batch_size(512)
         .build();
-    let file = tempdir.path().join("data_8311.parquet");
-
-    let start = Instant::now();
-    println!("Writing test data to {:?}", file);
-    let test_parquet_file =
-        Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
-    println!(
-        "Completed generating test data in {:?}",
-        Instant::now() - start
-    );
+    let test_parquet_file = generate_file(&tempdir, props);
 
     // The statistics on the 'pod' column are as follows:
     //
-    // parquet-tools dump -d ~/Downloads/data_8311.parquet
+    // docker run -v /tmp:/tmp nathanhowell/parquet-tools dump -d -c pod -n /tmp/.tmppkTohR/data.parquet
+    //
+    // ```
+    // row group 0
+    // --------------------------------------------------------------------------------
+    // pod: BINARY UNCOMPRESSED DO:782 FPO:1215 SZ:744/744/1.00 VC:2048 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: azvagebjesrqboyqxmgaskvpwddebuptqyy, max: zamirxzhihhfqdvhuxeziuukkqyutmczbhfgx, num_nulls not defined]
+    //
+    // pod TV=2048 RL=0 DL=0 DS: 11 DE:PLAIN
+    // ----------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: azvagebjesrqboyqxmgaskvpwddebuptqyy, max: ksjzzqfxvawhmlkopjsbponfdwsurxff, num_nulls not defined] CRC:[none] SZ:10 VC:512
+    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: azvagebjesrqboyqxmgaskvpwddebuptqyy, max: wlftgepiwhnmzqrsyijhqbauhjplru, num_nulls not defined] CRC:[none] SZ:18 VC:1013
+    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: ewzlijvnljqeqhqhftfalqbqfsyidw, max: zamirxzhihhfqdvhuxeziuukkqyutmczbhfgx, num_nulls not defined] CRC:[none] SZ:12 VC:523
     //
-    // ...
-    // pod TV=53819 RL=0 DL=0 DS: 8 DE:PLAIN
-    // ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: aqcathnxqsphdhgjtgvxsfyiwbmhlmg, max: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, num_nulls not defined] CRC:[none] SZ:7 VC:9216
-    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, max: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, num_nulls not defined] CRC:[none] SZ:7 VC:9216
-    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, max: djzdyiecnumrsrcbizwlqzdhnpoiqdh, num_nulls not defined] CRC:[none] SZ:10 VC:9216
-    // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216
-    // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
-    // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
-
-    TestCase::new(test_parquet_file.clone())
+    // row group 1
+    // --------------------------------------------------------------------------------
+    // pod: BINARY UNCOMPRESSED DO:249244 FPO:249724 SZ:901/901/1.00 VC:2048 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: csvnvrdcuzoftoidzmczrtqnrzgfpj, max: zamirxzhihhfqdvhuxeziuukkqyutmczbhfgx, num_nulls not defined]
+    //
+    // pod TV=2048 RL=0 DL=0 DS: 12 DE:PLAIN
+    // ----------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: dhhqgbsjutqdvqpikmnwqdnrhkqnjyieoviujkj, max: zamirxzhihhfqdvhuxeziuukkqyutmczbhfgx, num_nulls not defined] CRC:[none] SZ:12 VC:512
+    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: dlowgwtqjiifqajbobiuqoflbmsbobwsqtrc, max: uipgzhbptpinjcwbdwhkfdjzdfzrlffrifzh, num_nulls not defined] CRC:[none] SZ:12 VC:671
+    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: csvnvrdcuzoftoidzmczrtqnrzgfpj, max: xacatvakpxztzuucoxhjiofxykryoxc, num_nulls not defined] CRC:[none] SZ:16 VC:781
+    // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: nxihlfujkdzymexwpqurhawwchvkdrntixjs, max: xacatvakpxztzuucoxhjiofxykryoxc, num_nulls not defined] CRC:[none] SZ:9 VC:84
+    // ```
+
+    TestCase::new(&test_parquet_file)
         .with_name("selective")
-        // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
-        // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
-        .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
+        // predicate is chosen carefully to prune all bar 0-2 and 1-0
+        // pod = 'zamirxzhihhfqdvhuxeziuukkqyutmczbhfgx'
+        .with_filter(col("pod").eq(lit("zamirxzhihhfqdvhuxeziuukkqyutmczbhfgx")))
         .with_pushdown_expected(PushdownExpected::Some)
         .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
-        .with_expected_rows(2574)
+        .with_expected_rows(174)
         .run()
         .await;
 
-    // time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN
-    // --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004133888, num_nulls not defined] CRC:[none] SZ:13844 VC:9216
-    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.006397952, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
-    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005650432, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
-    // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
-    // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
-    // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
-    TestCase::new(test_parquet_file.clone())
+    // row group 0
+    // --------------------------------------------------------------------------------
+    // time: INT64 UNCOMPRESSED DO:3317 FPO:5334 SZ:4209/4209/1.00 VC:2048 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000254976, num_nulls not defined]
+    //
+    // time TV=2048 RL=0 DL=0 DS: 250 DE:PLAIN
+    // ----------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000203776, num_nulls not defined] CRC:[none] SZ:515 VC:512
+    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000254976, num_nulls not defined] CRC:[none] SZ:1020 VC:1013
+    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000216064, num_nulls not defined] CRC:[none] SZ:531 VC:523
+    //
+    // row group 1
+    // --------------------------------------------------------------------------------
+    // time: INT64 UNCOMPRESSED DO:252201 FPO:254186 SZ:4220/4220/1.00 VC:2048 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000250880, num_nulls not defined]
+    //
+    // time TV=2048 RL=0 DL=0 DS: 246 DE:PLAIN
+    // ----------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000231424, num_nulls not defined] CRC:[none] SZ:515 VC:512
+    // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000250880, num_nulls not defined] CRC:[none] SZ:675 VC:671
+    // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000211968, num_nulls not defined] CRC:[none] SZ:787 VC:781
+    // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.000177152, num_nulls not defined] CRC:[none] SZ:90 VC:84
+
+    TestCase::new(&test_parquet_file)
         .with_name("selective")
-        // predicate is chosen carefully to prune pages 1, 2, 4, and 5
-        // time > 1970-01-01T00:00:00.004300000
-        .with_filter(col("time").gt(lit_timestamp_nano(4300000)))
+        // predicate is chosen carefully to prune all bar 0-1, 1-0, 1-1
+        // time > 1970-01-01T00:00:00.000216064
+        .with_filter(col("time").gt(lit_timestamp_nano(216064)))
         .with_pushdown_expected(PushdownExpected::Some)
        .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
-        .with_expected_rows(9745)
+        .with_expected_rows(178)
         .run()
         .await;
 
-    // decimal_price TV=53819 RL=0 DL=0
-    // ----------------------------------------------------------------------------
-    // row group 0:
-    // column index for column decimal_price:
-    // Boudary order: UNORDERED
-    //          null count  min    max
-    // page-0   0           1      9216
-    // page-1   0           9217   18432
-    // page-2   0           18433  27648
-    // page-3   0           27649  36864
-    // page-4   0           36865  46080
-    // page-5   0           46081  53819
+    // row group 0
+    // --------------------------------------------------------------------------------
+    // decimal_price: FIXED_LEN_BYTE_ARRAY UNCOMPRESSED DO:0 FPO:215263 SZ:32948/32948/1.00 VC:2048 ENC:RLE,PLAIN ST:[min: 1, max: 1013, num_nulls not defined]
+    //
+    // decimal_price TV=2048 RL=0 DL=0
+    // ----------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[min: 1, max: 512, num_nulls not defined] CRC:[none] SZ:8192 VC:512
+    // page 1: DLE:RLE RLE:RLE VLE:PLAIN ST:[min: 1, max: 1013, num_nulls not defined] CRC:[none] SZ:16208 VC:1013
+    // page 2: DLE:RLE RLE:RLE VLE:PLAIN ST:[min: 1, max: 919, num_nulls not defined] CRC:[none] SZ:8368 VC:523
+    //
+    // row group 1
+    // --------------------------------------------------------------------------------
+    // decimal_price: FIXED_LEN_BYTE_ARRAY UNCOMPRESSED DO:0 FPO:461433 SZ:33006/33006/1.00 VC:2048 ENC:RLE,PLAIN ST:[min: 1, max: 787, num_nulls not defined]
     //
-    // offset index for column decimal_price:
-    //          offset   compressed size  first row index
-    // page-0   5581636  147517           0
-    // page-1   5729153  147517           9216
-    TestCase::new(test_parquet_file.clone())
+    // decimal_price TV=2048 RL=0 DL=0
+    // ----------------------------------------------------------------------------
+    // page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[min: 117, max: 628, num_nulls not defined] CRC:[none] SZ:8192 VC:512
+    // page 1: DLE:RLE RLE:RLE VLE:PLAIN ST:[min: 1, max: 787, num_nulls not defined] CRC:[none] SZ:10736 VC:671
+    // page 2: DLE:RLE RLE:RLE VLE:PLAIN ST:[min: 1, max: 781, num_nulls not defined] CRC:[none] SZ:12496 VC:781
+    // page 3: DLE:RLE RLE:RLE VLE:PLAIN ST:[min: 1, max: 515, num_nulls not defined] CRC:[none] SZ:1344 VC:84
+
+    TestCase::new(&test_parquet_file)
         .with_name("selective_on_decimal")
-        // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
-        // decimal_price < 9200
-        .with_filter(col("decimal_price").lt_eq(lit(9200)))
+        // predicate is chosen carefully to prune all bar 0-1
+        // decimal_price > 919
+        .with_filter(col("decimal_price").gt(lit(919)))
         .with_pushdown_expected(PushdownExpected::Some)
         .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
-        .with_expected_rows(9200)
+        .with_expected_rows(94)
         .run()
         .await;
 }
@@ -347,8 +364,8 @@ enum PageIndexFilteringExpected {
 }
 
 /// parameters for running a test
-struct TestCase {
-    test_parquet_file: Arc<TestParquetFile>,
+struct TestCase<'a> {
+    test_parquet_file: &'a TestParquetFile,
     /// Human readable name to help debug failures
     name: String,
     /// The filter to apply
@@ -363,8 +380,8 @@ struct TestCase {
     expected_rows: usize,
 }
 
-impl TestCase {
-    fn new(test_parquet_file: Arc<TestParquetFile>) -> Self {
+impl<'a> TestCase<'a> {
+    fn new(test_parquet_file: &'a TestParquetFile) -> Self {
         Self {
             test_parquet_file,
             name: "".into(),
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 4d2830ad884b..73a6c6419fde 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -95,10 +95,10 @@ async fn page_index_filter_one_col() {
     // `month = 1` from the page index should create below RowSelection
     // vec.push(RowSelector::select(312));
     // vec.push(RowSelector::skip(3330));
-    // vec.push(RowSelector::select(333));
-    // vec.push(RowSelector::skip(3330));
-    // total 645 row
-    assert_eq!(batch.num_rows(), 645);
+    // vec.push(RowSelector::select(339));
+    // vec.push(RowSelector::skip(3319));
+    // total 651 rows
+    assert_eq!(batch.num_rows(), 651);
 
     // 2. create filter month == 1 or month == 2;
     let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32)));
@@ -109,14 +109,16 @@ async fn page_index_filter_one_col() {
 
     let batch = results.next().await.unwrap().unwrap();
 
-    // `month = 12` from the page index should create below RowSelection
-    // vec.push(RowSelector::skip(894));
-    // vec.push(RowSelector::select(339));
-    // vec.push(RowSelector::skip(3330));
+    // `month = 1` or `month = 2` from the page index should create below RowSelection
+    // vec.push(RowSelector::select(312));
+    // vec.push(RowSelector::skip(900));
     // vec.push(RowSelector::select(312));
-    // vec.push(RowSelector::skip(2430));
-    // combine with before filter total 1275 row
-    assert_eq!(batch.num_rows(), 1275);
+    // vec.push(RowSelector::skip(2118));
+    // vec.push(RowSelector::select(339));
+    // vec.push(RowSelector::skip(873));
+    // vec.push(RowSelector::select(318));
+    // vec.push(RowSelector::skip(2128));
+    assert_eq!(batch.num_rows(), 1281);
 
     // 3. create filter month == 1 and month == 12;
     let filter = col("month")
@@ -141,7 +143,7 @@ async fn page_index_filter_one_col() {
 
     let batch = results.next().await.unwrap().unwrap();
     // should same with `month = 1`
-    assert_eq!(batch.num_rows(), 645);
+    assert_eq!(batch.num_rows(), 651);
 
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
@@ -192,7 +194,7 @@ async fn page_index_filter_multi_col() {
     let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
 
     let batch = results.next().await.unwrap().unwrap();
-    assert_eq!(batch.num_rows(), 645);
+    assert_eq!(batch.num_rows(), 651);
 
     // create filter (year = 2009 or id = 1)
     // this filter use two columns will not push down
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index eb36b11dfe4c..7a873b8c702a 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -85,12 +85,10 @@ impl TestParquetFile {
         let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
 
         writer.write(&first_batch).unwrap();
-        writer.flush()?;
         let mut num_rows = first_batch.num_rows();
 
         for batch in batches {
             writer.write(&batch).unwrap();
-            writer.flush()?;
             num_rows += batch.num_rows();
         }
         writer.close().unwrap();
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index 19db65400a17..e6e3aa39845d 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -27,6 +27,25 @@ use arrow::record_batch::RecordBatch;
 use rand::prelude::StdRng;
 use rand::{Rng, SeedableRng};
 
+#[derive(Debug, Clone)]
+struct GeneratorOptions {
+    row_limit: usize,
+    pods_per_host: Range<usize>,
+    containers_per_pod: Range<usize>,
+    entries_per_container: Range<usize>,
+}
+
+impl Default for GeneratorOptions {
+    fn default() -> Self {
+        Self {
+            row_limit: usize::MAX,
+            pods_per_host: 1..15,
+            containers_per_pod: 1..3,
+            entries_per_container: 1024..8192,
+        }
+    }
+}
+
 #[derive(Default)]
 struct BatchBuilder {
     service: StringDictionaryBuilder<Int32Type>,
     response_status: UInt16Builder,
     prices_status: Decimal128Builder,
 
-    /// optional number of rows produced
-    row_limit: Option<usize>,
+    options: GeneratorOptions,
     row_count: usize,
 }
@@ -79,24 +97,23 @@ impl BatchBuilder {
     }
 
     fn is_finished(&self) -> bool {
-        self.row_limit
-            .as_ref()
-            .map(|x| *x <= self.row_count)
-            .unwrap_or_default()
+        self.options.row_limit <= self.row_count
     }
 
     fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
-        let num_pods = rng.gen_range(1..15);
+        let num_pods = rng.gen_range(self.options.pods_per_host.clone());
         let pods = generate_sorted_strings(rng, num_pods, 30..40);
         for pod in pods {
-            for container_idx in 0..rng.gen_range(1..3) {
+            let num_containers = rng.gen_range(self.options.containers_per_pod.clone());
+            for container_idx in 0..num_containers {
                 let container = format!("{}_container_{}", service, container_idx);
                 let image = format!(
                     "{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9",
                     container
                 );
 
-                let num_entries = rng.gen_range(1024..8192);
+                let num_entries =
+                    rng.gen_range(self.options.entries_per_container.clone());
                 for i in 0..num_entries {
                     if self.is_finished() {
                         return;
@@ -184,12 +201,6 @@ impl BatchBuilder {
         )
         .unwrap()
     }
-
-    /// Return up to row_limit rows;
-    pub fn with_row_limit(mut self, row_limit: Option<usize>) -> Self {
-        self.row_limit = row_limit;
-        self
-    }
 }
 
 fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
@@ -241,12 +252,12 @@ pub struct AccessLogGenerator {
     schema: SchemaRef,
     rng: StdRng,
     host_idx: usize,
-    /// optional number of rows produced
-    row_limit: usize,
     /// maximum rows per batch
     max_batch_size: usize,
     /// How many rows have been returned so far
     row_count: usize,
+    /// Options
+    options: GeneratorOptions,
 }
 
 impl Default for AccessLogGenerator {
@@ -266,9 +277,9 @@ impl AccessLogGenerator {
             schema: BatchBuilder::schema(),
             host_idx: 0,
             rng: StdRng::from_seed(seed),
-            row_limit: usize::MAX,
             max_batch_size: usize::MAX,
             row_count: 0,
+            options: Default::default(),
         }
     }
 
@@ -285,7 +296,25 @@ impl AccessLogGenerator {
 
     /// Return up to row_limit rows;
     pub fn with_row_limit(mut self, row_limit: usize) -> Self {
-        self.row_limit = row_limit;
+        self.options.row_limit = row_limit;
+        self
+    }
+
+    /// Set the number of pods per host
+    pub fn with_pods_per_host(mut self, range: Range<usize>) -> Self {
+        self.options.pods_per_host = range;
+        self
+    }
+
+    /// Set the number of containers per pod
+    pub fn with_containers_per_pod(mut self, range: Range<usize>) -> Self {
+        self.options.containers_per_pod = range;
+        self
+    }
+
+    /// Set the number of log entries per container
+    pub fn with_entries_per_container(mut self, range: Range<usize>) -> Self {
+        self.options.entries_per_container = range;
         self
     }
 }
@@ -294,13 +323,21 @@ impl Iterator for AccessLogGenerator {
     type Item = RecordBatch;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if self.row_count == self.row_limit {
+        if self.row_count == self.options.row_limit {
             return None;
         }
 
-        let mut builder = BatchBuilder::default().with_row_limit(Some(
-            self.max_batch_size.min(self.row_limit - self.row_count),
-        ));
+        let row_limit = self
+            .max_batch_size
+            .min(self.options.row_limit - self.row_count);
+
+        let mut builder = BatchBuilder {
+            options: GeneratorOptions {
+                row_limit,
+                ..self.options.clone()
+            },
+            ..Default::default()
+        };
 
         let host = format!(
             "i-{:016x}.ec2.internal",
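
The `page_filter.rs` change above is easy to misread as a no-op: `iter().skip(1).enumerate()` restarts the index at 0 on the second element, so the subsequent `row_vec.get(i)` looked up the row count of the wrong page, whereas `iter().enumerate().skip(1)` keeps the index aligned with the original slice. A minimal standalone sketch (hypothetical values, not DataFusion code) showing the difference:

```rust
fn main() {
    let values = ["a", "b", "c"];

    // skip(1).enumerate(): the index restarts at 0 on the second element,
    // so a later `slice.get(i)` would read the wrong position.
    let misaligned: Vec<(usize, &&str)> = values.iter().skip(1).enumerate().collect();
    assert_eq!(misaligned, vec![(0, &"b"), (1, &"c")]);

    // enumerate().skip(1): the index stays aligned with the original slice.
    let aligned: Vec<(usize, &&str)> = values.iter().enumerate().skip(1).collect();
    assert_eq!(aligned, vec![(1, &"b"), (2, &"c")]);
}
```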
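The hand-picked predicates in `single_file_small_data_pages` lean on the page statistics quoted in the comments: under min/max pruning, a page can only be skipped for a filter like `col > t` when its max statistic is at or below `t`. A small sketch (max values taken from the `time` dump above; `survives_gt` is a hypothetical helper, not the DataFusion pruning API) confirming why `time > 216064` keeps exactly pages 0-1, 1-0, and 1-1:

```rust
/// For a predicate `col > threshold`, a page can only contain matching rows
/// if its max statistic exceeds the threshold; otherwise it can be pruned.
fn survives_gt(page_max: i64, threshold: i64) -> bool {
    page_max > threshold
}

fn main() {
    // (row group, page, max) for the `time` column, taken from the
    // parquet-tools dump quoted in the test comments (nanosecond offsets).
    let pages: [(u8, u8, i64); 7] = [
        (0, 0, 203_776),
        (0, 1, 254_976),
        (0, 2, 216_064),
        (1, 0, 231_424),
        (1, 1, 250_880),
        (1, 2, 211_968),
        (1, 3, 177_152),
    ];

    let kept: Vec<(u8, u8)> = pages
        .iter()
        .filter(|(_, _, max)| survives_gt(*max, 216_064))
        .map(|(rg, page, _)| (*rg, *page))
        .collect();

    // Page 0-2's max equals the threshold exactly, so the strict `>` prunes it too.
    assert_eq!(kept, vec![(0, 1), (1, 0), (1, 1)]);
}
```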