chore: temporary branch for IOx update (11-30-2023 to 12-07-2023) #5

Closed · wants to merge 4 commits
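These commits carry an upstream DataFusion fix into the IOx branch: `prune_row_groups_by_statistics` now takes the Parquet file's own Arrow schema as an explicit `arrow_schema` argument, and `ParquetOpener` passes the file schema through instead of letting the pruner fall back to the predicate's schema, which reflects the table's column order. When a file's physical column order differs from the table's, resolving a predicate column against the wrong schema reads another column's min/max statistics and can discard row groups that actually match. A minimal, self-contained sketch of that failure mode, using hypothetical names rather than DataFusion's real types:

```rust
// Plain-Rust model of the bug (assumption: names and shapes here are
// illustrative only; the real code uses PruningPredicate and
// RowGroupPruningStatistics). Row-group statistics are stored in
// *file schema* order.
struct ColumnStats {
    #[allow(dead_code)] // a `>` predicate only needs max; min kept for realism
    min: i32,
    max: i32,
}

/// One row group, with per-column statistics in file-schema order.
struct RowGroup {
    stats: Vec<ColumnStats>,
}

/// Decide whether a row group can satisfy `column > 0` by resolving
/// `column` against `schema` (field names in some order).
fn keep_row_group(schema: &[&str], rg: &RowGroup, column: &str) -> bool {
    match schema.iter().position(|f| *f == column) {
        // prune only when the stats prove every value is <= 0
        Some(idx) => rg.stats[idx].max > 0,
        // unknown column: cannot prune
        None => true,
    }
}

fn main() {
    // The file physically stores (c2, c1); the table declares (c1, c2).
    let file_schema = ["c2", "c1"];
    let table_schema = ["c1", "c2"];

    // c2 in [-10, -1], c1 in [1, 10]: every row matches `c1 > 0`.
    let rg = RowGroup {
        stats: vec![
            ColumnStats { min: -10, max: -1 }, // c2
            ColumnStats { min: 1, max: 10 },   // c1
        ],
    };

    // Resolving against the file schema keeps the row group (correct).
    assert!(keep_row_group(&file_schema, &rg, "c1"));
    // Resolving against the table schema reads c2's stats and mis-prunes.
    assert!(!keep_row_group(&table_schema, &rg, "c1"));
}
```

The new `row_group_pruning_predicate_file_schema` test in this diff exercises the same scenario against the real `prune_row_groups_by_statistics`.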
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;

+let file_schema = builder.schema().clone();
+
let (schema_mapping, adapted_projections) =
-schema_adapter.map_schema(builder.schema())?;
+schema_adapter.map_schema(&file_schema)?;
// let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;

let mask = ProjectionMask::roots(
@@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
-builder.schema().as_ref(),
-table_schema.as_ref(),
+&file_schema,
+&table_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
@@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
+&file_schema,
builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
140 changes: 110 additions & 30 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
/// Note: This method currently ignores ColumnOrder
/// <https://github.com/apache/arrow-datafusion/issues/8335>
pub(crate) fn prune_row_groups_by_statistics(
+arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
groups: &[RowGroupMetaData],
range: Option<FileRange>,
@@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
let pruning_stats = RowGroupPruningStatistics {
parquet_schema,
row_group_metadata: metadata,
-arrow_schema: predicate.schema().as_ref(),
+arrow_schema,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
@@ -415,11 +416,11 @@ mod tests {
fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
-let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+let schema =
+Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate =
-PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
@@ -435,6 +436,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&[rgm1, rgm2],
None,
@@ -449,11 +451,11 @@
fn row_group_pruning_predicate_missing_stats() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
-let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+let schema =
+Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate =
-PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
@@ -470,6 +472,7 @@ mod tests {
// is null / undefined so the first row group can't be filtered out
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&[rgm1, rgm2],
None,
@@ -518,6 +521,7 @@ mod tests {
// when conditions are joined using AND
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
groups,
None,
@@ -531,12 +535,13 @@ mod tests {
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
groups,
None,
@@ -547,6 +552,64 @@
);
}

+#[test]
+fn row_group_pruning_predicate_file_schema() {
+use datafusion_expr::{col, lit};
+// test row group predicate when file schema is different from table schema
+// c1 > 0
+let table_schema = Arc::new(Schema::new(vec![
+Field::new("c1", DataType::Int32, false),
+Field::new("c2", DataType::Int32, false),
+]));
+let expr = col("c1").gt(lit(0));
+let expr = logical2physical(&expr, &table_schema);
+let pruning_predicate =
+PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
+
+// Model a file schema's column order c2 then c1, which is the opposite
+// of the table schema
+let file_schema = Arc::new(Schema::new(vec![
+Field::new("c2", DataType::Int32, false),
+Field::new("c1", DataType::Int32, false),
+]));
+let schema_descr = get_test_schema_descr(vec![
+PrimitiveTypeField::new("c2", PhysicalType::INT32),
+PrimitiveTypeField::new("c1", PhysicalType::INT32),
+]);
+// rg1 has c2 less than zero, c1 greater than zero
+let rgm1 = get_row_group_meta_data(
+&schema_descr,
+vec![
+ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2
+ParquetStatistics::int32(Some(1), Some(10), None, 0, false), // c1
+],
+);
+// rg2 has c2 greater than zero, c1 less than zero
+let rgm2 = get_row_group_meta_data(
+&schema_descr,
+vec![
+ParquetStatistics::int32(Some(1), Some(10), None, 0, false), // c2
+ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c1
+],
+);
+
+let metrics = parquet_file_metrics();
+let groups = &[rgm1, rgm2];
+// the first row group should be kept because c1 is greater than zero
+// the second should be filtered out because c1 is less than zero
+assert_eq!(
+prune_row_groups_by_statistics(
+&file_schema, // NB must be file schema, not table_schema
+&schema_descr,
+groups,
+None,
+Some(&pruning_predicate),
+&metrics
+),
+vec![0]
+);
+}

fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
PrimitiveTypeField::new("c1", PhysicalType::INT32),
@@ -580,13 +643,14 @@ mod tests {
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
-let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&groups,
None,
@@ -612,14 +676,15 @@ mod tests {
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates). Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&groups,
None,
@@ -638,8 +703,11 @@ mod tests {

// INT32: c1 > 5, the c1 is decimal(9,2)
// The type of the scalar value is decimal(9,2), so no cast is needed
-let schema =
-Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+let schema = Arc::new(Schema::new(vec![Field::new(
+"c1",
+DataType::Decimal128(9, 2),
+false,
+)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -650,8 +718,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate =
-PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [1.00, 6.00]
@@ -679,6 +746,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
@@ -692,8 +760,11 @@ mod tests {
// The c1 type is decimal(9,0) in the parquet file, and the type of the scalar is decimal(5,2).
// We should convert all types to the coercion type, which is decimal(11,2)
// The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
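// (assumed coercion arithmetic, not shown in this diff: the common type keeps
// max integer digits and max scale, so precision = max(9 - 0, 5 - 2) + max(0, 2)
// = 9 + 2 = 11 and scale = max(0, 2) = 2, hence decimal(11,2))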
-let schema =
-Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+let schema = Arc::new(Schema::new(vec![Field::new(
+"c1",
+DataType::Decimal128(9, 0),
+false,
+)]));

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
@@ -708,8 +779,7 @@ mod tests {
Decimal128(11, 2),
));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate =
-PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [100, 600]
@@ -743,6 +813,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&[rgm1, rgm2, rgm3, rgm4],
None,
@@ -753,8 +824,11 @@ mod tests {
);

// INT64: c1 < 5, the c1 is decimal(18,2)
-let schema =
-Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+let schema = Arc::new(Schema::new(vec![Field::new(
+"c1",
+DataType::Decimal128(18, 2),
+false,
+)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -765,8 +839,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate =
-PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [6.00, 8.00]
Expand All @@ -791,6 +864,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
@@ -802,8 +876,11 @@ mod tests {

// FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
-let schema =
-Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+let schema = Arc::new(Schema::new(vec![Field::new(
+"c1",
+DataType::Decimal128(18, 2),
+false,
+)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -817,8 +894,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate =
-PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
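// (for example, 600_i128.to_be_bytes() gives 16 bytes ending in [0x02, 0x58];
// encoding little-endian instead would make the stored min/max decode to the
// wrong decimal values)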
let rgm1 = get_row_group_meta_data(
&schema_descr,
@@ -862,6 +938,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
@@ -873,8 +950,11 @@ mod tests {

// BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
-let schema =
-Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+let schema = Arc::new(Schema::new(vec![Field::new(
+"c1",
+DataType::Decimal128(18, 2),
+false,
+)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -888,8 +968,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
-let pruning_predicate =
-PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
@@ -922,6 +1001,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,