diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 61d2e5badb6b..b41ca46d05d1 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -643,7 +643,7 @@ struct RowGroupPruningStatistics<'a> {
 // Convert the bytes array to i128.
 // The endian of the input bytes array must be big-endian.
 // Copy from the arrow-rs
-fn from_bytes_to_i128(b: &[u8]) -> i128 {
+pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
     assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
     let first_bit = b[0] & 128u8 == 128u8;
     let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
@@ -773,7 +773,9 @@ macro_rules! get_null_count_values {
 
 // Convert parquet column schema to arrow data type, and just consider the
 // decimal data type.
-fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
+pub(crate) fn parquet_to_arrow_decimal_type(
+    parquet_column: &ColumnDescriptor,
+) -> Option<DataType> {
     let type_ptr = parquet_column.self_type_ptr();
     match type_ptr.get_basic_info().logical_type() {
         Some(LogicalType::Decimal { scale, precision }) => {
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index b0f93a27b8ac..3867c41edd34 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -18,13 +18,16 @@
 //! Contains code to filter entire pages
 
 use arrow::array::{
-    BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray,
+    BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
 };
+use arrow::datatypes::DataType;
 use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
 use datafusion_common::{Column, DataFusionError, Result};
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_optimizer::utils::split_conjunction;
 use log::{debug, error, trace};
+use parquet::schema::types::ColumnDescriptor;
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
     errors::ParquetError,
@@ -38,6 +41,9 @@ use std::collections::{HashSet, VecDeque};
 use std::sync::Arc;
 
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+use crate::physical_plan::file_format::parquet::{
+    from_bytes_to_i128, parquet_to_arrow_decimal_type,
+};
 
 use super::metrics::ParquetFileMetrics;
 
@@ -135,6 +141,7 @@ pub(crate) fn build_page_filter(
                        &predicate,
                        rg_offset_indexes.get(col_id),
                        rg_page_indexes.get(col_id),
+                        groups[*r].column(col_id).column_descr(),
                        file_metrics,
                    )
                    .map_err(|e| {
@@ -309,15 +316,18 @@ fn prune_pages_in_one_row_group(
     predicate: &PruningPredicate,
     col_offset_indexes: Option<&Vec<PageLocation>>,
     col_page_indexes: Option<&Index>,
+    col_desc: &ColumnDescriptor,
     metrics: &ParquetFileMetrics,
 ) -> Result<Vec<RowSelector>> {
     let num_rows = group.num_rows() as usize;
     if let (Some(col_offset_indexes), Some(col_page_indexes)) =
         (col_offset_indexes, col_page_indexes)
     {
+        let target_type = parquet_to_arrow_decimal_type(col_desc);
         let pruning_stats = PagesPruningStatistics {
             col_page_indexes,
             col_offset_indexes,
+            target_type: &target_type,
         };
 
         match predicate.prune(&pruning_stats) {
@@ -386,6 +396,9 @@ fn create_row_count_in_each_page(
 struct PagesPruningStatistics<'a> {
     col_page_indexes: &'a Index,
     col_offset_indexes: &'a Vec<PageLocation>,
+    // target_type is the logical (schema) type of the column, e.g. `DECIMAL`, while the
+    // physical type stored in the parquet file may be `INT32`, `INT64`, or `FIXED_LEN_BYTE_ARRAY`
+    target_type: &'a Option<DataType>,
 }
 
 // Extract the min or max value calling `func` from page idex
@@ -394,16 +407,50 @@ macro_rules! get_min_max_values_for_page_index {
         match $self.col_page_indexes {
             Index::NONE => None,
             Index::INT32(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Int32Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
+                match $self.target_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        let vec = &index.indexes;
+                        if let Ok(arr) = Decimal128Array::from_iter_values(
+                            vec.iter().map(|x| *x.$func().unwrap() as i128),
+                        )
+                        .with_precision_and_scale(*precision, *scale)
+                        {
+                            return Some(Arc::new(arr));
+                        } else {
+                            return None;
+                        }
+                    }
+                    _ => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                }
             }
             Index::INT64(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Int64Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
+                match $self.target_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        let vec = &index.indexes;
+                        if let Ok(arr) = Decimal128Array::from_iter_values(
+                            vec.iter().map(|x| *x.$func().unwrap() as i128),
+                        )
+                        .with_precision_and_scale(*precision, *scale)
+                        {
+                            return Some(Arc::new(arr));
+                        } else {
+                            return None;
+                        }
+                    }
+                    _ => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                }
             }
             Index::FLOAT(index) => {
                 let vec = &index.indexes;
@@ -432,10 +479,28 @@
                     .collect();
                 Some(Arc::new(array))
             }
-            Index::INT96(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+            Index::INT96(_) => {
                 //Todo support these type
                 None
             }
+            Index::FIXED_LEN_BYTE_ARRAY(index) => {
+                match $self.target_type {
+                    // fixed length byte array to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        let vec = &index.indexes;
+                        if let Ok(array) = Decimal128Array::from_iter_values(
+                            vec.iter().map(|x| from_bytes_to_i128(x.$func().unwrap())),
+                        )
+                        .with_precision_and_scale(*precision, *scale)
+                        {
+                            return Some(Arc::new(array));
+                        } else {
+                            return None;
+                        }
+                    }
+                    _ => None,
+                }
+            }
         }
     }};
 }
diff --git a/datafusion/core/tests/parquet_filter_pushdown.rs b/datafusion/core/tests/parquet_filter_pushdown.rs
index 4211011d2905..4aa7725cbf24 100644
--- a/datafusion/core/tests/parquet_filter_pushdown.rs
+++ b/datafusion/core/tests/parquet_filter_pushdown.rs
@@ -296,6 +296,34 @@ async fn single_file_small_data_pages() {
         .with_expected_rows(9745)
         .run()
         .await;
+
+    // decimal_price TV=53819 RL=0 DL=0
+    // ----------------------------------------------------------------------------
+    // row group 0:
+    //     column index for column decimal_price:
+    //     Boundary order: UNORDERED
+    //                  null count    min      max
+    //     page-0                0      1      9216
+    //     page-1                0    9217    18432
+    //     page-2                0   18433    27648
+    //     page-3                0   27649    36864
+    //     page-4                0   36865    46080
+    //     page-5                0   46081    53819
+    //
+    //     offset index for column decimal_price:
+    //                   offset   compressed size   first row index
+    //     page-0       5581636            147517                 0
+    //     page-1       5729153            147517              9216
+    TestCase::new(&test_parquet_file)
+        .with_name("selective_on_decimal")
+        // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
+        // decimal_price <= 9200
+        .with_filter(col("decimal_price").lt_eq(lit(9200)))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+        .with_expected_rows(9200)
+        .run()
+        .await;
 }
 
 /// Expected pushdown behavior
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index adff4a514615..c82d56ef21f9 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -19,8 +19,8 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use arrow::array::{
-    Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
-    UInt16Builder,
+    Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
+    TimestampNanosecondBuilder, UInt16Builder,
 };
 use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
@@ -43,6 +43,7 @@ struct BatchBuilder {
     request_bytes: Int32Builder,
     response_bytes: Int32Builder,
     response_status: UInt16Builder,
+    prices_status: Decimal128Builder,
 
     /// optional number of rows produced
     row_limit: Option<usize>,
@@ -73,6 +74,7 @@ impl BatchBuilder {
             Field::new("request_bytes", DataType::Int32, true),
             Field::new("response_bytes", DataType::Int32, true),
             Field::new("response_status", DataType::UInt16, false),
+            Field::new("decimal_price", DataType::Decimal128(38, 0), false),
         ]))
     }
 
@@ -146,6 +148,7 @@ impl BatchBuilder {
             .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
         self.response_status
             .append_value(status[rng.gen_range(0..status.len())]);
+        self.prices_status.append_value(self.row_count as i128);
     }
 
     fn finish(mut self, schema: SchemaRef) -> RecordBatch {
@@ -166,6 +169,12 @@
                 Arc::new(self.request_bytes.finish()),
                 Arc::new(self.response_bytes.finish()),
                 Arc::new(self.response_status.finish()),
+                Arc::new(
+                    self.prices_status
+                        .finish()
+                        .with_precision_and_scale(38, 0)
+                        .unwrap(),
+                ),
             ],
         )
         .unwrap()
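
Reviewer note (not part of the patch): the FIXED_LEN_BYTE_ARRAY branch above leans on `from_bytes_to_i128`, which sign-extends a big-endian byte slice of at most 16 bytes into an i128. Below is a minimal standalone sketch of that idea using only the standard library; the function name `be_bytes_to_i128` and the sample values are illustrative, not taken from the patch.

// Sketch of big-endian sign extension into i128 (illustrative only).
fn be_bytes_to_i128(b: &[u8]) -> i128 {
    assert!(b.len() <= 16, "Decimal128 supports at most 16 bytes");
    // If the most significant bit is set the value is negative, so pre-fill
    // the buffer with 0xFF; the copied bytes are then sign-extended to 16 bytes.
    let negative = b[0] & 0x80 == 0x80;
    let mut buf = if negative { [0xFFu8; 16] } else { [0u8; 16] };
    buf[16 - b.len()..].copy_from_slice(b);
    i128::from_be_bytes(buf)
}

fn main() {
    // 0x00FF is 255 stored in two big-endian bytes.
    assert_eq!(be_bytes_to_i128(&[0x00, 0xFF]), 255);
    // 0xFF01 is -255 in 16-bit two's complement.
    assert_eq!(be_bytes_to_i128(&[0xFF, 0x01]), -255);
}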
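
Reviewer note (not part of the patch): the expected row count of 9200 in the new `selective_on_decimal` test follows from the column index reproduced in the test comment. A small standalone sketch of that arithmetic; the per-page (min, max) pairs are copied from that comment, everything else is illustrative.

// Which pages can contain rows matching `decimal_price <= 9200`?
fn main() {
    // (min, max) per page, taken from the column index in the test comment.
    let pages = [
        (1i128, 9216i128),
        (9217, 18432),
        (18433, 27648),
        (27649, 36864),
        (36865, 46080),
        (46081, 53819),
    ];
    let limit = 9200i128;
    // A page can be skipped when even its minimum value exceeds the limit,
    // so only page 0 survives and pages 1-5 are pruned.
    let kept: Vec<usize> = pages
        .iter()
        .enumerate()
        .filter(|(_, (min, _))| *min <= limit)
        .map(|(i, _)| i)
        .collect();
    assert_eq!(kept, vec![0]);
    // With the generated values starting at 1 (see the column index min),
    // exactly 9200 rows satisfy the predicate, matching with_expected_rows(9200).
}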