Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parquet page filtering for decimal128 columns #5

Merged
merged 1 commit into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ struct RowGroupPruningStatistics<'a> {
// Convert the byte array to an i128.
// The input byte array must be in big-endian order.
// Copied from arrow-rs.
fn from_bytes_to_i128(b: &[u8]) -> i128 {
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
Expand Down Expand Up @@ -773,7 +773,9 @@ macro_rules! get_null_count_values {

// Convert the parquet column schema to an arrow data type; only the
// decimal data type is considered.
fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
pub(crate) fn parquet_to_arrow_decimal_type(
parquet_column: &ColumnDescriptor,
) -> Option<DataType> {
let type_ptr = parquet_column.self_type_ptr();
match type_ptr.get_basic_info().logical_type() {
Some(LogicalType::Decimal { scale, precision }) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
//! Contains code to filter entire pages

use arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray,
BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
StringArray,
};
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::utils::expr_to_columns;
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, error, trace};
use parquet::schema::types::ColumnDescriptor;
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
Expand All @@ -38,6 +41,9 @@ use std::collections::{HashSet, VecDeque};
use std::sync::Arc;

use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::file_format::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};

use super::metrics::ParquetFileMetrics;

Expand Down Expand Up @@ -135,6 +141,7 @@ pub(crate) fn build_page_filter(
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
Expand Down Expand Up @@ -309,15 +316,18 @@ fn prune_pages_in_one_row_group(
predicate: &PruningPredicate,
col_offset_indexes: Option<&Vec<PageLocation>>,
col_page_indexes: Option<&Index>,
col_desc: &ColumnDescriptor,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
if let (Some(col_offset_indexes), Some(col_page_indexes)) =
(col_offset_indexes, col_page_indexes)
{
let target_type = parquet_to_arrow_decimal_type(col_desc);
let pruning_stats = PagesPruningStatistics {
col_page_indexes,
col_offset_indexes,
target_type: &target_type,
};

match predicate.prune(&pruning_stats) {
Expand Down Expand Up @@ -386,6 +396,9 @@ fn create_row_count_in_each_page(
// Per-column page statistics that are handed to `PruningPredicate::prune`
// when pruning the pages of one row group.
struct PagesPruningStatistics<'a> {
// Page index of the column; the min/max values of each page are extracted from it.
col_page_indexes: &'a Index,
// Page locations (offset index) of the column.
col_offset_indexes: &'a Vec<PageLocation>,
// The logical type declared in the schema. For example, `DECIMAL` is a logical type, while the
// physical type stored in the parquet file may be `INT32`, `INT64` or `FIXED_LEN_BYTE_ARRAY`.
// NOTE(review): filled from `parquet_to_arrow_decimal_type`, so presumably `None` for every
// non-decimal column — confirm against that helper.
target_type: &'a Option<DataType>,
}

// Extract the min or max value calling `func` from page index
Expand All @@ -394,16 +407,50 @@ macro_rules! get_min_max_values_for_page_index {
match $self.col_page_indexes {
Index::NONE => None,
Index::INT32(index) => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int32 to decimal with the precision and scale
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only decimal128 is supported here, to align with row group pruning.

Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(arr) = Decimal128Array::from_iter_values(
vec.iter().map(|x| *x.$func().unwrap() as i128),
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(arr));
} else {
return None;
}
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::INT64(index) => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int64 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(arr) = Decimal128Array::from_iter_values(
vec.iter().map(|x| *x.$func().unwrap() as i128),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(arr));
} else {
return None;
}
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::FLOAT(index) => {
let vec = &index.indexes;
Expand Down Expand Up @@ -432,10 +479,28 @@ macro_rules! get_min_max_values_for_page_index {
.collect();
Some(Arc::new(array))
}
Index::INT96(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
Index::INT96(_) => {
// TODO: support this type
None
}
Index::FIXED_LEN_BYTE_ARRAY(index) => {
match $self.target_type {
// fixed-length byte array to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(array) = Decimal128Array::from_iter_values(
vec.iter().map(|x| from_bytes_to_i128(x.$func().unwrap())),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(array));
} else {
return None;
}
}
_ => None,
}
}
}
}};
}
Expand Down
28 changes: 28 additions & 0 deletions datafusion/core/tests/parquet_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,34 @@ async fn single_file_small_data_pages() {
.with_expected_rows(9745)
.run()
.await;

// decimal_price TV=53819 RL=0 DL=0
// ----------------------------------------------------------------------------
// row group 0:
// column index for column decimal_price:
// Boundary order: UNORDERED
// null count min max
// page-0 0 1 9216
// page-1 0 9217 18432
// page-2 0 18433 27648
// page-3 0 27649 36864
// page-4 0 36865 46080
// page-5 0 46081 53819
//
// offset index for column decimal_price:
// offset compressed size first row index
// page-0 5581636 147517 0
// page-1 5729153 147517 9216
TestCase::new(&test_parquet_file)
.with_name("selective_on_decimal")
// predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
// decimal_price <= 9200
.with_filter(col("decimal_price").lt_eq(lit(9200)))
.with_pushdown_expected(PushdownExpected::Some)
.with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
.with_expected_rows(9200)
.run()
.await;
}

/// Expected pushdown behavior
Expand Down
13 changes: 11 additions & 2 deletions test-utils/src/data_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::ops::Range;
use std::sync::Arc;

use arrow::array::{
Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
UInt16Builder,
Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
TimestampNanosecondBuilder, UInt16Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
Expand All @@ -43,6 +43,7 @@ struct BatchBuilder {
request_bytes: Int32Builder,
response_bytes: Int32Builder,
response_status: UInt16Builder,
prices_status: Decimal128Builder,

/// optional number of rows produced
row_limit: Option<usize>,
Expand Down Expand Up @@ -73,6 +74,7 @@ impl BatchBuilder {
Field::new("request_bytes", DataType::Int32, true),
Field::new("response_bytes", DataType::Int32, true),
Field::new("response_status", DataType::UInt16, false),
Field::new("decimal_price", DataType::Decimal128(38, 0), false),
]))
}

Expand Down Expand Up @@ -146,6 +148,7 @@ impl BatchBuilder {
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_status
.append_value(status[rng.gen_range(0..status.len())]);
self.prices_status.append_value(self.row_count as i128);
}

fn finish(mut self, schema: SchemaRef) -> RecordBatch {
Expand All @@ -166,6 +169,12 @@ impl BatchBuilder {
Arc::new(self.request_bytes.finish()),
Arc::new(self.response_bytes.finish()),
Arc::new(self.response_status.finish()),
Arc::new(
self.prices_status
.finish()
.with_precision_and_scale(38, 0)
.unwrap(),
),
],
)
.unwrap()
Expand Down