-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support parquet page filtering on min_max for decimal128
and string
columns
#4255
Conversation
Signed-off-by: yangjiang <[email protected]>
Signed-off-by: yangjiang <[email protected]>
datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
Outdated
Show resolved
Hide resolved
// Convert the bytes array to i128. | ||
// The endian of the input bytes array must be big-endian. | ||
// Copy from the arrow-rs | ||
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the common func here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @Ted-Jiang -- this looks great
I wonder if it would be possible to add some more targeted testing for the string and decimal page indexes in https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/tests/parquet/page_pruning.rs
The current test in parquet_exec
I think ensures that the plumbing is all hooked up correctly, but I think some more targeted testing would be good too
However, overall I think this PR could also go in as is. Thanks a lot!
@@ -266,20 +266,17 @@ async fn single_file_small_data_pages() { | |||
// page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216 | |||
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216 | |||
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739 | |||
// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
@@ -146,6 +148,7 @@ impl BatchBuilder { | |||
.append_option(rng.gen_bool(0.9).then(|| rng.gen())); | |||
self.response_status | |||
.append_value(status[rng.gen_range(0..status.len())]); | |||
self.prices_status.append_value(self.row_count as i128); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the incrementing price makes sense for range testing
@@ -382,6 +394,9 @@ fn create_row_count_in_each_page( | |||
struct PagesPruningStatistics<'a> { | |||
col_page_indexes: &'a Index, | |||
col_offset_indexes: &'a Vec<PageLocation>, | |||
// target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
…ter.rs Co-authored-by: Andrew Lamb <[email protected]>
@@ -419,10 +468,37 @@ macro_rules! get_min_max_values_for_page_index { | |||
vec.iter().map(|x| x.$func().cloned()), | |||
))) | |||
} | |||
Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { | |||
Index::BYTE_ARRAY(index) => { | |||
let vec = &index.indexes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
decimal should be supported for this logical type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arrow-rs contains the method of decoding decimal from byte array in ByteArrayReader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, i prefer align with row group, do them together in other pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addition additional support in a follow on PR sounds like a good idea to me -- maybe we can file a ticket to track the work
agree. @Ted-Jiang |
Signed-off-by: yangjiang <[email protected]>
will add more test |
Signed-off-by: yangjiang <[email protected]>
Signed-off-by: yangjiang <[email protected]>
@alamb @liukun4515 Add types check same as in reorg test code: code refactoring avoid duplicate code in test. Some test are |
@@ -503,465 +483,3 @@ async fn prune_decimal_in_list() { | |||
) | |||
.await; | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code move to mod.rs
} | ||
|
||
#[tokio::test] | ||
#[ignore] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All test case with expr fail 😭
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we have to run "type coercion / simplifiction" on them first?
Did rowGroup run this "type coercion / simplifiction" 🤔 ? I think they are the same code path, i will find it out soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Ted-Jiang Does the test_prune
function not run the optimizer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File related #4317
I plan to review this carefully tomorrow again --sorry for the delay @Ted-Jiang |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is looking good @Ted-Jiang -- the only thing I think that should be addressed before merging is the use of min()
rather than$func()
-- and that may just be my misunderstanding.
Once that is sorted out, perhaps we then get this PR in and then iterate on additional changes / data type support as follow ons?
let vec = &index.indexes; | ||
let vec: Vec<Option<i128>> = vec | ||
.iter() | ||
.map(|x| x.min().and_then(|x| Some(*x as i128))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this this be $x.$func()
rather than x.min()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your are right! i forgot change it writing the macro, real surprise ut not cover this 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i will add a greater than test case.
let vec = &index.indexes; | ||
let vec: Vec<Option<i128>> = vec | ||
.iter() | ||
.map(|x| x.min().and_then(|x| Some(*x as i128))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same question here -- should this be x.$func()
rather than x.min()
?
if let Ok(arr) = Decimal128Array::from(vec) | ||
.with_precision_and_scale(*precision, *scale) | ||
{ | ||
return Some(Arc::new(arr)); | ||
} else { | ||
return None; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might be able to this more functionally with something like (untested):
if let Ok(arr) = Decimal128Array::from(vec) | |
.with_precision_and_scale(*precision, *scale) | |
{ | |
return Some(Arc::new(arr)); | |
} else { | |
return None; | |
} | |
Decimal128Array::from(vec) | |
.with_precision_and_scale(*precision, *scale) | |
.ok() | |
.map(|arr| Arc::new(arr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice suggestion ! Some much api need remember deal with option
and result
😂
@@ -419,10 +468,37 @@ macro_rules! get_min_max_values_for_page_index { | |||
vec.iter().map(|x| x.$func().cloned()), | |||
))) | |||
} | |||
Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { | |||
Index::BYTE_ARRAY(index) => { | |||
let vec = &index.indexes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addition additional support in a follow on PR sounds like a good idea to me -- maybe we can file a ticket to track the work
@@ -204,3 +222,466 @@ async fn page_index_filter_multi_col() { | |||
let batch = results.next().await.unwrap().unwrap(); | |||
assert_eq!(batch.num_rows(), 7300); | |||
} | |||
|
|||
async fn test_prune( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great coverage -- thanks @Ted-Jiang. It is somewhat repetitive with the row group pruning but I think that is ok as they are different code paths
} | ||
|
||
#[tokio::test] | ||
#[ignore] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we have to run "type coercion / simplifiction" on them first?
Did rowGroup run this "type coercion / simplifiction" 🤔 ? I think they are the same code path, i will find it out soon.
} | ||
|
||
#[tokio::test] | ||
async fn prune_decimal_eq() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might be worth another test that prunes something other than 5 rows -- maybe where decimal_col = 30.00
and prunes out the other pages? All of the tests here seem to prune out only the third page 20.00 -> 60.00
I'll take a look later :) |
Signed-off-by: yangjiang <[email protected]>
Signed-off-by: yangjiang <[email protected]>
I think this is ready to go in now -- thank you @xudong963 and @Ted-Jiang and @liukun4515 |
Benchmark runs are scheduled for baseline = eac254c and contender = d7a7fb6. d7a7fb6 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Related #3833.
Rationale for this change
Support page index filter on
min_max
for type decimal and string.The string code is from @alamb Thanks!
Only cast to
Decimal128Array
align withrow_group
prunning.As null_count, i prefer fix in next pr.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?