Skip to content

Commit

Permalink
Reorganize the physical plan code in proto
Browse files Browse the repository at this point in the history
  • Loading branch information
kyotoYaho committed Dec 16, 2022
1 parent a1a6368 commit 2d8bc8f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 64 deletions.
39 changes: 17 additions & 22 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::common::proto_error;
use crate::convert_required;
use crate::logical_plan;
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::JoinSide;
use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion::physical_plan::sorts::sort::SortOptions;
use parking_lot::RwLock;

Expand Down Expand Up @@ -450,36 +450,31 @@ impl From<&protobuf::ColumnStats> for ColumnStatistics {
}
}

impl TryInto<Statistics> for &protobuf::Statistics {
impl From<protobuf::JoinSide> for JoinSide {
fn from(t: protobuf::JoinSide) -> Self {
match t {
protobuf::JoinSide::LeftSide => JoinSide::Left,
protobuf::JoinSide::RightSide => JoinSide::Right,
}
}
}

impl TryFrom<&protobuf::Statistics> for Statistics {
type Error = DataFusionError;

fn try_into(self) -> Result<Statistics, Self::Error> {
let column_statistics = self
.column_stats
.iter()
.map(|s| s.into())
.collect::<Vec<_>>();
fn try_from(s: &protobuf::Statistics) -> Result<Self, Self::Error> {
let column_statistics =
s.column_stats.iter().map(|s| s.into()).collect::<Vec<_>>();
Ok(Statistics {
num_rows: Some(self.num_rows as usize),
total_byte_size: Some(self.total_byte_size as usize),
num_rows: Some(s.num_rows as usize),
total_byte_size: Some(s.total_byte_size as usize),
// No column statistic (None) is encoded with empty array
column_statistics: if column_statistics.is_empty() {
None
} else {
Some(column_statistics)
},
is_exact: self.is_exact,
is_exact: s.is_exact,
})
}
}

impl From<JoinSide> for datafusion::physical_plan::joins::utils::JoinSide {
fn from(t: JoinSide) -> Self {
match t {
JoinSide::LeftSide => datafusion::physical_plan::joins::utils::JoinSide::Left,
JoinSide::RightSide => {
datafusion::physical_plan::joins::utils::JoinSide::Right
}
}
}
}
69 changes: 27 additions & 42 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,96 +44,85 @@ use crate::protobuf::{ConfigOption, PhysicalSortExprNode};
use datafusion::logical_expr::BuiltinScalarFunction;
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion_common::DataFusionError;

impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
type Error = DataFusionError;

fn try_into(self) -> Result<protobuf::PhysicalExprNode, Self::Error> {
fn try_from(a: Arc<dyn AggregateExpr>) -> Result<Self, Self::Error> {
use datafusion::physical_plan::expressions;
use protobuf::AggregateFunction;

let mut distinct = false;
let aggr_function = if self.as_any().downcast_ref::<Avg>().is_some() {
let aggr_function = if a.as_any().downcast_ref::<Avg>().is_some() {
Ok(AggregateFunction::Avg.into())
} else if self.as_any().downcast_ref::<Sum>().is_some() {
} else if a.as_any().downcast_ref::<Sum>().is_some() {
Ok(AggregateFunction::Sum.into())
} else if self.as_any().downcast_ref::<Count>().is_some() {
} else if a.as_any().downcast_ref::<Count>().is_some() {
Ok(AggregateFunction::Count.into())
} else if self.as_any().downcast_ref::<DistinctCount>().is_some() {
} else if a.as_any().downcast_ref::<DistinctCount>().is_some() {
distinct = true;
Ok(AggregateFunction::Count.into())
} else if self.as_any().downcast_ref::<Min>().is_some() {
} else if a.as_any().downcast_ref::<Min>().is_some() {
Ok(AggregateFunction::Min.into())
} else if self.as_any().downcast_ref::<Max>().is_some() {
} else if a.as_any().downcast_ref::<Max>().is_some() {
Ok(AggregateFunction::Max.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::ApproxDistinct>()
.is_some()
{
Ok(AggregateFunction::ApproxDistinct.into())
} else if self
.as_any()
.downcast_ref::<expressions::ArrayAgg>()
.is_some()
{
} else if a.as_any().downcast_ref::<expressions::ArrayAgg>().is_some() {
Ok(AggregateFunction::ArrayAgg.into())
} else if self
.as_any()
.downcast_ref::<expressions::Variance>()
.is_some()
{
} else if a.as_any().downcast_ref::<expressions::Variance>().is_some() {
Ok(AggregateFunction::Variance.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::VariancePop>()
.is_some()
{
Ok(AggregateFunction::VariancePop.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::Covariance>()
.is_some()
{
Ok(AggregateFunction::Covariance.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::CovariancePop>()
.is_some()
{
Ok(AggregateFunction::CovariancePop.into())
} else if self
.as_any()
.downcast_ref::<expressions::Stddev>()
.is_some()
{
} else if a.as_any().downcast_ref::<expressions::Stddev>().is_some() {
Ok(AggregateFunction::Stddev.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::StddevPop>()
.is_some()
{
Ok(AggregateFunction::StddevPop.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::Correlation>()
.is_some()
{
Ok(AggregateFunction::Correlation.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::ApproxPercentileCont>()
.is_some()
{
Ok(AggregateFunction::ApproxPercentileCont.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::ApproxPercentileContWithWeight>()
.is_some()
{
Ok(AggregateFunction::ApproxPercentileContWithWeight.into())
} else if self
} else if a
.as_any()
.downcast_ref::<expressions::ApproxMedian>()
.is_some()
Expand All @@ -142,10 +131,10 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
} else {
Err(DataFusionError::NotImplemented(format!(
"Aggregate function not supported: {:?}",
self
a
)))
}?;
let expressions: Vec<protobuf::PhysicalExprNode> = self
let expressions: Vec<protobuf::PhysicalExprNode> = a
.expressions()
.iter()
.map(|e| e.clone().try_into())
Expand Down Expand Up @@ -494,15 +483,11 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
}
}

impl From<datafusion::physical_plan::joins::utils::JoinSide> for protobuf::JoinSide {
fn from(t: datafusion::physical_plan::joins::utils::JoinSide) -> Self {
impl From<JoinSide> for protobuf::JoinSide {
fn from(t: JoinSide) -> Self {
match t {
datafusion::physical_plan::joins::utils::JoinSide::Left => {
protobuf::JoinSide::LeftSide
}
datafusion::physical_plan::joins::utils::JoinSide::Right => {
protobuf::JoinSide::RightSide
}
JoinSide::Left => protobuf::JoinSide::LeftSide,
JoinSide::Right => protobuf::JoinSide::RightSide,
}
}
}

0 comments on commit 2d8bc8f

Please sign in to comment.