From 13719703cca7a51d6ea56e373c7f9eb9d0b2a5a6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 26 Dec 2022 20:59:34 +0800 Subject: [PATCH 1/4] refactor: parallelize test case Signed-off-by: Ruihang Xia --- .../core/tests/parquet/filter_pushdown.rs | 112 +++++++++--------- 1 file changed, 54 insertions(+), 58 deletions(-) diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index ac3744278c14..0a165eb5fab9 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -26,6 +26,7 @@ //! select * from data limit 10; //! ``` +use std::sync::Arc; use std::time::Instant; use arrow::compute::concat_batches; @@ -51,7 +52,7 @@ fn init() { } #[cfg(not(target_family = "windows"))] -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn single_file() { // Only create the parquet file once as it is fairly large @@ -65,31 +66,32 @@ async fn single_file() { let start = Instant::now(); println!("Writing test data to {:?}", file); - let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap(); + let test_parquet_file = + Arc::new(TestParquetFile::try_new(file, props, generator).unwrap()); println!( "Completed generating test data in {:?}", Instant::now() - start ); - TestCase::new(&test_parquet_file) + let mut set = tokio::task::JoinSet::new(); + + let case = TestCase::new(test_parquet_file.clone()) .with_name("selective") // request_method = 'GET' .with_filter(col("request_method").eq(lit("GET"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(8886) - .run() - .await; + .with_expected_rows(8886); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("non_selective") // request_method != 'GET' .with_filter(col("request_method").not_eq(lit("GET"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(44933) - .run() - .await; + .with_expected_rows(44933); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("basic_conjunction") // request_method = 'POST' AND // response_status = 503 @@ -101,49 +103,44 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(1729) - .run() - .await; + .with_expected_rows(1729); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("everything") // filter filters everything (no row has this status) // response_status = 429 .with_filter(col("response_status").eq(lit(429_u16))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(0) - .run() - .await; + .with_expected_rows(0); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("nothing") // No rows are filtered out -- all are returned // response_status > 0 .with_filter(col("response_status").gt(lit(0_u16))) .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(NUM_ROWS) - .run() - .await; + .with_expected_rows(NUM_ROWS); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_selective") // container = 'backend_container_0' .with_filter(col("container").eq(lit("backend_container_0"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(37856) - .run() - .await; + .with_expected_rows(37856); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("not eq") // container != 'backend_container_0' .with_filter(col("container").not_eq(lit("backend_container_0"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(15963) - .run() - .await; + .with_expected_rows(15963); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_conjunction") // container == 'backend_container_0' AND // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg' @@ -155,11 +152,10 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(3052) - .run() - .await; + .with_expected_rows(3052); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_very_selective") // request_bytes > 2B AND // container == 'backend_container_0' AND @@ -173,11 +169,10 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(88) - .run() - .await; + .with_expected_rows(88); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_very_selective2") // picks only 2 rows // client_addr = '204.47.29.82' AND @@ -192,11 +187,10 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(88) - .run() - .await; + .with_expected_rows(88); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_disjunction") // container = 'backend_container_0' OR // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg' @@ -208,11 +202,10 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(39982) - .run() - .await; + .with_expected_rows(39982); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_disjunction3") // request_method != 'GET' OR // response_status = 400 OR @@ -226,9 +219,11 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(NUM_ROWS) - .run() - .await; + .with_expected_rows(NUM_ROWS); + set.spawn(async move { case.run().await }); + + // Join all the cases. + while let Some(_) = set.join_next().await {} } #[cfg(not(target_family = "windows"))] @@ -247,7 +242,8 @@ async fn single_file_small_data_pages() { let start = Instant::now(); println!("Writing test data to {:?}", file); - let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap(); + let test_parquet_file = + Arc::new(TestParquetFile::try_new(file, props, generator).unwrap()); println!( "Completed generating test data in {:?}", Instant::now() - start @@ -267,7 +263,7 @@ async fn single_file_small_data_pages() { // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216 // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739 - TestCase::new(&test_parquet_file) + TestCase::new(test_parquet_file.clone()) .with_name("selective") // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4 // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin' @@ -286,7 +282,7 @@ async fn single_file_small_data_pages() { // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739 - TestCase::new(&test_parquet_file) + TestCase::new(test_parquet_file.clone()) .with_name("selective") // predicate is chosen carefully to prune pages 1, 2, 4, and 5 // time > 1970-01-01T00:00:00.004300000 @@ -314,7 +310,7 @@ async fn single_file_small_data_pages() { // offset compressed size first row index // page-0 5581636 147517 0 // page-1 5729153 147517 9216 - TestCase::new(&test_parquet_file) + TestCase::new(test_parquet_file.clone()) .with_name("selective_on_decimal") // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5 // decimal_price < 9200 @@ -345,8 +341,8 @@ enum PageIndexFilteringExpected { } /// parameters for running a test -struct TestCase<'a> { - test_parquet_file: &'a TestParquetFile, +struct TestCase { + test_parquet_file: Arc, /// Human readable name to help debug failures name: String, /// The filter to apply @@ -361,8 +357,8 @@ struct TestCase<'a> { expected_rows: usize, } -impl<'a> TestCase<'a> { - fn new(test_parquet_file: &'a TestParquetFile) -> Self { +impl TestCase { + fn new(test_parquet_file: Arc) -> Self { Self { test_parquet_file, name: "".into(), From 03c60ba4db0dbe2dc7b348596b214bf862645661 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 27 Dec 2022 10:04:11 +0800 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Andrew Lamb --- datafusion/core/tests/parquet/filter_pushdown.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 0a165eb5fab9..ae9154d51c8d 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -52,6 +52,7 @@ fn init() { } #[cfg(not(target_family = "windows"))] +// Use multi-threaded executor as this test consumes CPU #[tokio::test(flavor = "multi_thread")] async fn single_file() { // Only create the parquet file once as it is fairly large @@ -223,7 +224,9 @@ async fn single_file() { set.spawn(async move { case.run().await }); // Join all the cases. - while let Some(_) = set.join_next().await {} + while let Some(result) = set.join_next().await { + result.unwrap() + } } #[cfg(not(target_family = "windows"))] From 554eb9f6373157d28f0287cd89afa36bab848758 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 27 Dec 2022 10:08:30 +0800 Subject: [PATCH 3/4] format code Signed-off-by: Ruihang Xia --- datafusion/core/tests/parquet/filter_pushdown.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index ae9154d51c8d..855b391c7b00 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -225,7 +225,7 @@ async fn single_file() { // Join all the cases. while let Some(result) = set.join_next().await { - result.unwrap() + result.unwrap() } } From 8799a641fa1c771422accc84f61b0875b7778d8b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 27 Dec 2022 10:48:35 +0800 Subject: [PATCH 4/4] change row limit Signed-off-by: Ruihang Xia --- .../core/tests/parquet/filter_pushdown.rs | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 855b391c7b00..59350113c943 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -43,6 +43,7 @@ use test_utils::AccessLogGenerator; /// how many rows of generated data to write to our parquet file (arbitrary) const NUM_ROWS: usize = 53819; +const ROW_LIMIT: usize = 4096; #[cfg(test)] #[ctor::ctor] @@ -59,7 +60,9 @@ async fn single_file() { let tempdir = TempDir::new().unwrap(); - let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS); + let generator = AccessLogGenerator::new() + .with_row_limit(NUM_ROWS) + .with_max_batch_size(ROW_LIMIT); // default properties let props = WriterProperties::builder().build(); @@ -81,7 +84,7 @@ async fn single_file() { // request_method = 'GET' .with_filter(col("request_method").eq(lit("GET"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(8886); + .with_expected_rows(8875); set.spawn(async move { case.run().await }); let case = TestCase::new(test_parquet_file.clone()) @@ -89,7 +92,7 @@ async fn single_file() { // request_method != 'GET' .with_filter(col("request_method").not_eq(lit("GET"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(44933); + .with_expected_rows(44944); set.spawn(async move { case.run().await }); let case = TestCase::new(test_parquet_file.clone()) @@ -104,7 +107,7 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(1729); + .with_expected_rows(1731); set.spawn(async move { case.run().await }); let case = TestCase::new(test_parquet_file.clone()) @@ -130,7 +133,7 @@ async fn single_file() { // container = 'backend_container_0' .with_filter(col("container").eq(lit("backend_container_0"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(37856); + .with_expected_rows(15911); set.spawn(async move { case.run().await }); let case = TestCase::new(test_parquet_file.clone()) @@ -138,7 +141,7 @@ async fn single_file() { // container != 'backend_container_0' .with_filter(col("container").not_eq(lit("backend_container_0"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(15963); + .with_expected_rows(37908); set.spawn(async move { case.run().await }); let case = TestCase::new(test_parquet_file.clone()) @@ -202,8 +205,8 @@ async fn single_file() { ]) .unwrap(), ) - .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(39982); + .with_pushdown_expected(PushdownExpected::None) + .with_expected_rows(16955); set.spawn(async move { case.run().await }); let case = TestCase::new(test_parquet_file.clone()) @@ -219,8 +222,8 @@ async fn single_file() { ]) .unwrap(), ) - .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(NUM_ROWS); + .with_pushdown_expected(PushdownExpected::Some) + .with_expected_rows(48919); set.spawn(async move { case.run().await }); // Join all the cases. @@ -532,12 +535,13 @@ impl TestCase { match pushdown_expected { PushdownExpected::None => { - assert_eq!(pushdown_rows_filtered, 0); + assert_eq!(pushdown_rows_filtered, 0, "{}", self.name); } PushdownExpected::Some => { assert!( pushdown_rows_filtered > 0, - "Expected to filter rows via pushdown, but none were" + "{}: Expected to filter rows via pushdown, but none were", + self.name ); } };