Pruning serialization #1941

Merged · 3 commits · Mar 8, 2022
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -648,6 +648,7 @@ message FileScanExecConf {
 
 message ParquetScanExecNode {
   FileScanExecConf base_conf = 1;
+  LogicalExprNode pruning_predicate = 2;
 }
 
 message CsvScanExecNode {
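Context note (not part of this diff): prost generates singular message fields as Option<_> on the Rust side, so a scan without a pruning predicate simply leaves the field unset. A rough sketch of what the generated struct for this message might look like -- an assumption about typical prost output, not a copy of the generated code:

// Approximate prost output for ParquetScanExecNode (illustrative only)
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ParquetScanExecNode {
    #[prost(message, optional, tag = "1")]
    pub base_conf: ::core::option::Option<FileScanExecConf>,
    #[prost(message, optional, tag = "2")]
    pub pruning_predicate: ::core::option::Option<LogicalExprNode>,
}

This is why the decode path below can treat an absent predicate as None via as_ref().map(...).transpose().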
47 changes: 44 additions & 3 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -130,10 +130,14 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 str_to_byte(&scan.delimiter)?,
             ))),
             PhysicalPlanType::ParquetScan(scan) => {
+                let predicate = scan
+                    .pruning_predicate
+                    .as_ref()
+                    .map(|expr| expr.try_into())
+                    .transpose()?;
                 Ok(Arc::new(ParquetExec::new(
                     decode_scan_config(scan.base_conf.as_ref().unwrap(), ctx)?,
-                    // TODO predicate should be de-serialized
-                    None,
+                    predicate,
                 )))
             }
             PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
@@ -701,11 +705,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
+            let pruning_expr = exec
+                .pruning_predicate()
+                .map(|pred| pred.logical_expr().try_into())
+                .transpose()?;
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                     protobuf::ParquetScanExecNode {
                         base_conf: Some(exec.base_config().try_into()?),
-                        // TODO serialize predicates
+                        pruning_predicate: pruning_expr,
                     },
                 )),
             })
@@ -929,6 +937,8 @@ mod roundtrip_tests {
     use std::sync::Arc;
 
     use crate::serde::{AsExecutionPlan, BallistaCodec};
+    use datafusion::datasource::object_store::local::LocalFileSystem;
+    use datafusion::datasource::PartitionedFile;
     use datafusion::physical_plan::sorts::sort::SortExec;
     use datafusion::prelude::ExecutionContext;
     use datafusion::{
@@ -950,6 +960,9 @@
         scalar::ScalarValue,
     };
 
+    use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+    use datafusion::physical_plan::Statistics;
+
     use super::super::super::error::Result;
     use super::super::protobuf;
     use crate::execution_plans::ShuffleWriterExec;
@@ -1119,4 +1132,32 @@
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4)),
         )?))
     }
+
+    #[test]
+    fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
+        let scan_config = FileScanConfig {
+            object_store: Arc::new(LocalFileSystem {}),
+            file_schema: Arc::new(Schema::new(vec![Field::new(
+                "col",
+                DataType::Utf8,
+                false,
+            )])),
+            file_groups: vec![vec![PartitionedFile::new(
+                "/path/to/file.parquet".to_string(),
+                1024,
+            )]],
+            statistics: Statistics {
+                num_rows: Some(100),
+                total_byte_size: Some(1024),
+                column_statistics: None,
+                is_exact: false,
+            },
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+        };
+
+        let predicate = datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
+        roundtrip_test(Arc::new(ParquetExec::new(scan_config, Some(predicate))))
+    }
 }
8 changes: 8 additions & 0 deletions datafusion/src/physical_optimizer/pruning.rs
@@ -97,6 +97,8 @@ pub struct PruningPredicate {
     predicate_expr: Arc<dyn PhysicalExpr>,
     /// The statistics required to evaluate this predicate
    required_columns: RequiredStatColumns,
+    /// Logical predicate from which this predicate expr is derived (required for serialization)
+    logical_expr: Expr,
 }
 
 impl PruningPredicate {
@@ -143,6 +145,7 @@ impl PruningPredicate {
             schema,
             predicate_expr,
             required_columns,
+            logical_expr: expr.clone(),
Review comment (Contributor):

It might be worth considering changing the signature of this function from

pub fn try_new(expr: &Expr, schema: SchemaRef) -> Result<Self> {

to take the Expr directly rather than cloning within:

pub fn try_new(expr: Expr, schema: SchemaRef) -> Result<Self> {

The rationale is that if the caller already has an Expr, they could pass it in rather than being forced into a copy.

Review comment (Contributor):

We could always do that as a follow-on PR as well.

Review comment (Contributor, PR author):

Makes sense. I should have some time tomorrow to see if I can do this relatively easily. If it turns out to be more involved, then we can create a follow-up task for it.

Review comment (Contributor):

Thanks @thinkharderdev -- to keep the code flowing I am going to merge this PR as is and we can do the cleanup in a follow-on PR. Let me know if you don't get a chance and I can file a follow-on ticket to do so.
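As a rough illustration of the suggestion above (not code from this PR, and using stand-in types rather than the real DataFusion ones), a by-value constructor lets a caller that already owns the expression move it in instead of cloning:

use std::sync::Arc;

// Stand-ins for the real Expr / SchemaRef / PruningPredicate types,
// purely to illustrate the ownership difference discussed above.
#[derive(Clone, Debug)]
struct Expr(String);
type SchemaRef = Arc<String>;

struct PruningPredicate {
    schema: SchemaRef,
    logical_expr: Expr,
}

impl PruningPredicate {
    // By-value signature: the caller moves the Expr in, so no clone is
    // needed inside the constructor just to store it.
    fn try_new(expr: Expr, schema: SchemaRef) -> Result<Self, String> {
        Ok(Self { schema, logical_expr: expr })
    }
}

fn main() {
    let schema: SchemaRef = Arc::new("col: Utf8".to_string());
    let expr = Expr("col = '1'".to_string());
    // expr is moved; a caller that still needs it afterwards clones at the
    // call site, making the copy explicit and optional.
    let pred = PruningPredicate::try_new(expr, schema).unwrap();
    println!("{:?} on {}", pred.logical_expr, pred.schema);
}

The trade-off is a breaking change for existing callers that pass &Expr, which is why the thread defers it to a follow-on PR.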

         })
     }
 
@@ -202,6 +205,11 @@ impl PruningPredicate {
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
+
+    /// Returns a reference to the logical expr used to construct this pruning predicate
+    pub fn logical_expr(&self) -> &Expr {
+        &self.logical_expr
+    }
 }
 
 /// Handles creating references to the min/max statistics
5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/file_format/parquet.rs
@@ -131,6 +131,11 @@ impl ParquetExec {
     pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
+
+    /// Optional reference to this parquet scan's pruning predicate
+    pub fn pruning_predicate(&self) -> Option<&PruningPredicate> {
+        self.pruning_predicate.as_ref()
+    }
 }
 
 impl ParquetFileMetrics {