From cbc9885b8b550aca46860053eda8f9b2644c06bf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 7 Nov 2022 16:40:40 -0500 Subject: [PATCH] Support parquet page filtering for string columns --- .../file_format/parquet/page_filter.rs | 15 +++++++++-- .../core/tests/parquet_filter_pushdown.rs | 25 ++++++++----------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 95c93151af02..b0f93a27b8ac 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -17,7 +17,9 @@ //! Contains code to filter entire pages -use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array}; +use arrow::array::{ + BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray, +}; use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; use datafusion_common::{Column, DataFusionError, Result}; use datafusion_expr::utils::expr_to_columns; @@ -421,7 +423,16 @@ macro_rules! get_min_max_values_for_page_index { vec.iter().map(|x| x.$func().cloned()), ))) } - Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { + Index::BYTE_ARRAY(index) => { + let vec = &index.indexes; + let array: StringArray = vec + .iter() + .map(|x| x.$func()) + .map(|x| x.and_then(|x| std::str::from_utf8(x).ok())) + .collect(); + Some(Arc::new(array)) + } + Index::INT96(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => { //Todo support these type None } diff --git a/datafusion/core/tests/parquet_filter_pushdown.rs b/datafusion/core/tests/parquet_filter_pushdown.rs index 54b7d8d169e4..4211011d2905 100644 --- a/datafusion/core/tests/parquet_filter_pushdown.rs +++ b/datafusion/core/tests/parquet_filter_pushdown.rs @@ -266,20 +266,17 @@ async fn single_file_small_data_pages() { // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216 // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216 // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739 - // - // This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833 - // (page index pruning not implemented for byte array) - - // TestCase::new(&test_parquet_file) - // .with_name("selective") - // // predicagte is chosen carefully to prune pages 0, 1, 2, 3, 4 - // // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin' - // .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin"))) - // .with_pushdown_expected(PushdownExpected::Some) - // .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) - // .with_expected_rows(2574) - // .run() - // .await; + + TestCase::new(&test_parquet_file) + .with_name("selective") + // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4 + // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin' + .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin"))) + .with_pushdown_expected(PushdownExpected::Some) + .with_page_index_filtering_expected(PageIndexFilteringExpected::Some) + .with_expected_rows(2574) + .run() + .await; // time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN // --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------