test: add test for decimal and pruning for decimal column #2960

Merged · 2 commits · Jul 26, 2022
Changes from 1 commit
47 changes: 45 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -525,8 +525,8 @@ mod tests {
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
-    ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-    StringArray, TimestampNanosecondArray,
+    Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
+    Int32Array, StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -1023,6 +1023,49 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_decimal_parquet() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();

// Parquet uses INT32 as the physical type to store this decimal
let exec = get_exec("int32_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(4, 2), column.data_type());

// Parquet uses INT64 as the physical type to store this decimal
let exec = get_exec("int64_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(10, 2), column.data_type());

// Parquet uses FIXED_LEN_BYTE_ARRAY as the physical type to store this decimal
let exec = get_exec("fixed_length_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(25, 2), column.data_type());

let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(13, 2), column.data_type());

// Parquet can also use BYTE_ARRAY as the physical type to store decimal
// TODO: arrow-rs doesn't support converting the BYTE_ARRAY physical type to decimal
// let exec = get_exec("byte_array_decimal.parquet", None, None).await?;
Contributor Author:
arrow-rs doesn't support reading decimal values from Parquet with the binary physical type. Why isn't this supported?

Contributor:
Why would you have a variable length decimal type? What would this mean?

Contributor Author:
@tustvold In the Java implementation of Parquet, decimals can be written with the binary or fixed-length binary physical type, so decimals can be stored using the binary physical type.

The format spec gives the definition of decimal:

Decimal can be stored in Parquet as INT32, INT64, FIXED_LEN_BYTE_ARRAY or BYTE_ARRAY.

I can help implement the BYTE_ARRAY case in arrow-rs.
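
As a concrete illustration of the spec rule above (a minimal sketch, not arrow-rs code; the helper name is hypothetical): BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY decimals store the unscaled value as big-endian two's-complement bytes, which can be decoded like this:

fn decimal_from_be_bytes(bytes: &[u8]) -> i128 {
    assert!(!bytes.is_empty() && bytes.len() <= 16);
    // Sign-extend: negative values (high bit set) are padded with 0xFF.
    let fill = if bytes[0] & 0x80 != 0 { 0xFF } else { 0x00 };
    let mut buf = [fill; 16];
    buf[16 - bytes.len()..].copy_from_slice(bytes);
    i128::from_be_bytes(buf)
}

#[test]
fn decode_decimal_bytes_sketch() {
    // [0x01, 0xF4] is 500, i.e. the decimal 5.00 at scale 2;
    // [0xFE, 0x0C] is -500 in two's complement, i.e. -5.00 at scale 2.
    assert_eq!(decimal_from_be_bytes(&[0x01, 0xF4]), 500);
    assert_eq!(decimal_from_be_bytes(&[0xFE, 0x0C]), -500);
}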

Contributor Author:
PR apache/arrow-rs#2160 adds support for reading decimals from the binary physical type in Parquet.


Ok(())
}
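
To summarize the cases exercised above, here is a minimal sketch of how a writer typically chooses a physical type for a given decimal precision (following the Parquet format spec's recommendations; the helper is hypothetical, not DataFusion or arrow-rs API):

/// Physical type the Parquet spec recommends for a given precision.
/// Legacy writers (see fixed_length_decimal_legacy.parquet above) may
/// use FIXED_LEN_BYTE_ARRAY even for precisions that fit in an integer.
fn recommended_physical_type(precision: u8) -> &'static str {
    match precision {
        1..=9 => "INT32",   // unscaled value fits in 32 bits
        10..=18 => "INT64", // unscaled value fits in 64 bits
        _ => "FIXED_LEN_BYTE_ARRAY (or BYTE_ARRAY)",
    }
}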

fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
let actual = exec
.metrics()
102 changes: 102 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -800,10 +800,12 @@ mod tests {
use crate::from_slice::FromSlice;
use crate::logical_plan::{col, lit};
use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
use arrow::array::DecimalArray;
use arrow::{
array::{BinaryArray, Int32Array, Int64Array, StringArray},
datatypes::{DataType, TimeUnit},
};
use datafusion_common::ScalarValue;
use std::collections::HashMap;

#[derive(Debug)]
@@ -814,6 +816,38 @@
}

impl ContainerStats {
fn new_decimal128(
min: impl IntoIterator<Item = Option<i128>>,
max: impl IntoIterator<Item = Option<i128>>,
precision: usize,
scale: usize,
) -> Self {
Self {
min: Arc::new(
min.into_iter()
.collect::<DecimalArray>()
.with_precision_and_scale(precision, scale)
.unwrap(),
),
max: Arc::new(
max.into_iter()
.collect::<DecimalArray>()
.with_precision_and_scale(precision, scale)
.unwrap(),
),
}
}

fn new_i64(
min: impl IntoIterator<Item = Option<i64>>,
max: impl IntoIterator<Item = Option<i64>>,
) -> Self {
Self {
min: Arc::new(min.into_iter().collect::<Int64Array>()),
max: Arc::new(max.into_iter().collect::<Int64Array>()),
}
}

fn new_i32(
min: impl IntoIterator<Item = Option<i32>>,
max: impl IntoIterator<Item = Option<i32>>,
@@ -1418,6 +1452,74 @@
Ok(())
}

#[test]
fn prune_decimal_data() {
// decimal(9,2)
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal(9, 2),
true,
)]));
// s1 > 5
let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
// If the data is written by Spark, the physical type in the Parquet file is INT32,
// so we use INT32 statistics.
let statistics = TestStatistics::new().with(
"s1",
ContainerStats::new_i32(
vec![Some(0), Some(4), None, Some(3)], // min
vec![Some(5), Some(6), Some(4), None], // max
),
);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
let expected = vec![false, true, false, true];
assert_eq!(result, expected);

// decimal(18,2)
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal(18, 2),
true,
)]));
// s1 > 5
let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
// If the data is written by Spark, the physical type in the Parquet file is INT64,
// so we use INT64 statistics.
let statistics = TestStatistics::new().with(
"s1",
ContainerStats::new_i64(
vec![Some(0), Some(4), None, Some(3)], // min
vec![Some(5), Some(6), Some(4), None], // max
),
);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
let expected = vec![false, true, false, true];
assert_eq!(result, expected);

// decimal(23,2)
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal(23, 2),
true,
)]));
// s1 > 5
let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2)));
let statistics = TestStatistics::new().with(
"s1",
ContainerStats::new_decimal128(
vec![Some(0), Some(400), None, Some(300)], // min
vec![Some(500), Some(600), Some(400), None], // max
23,
2,
),
);
Contributor:
Can you add some large decimal numbers to test the decimal(23,2) case?

Contributor Author:
This test case only exercises the pruning logic. I will file a follow-up pull request to fix #2962 with Parquet row-group filtering/pruning.

let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
let expected = vec![false, true, false, true];
assert_eq!(result, expected);
}
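
The expected vector above follows from a simple rule. As a minimal sketch of the idea (an illustration, not DataFusion's PruningPredicate implementation): for s1 > value, a container can match only if its max statistic exceeds the literal, and a missing statistic is inconclusive, so the container must be kept:

fn prune_gt(max_stats: &[Option<i128>], value: i128) -> Vec<bool> {
    max_stats
        .iter()
        .map(|max| match max {
            Some(max) => *max > value, // statistic gives a definite answer
            None => true,              // unknown: must keep the container
        })
        .collect()
}

#[test]
fn prune_gt_sketch() {
    // Mirrors the decimal(23,2) case above: max stats [500, 600, 400, None]
    // against the unscaled literal 500 (5.00 at scale 2).
    assert_eq!(
        prune_gt(&[Some(500), Some(600), Some(400), None], 500),
        vec![false, true, false, true]
    );
}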
#[test]
fn prune_api() {
let schema = Arc::new(Schema::new(vec![