refactor: parallelize parquet_exec test case single_file #4735

Merged Dec 27, 2022 (4 commits)
datafusion/core/tests/parquet/filter_pushdown.rs (28 changes: 16 additions & 12 deletions)
@@ -43,6 +43,7 @@ use test_utils::AccessLogGenerator;

 /// how many rows of generated data to write to our parquet file (arbitrary)
 const NUM_ROWS: usize = 53819;
+const ROW_LIMIT: usize = 4096;

 #[cfg(test)]
 #[ctor::ctor]
@@ -59,7 +60,9 @@ async fn single_file() {

     let tempdir = TempDir::new().unwrap();

-    let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
+    let generator = AccessLogGenerator::new()
+        .with_row_limit(NUM_ROWS)
+        .with_max_batch_size(ROW_LIMIT);
Contributor:
This wasn't quite what I meant; this just alters the internal batch size that the file is generated in. I'll add it to my list to get a PR in that adjusts the generator so it no longer produces such unnecessarily large files.

Contributor:
Note that I carefully chose the values of the predicates in this test to cover various cases, based on how the values are distributed by the data generator. I am sure it will be possible to reduce the size of the data file, but it will require some care to ensure the coverage is retained.

Contributor:
PR in #4743


     // default properties
     let props = WriterProperties::builder().build();
@@ -81,15 +84,15 @@ async fn single_file() {
         // request_method = 'GET'
         .with_filter(col("request_method").eq(lit("GET")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(8886);
+        .with_expected_rows(8875);
     set.spawn(async move { case.run().await });

     let case = TestCase::new(test_parquet_file.clone())
         .with_name("non_selective")
         // request_method != 'GET'
         .with_filter(col("request_method").not_eq(lit("GET")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(44933);
+        .with_expected_rows(44944);
     set.spawn(async move { case.run().await });

     let case = TestCase::new(test_parquet_file.clone())
@@ -104,7 +107,7 @@ async fn single_file() {
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(1729);
+        .with_expected_rows(1731);
     set.spawn(async move { case.run().await });

     let case = TestCase::new(test_parquet_file.clone())
@@ -130,15 +133,15 @@ async fn single_file() {
         // container = 'backend_container_0'
         .with_filter(col("container").eq(lit("backend_container_0")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(37856);
+        .with_expected_rows(15911);
     set.spawn(async move { case.run().await });

     let case = TestCase::new(test_parquet_file.clone())
         .with_name("not eq")
         // container != 'backend_container_0'
         .with_filter(col("container").not_eq(lit("backend_container_0")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(15963);
+        .with_expected_rows(37908);
     set.spawn(async move { case.run().await });

     let case = TestCase::new(test_parquet_file.clone())
@@ -202,8 +205,8 @@ async fn single_file() {
             ])
             .unwrap(),
         )
-        .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(39982);
+        .with_pushdown_expected(PushdownExpected::None)
+        .with_expected_rows(16955);
     set.spawn(async move { case.run().await });

     let case = TestCase::new(test_parquet_file.clone())
@@ -219,8 +222,8 @@ async fn single_file() {
             ])
             .unwrap(),
         )
-        .with_pushdown_expected(PushdownExpected::None)
-        .with_expected_rows(NUM_ROWS);
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(48919);
     set.spawn(async move { case.run().await });

     // Join all the cases.
@@ -532,12 +535,13 @@ impl TestCase {

         match pushdown_expected {
             PushdownExpected::None => {
-                assert_eq!(pushdown_rows_filtered, 0);
+                assert_eq!(pushdown_rows_filtered, 0, "{}", self.name);
             }
             PushdownExpected::Some => {
                 assert!(
                     pushdown_rows_filtered > 0,
-                    "Expected to filter rows via pushdown, but none were"
+                    "{}: Expected to filter rows via pushdown, but none were",
+                    self.name
                 );
             }
         };
Expand Down