Skip to content

Commit

Permalink
Merge pull request #5 from Ted-Jiang/add_decimal
Browse files Browse the repository at this point in the history
Support parquet page filtering for decimal128 columns
  • Loading branch information
alamb authored Nov 16, 2022
2 parents cbc9885 + 154ee27 commit 48ec0d7
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 14 deletions.
6 changes: 4 additions & 2 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ struct RowGroupPruningStatistics<'a> {
// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
// Copy from the arrow-rs
fn from_bytes_to_i128(b: &[u8]) -> i128 {
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
Expand Down Expand Up @@ -773,7 +773,9 @@ macro_rules! get_null_count_values {

// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
pub(crate) fn parquet_to_arrow_decimal_type(
parquet_column: &ColumnDescriptor,
) -> Option<DataType> {
let type_ptr = parquet_column.self_type_ptr();
match type_ptr.get_basic_info().logical_type() {
Some(LogicalType::Decimal { scale, precision }) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
//! Contains code to filter entire pages
use arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray,
BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
StringArray,
};
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::utils::expr_to_columns;
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, error, trace};
use parquet::schema::types::ColumnDescriptor;
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
Expand All @@ -38,6 +41,9 @@ use std::collections::{HashSet, VecDeque};
use std::sync::Arc;

use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::file_format::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};

use super::metrics::ParquetFileMetrics;

Expand Down Expand Up @@ -135,6 +141,7 @@ pub(crate) fn build_page_filter(
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
Expand Down Expand Up @@ -309,15 +316,18 @@ fn prune_pages_in_one_row_group(
predicate: &PruningPredicate,
col_offset_indexes: Option<&Vec<PageLocation>>,
col_page_indexes: Option<&Index>,
col_desc: &ColumnDescriptor,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
if let (Some(col_offset_indexes), Some(col_page_indexes)) =
(col_offset_indexes, col_page_indexes)
{
let target_type = parquet_to_arrow_decimal_type(col_desc);
let pruning_stats = PagesPruningStatistics {
col_page_indexes,
col_offset_indexes,
target_type: &target_type,
};

match predicate.prune(&pruning_stats) {
Expand Down Expand Up @@ -386,6 +396,9 @@ fn create_row_count_in_each_page(
struct PagesPruningStatistics<'a> {
col_page_indexes: &'a Index,
col_offset_indexes: &'a Vec<PageLocation>,
// target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the
// real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY`
target_type: &'a Option<DataType>,
}

// Extract the min or max value calling `func` from page idex
Expand All @@ -394,16 +407,50 @@ macro_rules! get_min_max_values_for_page_index {
match $self.col_page_indexes {
Index::NONE => None,
Index::INT32(index) => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int32 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(arr) = Decimal128Array::from_iter_values(
vec.iter().map(|x| *x.$func().unwrap() as i128),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(arr));
} else {
return None;
}
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::INT64(index) => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int64 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(arr) = Decimal128Array::from_iter_values(
vec.iter().map(|x| *x.$func().unwrap() as i128),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(arr));
} else {
return None;
}
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::FLOAT(index) => {
let vec = &index.indexes;
Expand Down Expand Up @@ -432,10 +479,28 @@ macro_rules! get_min_max_values_for_page_index {
.collect();
Some(Arc::new(array))
}
Index::INT96(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
Index::INT96(_) => {
//Todo support these type
None
}
Index::FIXED_LEN_BYTE_ARRAY(index) => {
match $self.target_type {
// int32 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(array) = Decimal128Array::from_iter_values(
vec.iter().map(|x| from_bytes_to_i128(x.$func().unwrap())),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(array));
} else {
return None;
}
}
_ => None,
}
}
}
}};
}
Expand Down
28 changes: 28 additions & 0 deletions datafusion/core/tests/parquet_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,34 @@ async fn single_file_small_data_pages() {
.with_expected_rows(9745)
.run()
.await;

// decimal_price TV=53819 RL=0 DL=0
// ----------------------------------------------------------------------------
// row group 0:
// column index for column decimal_price:
// Boudary order: UNORDERED
// null count min max
// page-0 0 1 9216
// page-1 0 9217 18432
// page-2 0 18433 27648
// page-3 0 27649 36864
// page-4 0 36865 46080
// page-5 0 46081 53819
//
// offset index for column decimal_price:
// offset compressed size first row index
// page-0 5581636 147517 0
// page-1 5729153 147517 9216
TestCase::new(&test_parquet_file)
.with_name("selective_on_decimal")
// predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
// decimal_price < 9200
.with_filter(col("decimal_price").lt_eq(lit(9200)))
.with_pushdown_expected(PushdownExpected::Some)
.with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
.with_expected_rows(9200)
.run()
.await;
}

/// Expected pushdown behavior
Expand Down
13 changes: 11 additions & 2 deletions test-utils/src/data_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::ops::Range;
use std::sync::Arc;

use arrow::array::{
Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
UInt16Builder,
Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
TimestampNanosecondBuilder, UInt16Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
Expand All @@ -43,6 +43,7 @@ struct BatchBuilder {
request_bytes: Int32Builder,
response_bytes: Int32Builder,
response_status: UInt16Builder,
prices_status: Decimal128Builder,

/// optional number of rows produced
row_limit: Option<usize>,
Expand Down Expand Up @@ -73,6 +74,7 @@ impl BatchBuilder {
Field::new("request_bytes", DataType::Int32, true),
Field::new("response_bytes", DataType::Int32, true),
Field::new("response_status", DataType::UInt16, false),
Field::new("decimal_price", DataType::Decimal128(38, 0), false),
]))
}

Expand Down Expand Up @@ -146,6 +148,7 @@ impl BatchBuilder {
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_status
.append_value(status[rng.gen_range(0..status.len())]);
self.prices_status.append_value(self.row_count as i128);
}

fn finish(mut self, schema: SchemaRef) -> RecordBatch {
Expand All @@ -166,6 +169,12 @@ impl BatchBuilder {
Arc::new(self.request_bytes.finish()),
Arc::new(self.response_bytes.finish()),
Arc::new(self.response_status.finish()),
Arc::new(
self.prices_status
.finish()
.with_precision_and_scale(38, 0)
.unwrap(),
),
],
)
.unwrap()
Expand Down

0 comments on commit 48ec0d7

Please sign in to comment.