refactor: parallelize parquet_exec test case single_file #4735

Merged 4 commits on Dec 27, 2022

129 changes: 66 additions & 63 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -26,6 +26,7 @@
//! select * from data limit 10;
//! ```

use std::sync::Arc;
use std::time::Instant;

use arrow::compute::concat_batches;
@@ -42,6 +43,7 @@ use test_utils::AccessLogGenerator;

/// how many rows of generated data to write to our parquet file (arbitrary)
const NUM_ROWS: usize = 53819;
const ROW_LIMIT: usize = 4096;

#[cfg(test)]
#[ctor::ctor]
@@ -51,45 +53,49 @@ fn init() {
}

#[cfg(not(target_family = "windows"))]
#[tokio::test]
// Use multi-threaded executor as this test consumes CPU
#[tokio::test(flavor = "multi_thread")]
async fn single_file() {
// Only create the parquet file once as it is fairly large

let tempdir = TempDir::new().unwrap();

let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
let generator = AccessLogGenerator::new()
.with_row_limit(NUM_ROWS)
.with_max_batch_size(ROW_LIMIT);

Contributor:
This wasn't quite what I meant; this only alters the internal batch size that the file is generated in. I'll add it to my list to get a PR in that adjusts the generator so it doesn't generate such unnecessarily large files.

Contributor:
Note that I carefully chose the predicate values in this test to cover various cases, based on how the values are distributed by the data generator. I am sure it will be possible to reduce the size of the data file, but it will require some care to ensure the coverage is retained.

Contributor:
PR in #4743


// default properties
let props = WriterProperties::builder().build();
let file = tempdir.path().join("data.parquet");

let start = Instant::now();
println!("Writing test data to {:?}", file);
let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
let test_parquet_file =
Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
println!(
"Completed generating test data in {:?}",
Instant::now() - start
);

TestCase::new(&test_parquet_file)
let mut set = tokio::task::JoinSet::new();

let case = TestCase::new(test_parquet_file.clone())
.with_name("selective")
// request_method = 'GET'
.with_filter(col("request_method").eq(lit("GET")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(8886)
.run()
.await;
.with_expected_rows(8875);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("non_selective")
// request_method != 'GET'
.with_filter(col("request_method").not_eq(lit("GET")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(44933)
.run()
.await;
.with_expected_rows(44944);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("basic_conjunction")
// request_method = 'POST' AND
// response_status = 503
@@ -101,49 +107,44 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(1729)
.run()
.await;
.with_expected_rows(1731);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("everything")
// filter filters everything (no row has this status)
// response_status = 429
.with_filter(col("response_status").eq(lit(429_u16)))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(0)
.run()
.await;
.with_expected_rows(0);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("nothing")
// No rows are filtered out -- all are returned
// response_status > 0
.with_filter(col("response_status").gt(lit(0_u16)))
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(NUM_ROWS)
.run()
.await;
.with_expected_rows(NUM_ROWS);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_selective")
// container = 'backend_container_0'
.with_filter(col("container").eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(37856)
.run()
.await;
.with_expected_rows(15911);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("not eq")
// container != 'backend_container_0'
.with_filter(col("container").not_eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(15963)
.run()
.await;
.with_expected_rows(37908);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_conjunction")
// container == 'backend_container_0' AND
// pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
@@ -155,11 +156,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(3052)
.run()
.await;
.with_expected_rows(3052);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_very_selective")
// request_bytes > 2B AND
// container == 'backend_container_0' AND
@@ -173,11 +173,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(88)
.run()
.await;
.with_expected_rows(88);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_very_selective2")
// picks only 2 rows
// client_addr = '204.47.29.82' AND
@@ -192,11 +191,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(88)
.run()
.await;
.with_expected_rows(88);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_disjunction")
// container = 'backend_container_0' OR
// pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
@@ -207,12 +205,11 @@ async fn single_file() {
])
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(39982)
.run()
.await;
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(16955);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_disjunction3")
// request_method != 'GET' OR
// response_status = 400 OR
@@ -225,10 +222,14 @@ async fn single_file() {
])
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(NUM_ROWS)
.run()
.await;
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(48919);
set.spawn(async move { case.run().await });

// Join all the cases.
while let Some(result) = set.join_next().await {
result.unwrap()
}
}

#[cfg(not(target_family = "windows"))]
@@ -247,7 +248,8 @@ async fn single_file_small_data_pages() {

let start = Instant::now();
println!("Writing test data to {:?}", file);
let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
let test_parquet_file =
Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
println!(
"Completed generating test data in {:?}",
Instant::now() - start
@@ -267,7 +269,7 @@ async fn single_file_small_data_pages() {
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739

TestCase::new(&test_parquet_file)
TestCase::new(test_parquet_file.clone())
.with_name("selective")
// predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
// pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
@@ -286,7 +288,7 @@ async fn single_file_small_data_pages() {
// page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
TestCase::new(&test_parquet_file)
TestCase::new(test_parquet_file.clone())
.with_name("selective")
// predicate is chosen carefully to prune pages 1, 2, 4, and 5
// time > 1970-01-01T00:00:00.004300000
@@ -314,7 +316,7 @@ async fn single_file_small_data_pages() {
// offset compressed size first row index
// page-0 5581636 147517 0
// page-1 5729153 147517 9216
TestCase::new(&test_parquet_file)
TestCase::new(test_parquet_file.clone())
.with_name("selective_on_decimal")
// predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
// decimal_price < 9200
@@ -345,8 +347,8 @@ enum PageIndexFilteringExpected {
}

/// parameters for running a test
struct TestCase<'a> {
test_parquet_file: &'a TestParquetFile,
struct TestCase {
test_parquet_file: Arc<TestParquetFile>,
/// Human readable name to help debug failures
name: String,
/// The filter to apply
@@ -361,8 +363,8 @@ struct TestCase<'a> {
expected_rows: usize,
}

impl<'a> TestCase<'a> {
fn new(test_parquet_file: &'a TestParquetFile) -> Self {
impl TestCase {
fn new(test_parquet_file: Arc<TestParquetFile>) -> Self {
Self {
test_parquet_file,
name: "<NOT SPECIFIED>".into(),
@@ -533,12 +535,13 @@ impl<'a> TestCase<'a> {

match pushdown_expected {
PushdownExpected::None => {
assert_eq!(pushdown_rows_filtered, 0);
assert_eq!(pushdown_rows_filtered, 0, "{}", self.name);
}
PushdownExpected::Some => {
assert!(
pushdown_rows_filtered > 0,
"Expected to filter rows via pushdown, but none were"
"{}: Expected to filter rows via pushdown, but none were",
self.name
);
}
};
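
For context, the diff's core pattern is a fan-out/join: the expensive `TestParquetFile` is built once and shared via `Arc` (tokio's `spawn` requires `'static` futures, which is why the old borrowed `&TestParquetFile` had to go), each `TestCase` is spawned onto a `tokio::task::JoinSet`, and draining the set propagates any task panic back into the test. Below is a minimal, self-contained sketch of that pattern; `Fixture`, `run_case`, and the case count are illustrative stand-ins rather than code from this PR, and `#[tokio::main]` stands in for the test's `#[tokio::test(flavor = "multi_thread")]` attribute.

```rust
use std::sync::Arc;
use tokio::task::JoinSet;

/// Illustrative stand-in for the shared, expensive-to-build fixture
/// (`TestParquetFile` in the PR).
struct Fixture {
    name: String,
}

/// Illustrative stand-in for `TestCase::run`; a panic here surfaces
/// when the `JoinSet` is drained below.
async fn run_case(fixture: Arc<Fixture>, case: usize) {
    println!("case {case} ran against {}", fixture.name);
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // Build the fixture once; `Arc` gives every spawned task an owned
    // handle, satisfying the `'static` bound on spawned futures.
    let fixture = Arc::new(Fixture { name: "data.parquet".into() });

    let mut set = JoinSet::new();
    for case in 0..4 {
        let fixture = Arc::clone(&fixture);
        set.spawn(async move { run_case(fixture, case).await });
    }

    // Drain the set: `join_next` yields `Err` if a task panicked, and
    // `unwrap` re-raises that panic so the test still fails.
    while let Some(result) = set.join_next().await {
        result.unwrap();
    }
}
```

On a multi-threaded runtime the independent cases can make progress in parallel instead of running back to back, which is the point of the refactor.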