Skip to content

Commit

Permalink
Reorganize the logical plan related code in proto to be consistent wi…
Browse files Browse the repository at this point in the history
…th the physical plan code
  • Loading branch information
kyotoYaho committed Dec 16, 2022
1 parent b575f93 commit a1a6368
Show file tree
Hide file tree
Showing 8 changed files with 1,642 additions and 1,710 deletions.
87 changes: 9 additions & 78 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
// under the License.

//! Serialization / Deserialization to Bytes
use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
use crate::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use crate::{from_proto::parse_expr, protobuf};
use arrow::datatypes::SchemaRef;
use datafusion::datasource::TableProvider;
use crate::logical_plan::{
self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use crate::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use crate::protobuf;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
create_udaf, create_udf, Expr, Extension, LogicalPlan, Volatility,
};
use datafusion_expr::{create_udaf, create_udf, Expr, LogicalPlan, Volatility};
use prost::{
bytes::{Bytes, BytesMut},
Message,
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Serializeable for Expr {
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e))
})?;

parse_expr(&protobuf, registry).map_err(|e| {
logical_plan::from_proto::parse_expr(&protobuf, registry).map_err(|e| {
DataFusionError::Plan(format!("Error parsing protobuf into Expr: {}", e))
})
}
Expand Down Expand Up @@ -272,75 +272,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
}

#[derive(Debug)]
struct DefaultLogicalExtensionCodec {}

impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
fn try_decode(
&self,
_buf: &[u8],
_inputs: &[LogicalPlan],
_ctx: &SessionContext,
) -> Result<Extension> {
Err(DataFusionError::NotImplemented(
"No extension codec provided".to_string(),
))
}

fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
Err(DataFusionError::NotImplemented(
"No extension codec provided".to_string(),
))
}

fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
_ctx: &SessionContext,
) -> std::result::Result<Arc<dyn TableProvider>, DataFusionError> {
Err(DataFusionError::NotImplemented(
"No codec provided to for TableProviders".to_string(),
))
}

fn try_encode_table_provider(
&self,
_node: Arc<dyn TableProvider>,
_buf: &mut Vec<u8>,
) -> std::result::Result<(), DataFusionError> {
Err(DataFusionError::NotImplemented(
"No codec provided to for TableProviders".to_string(),
))
}
}

#[derive(Debug)]
pub struct DefaultPhysicalExtensionCodec {}

impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
fn try_decode(
&self,
_buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_registry: &dyn FunctionRegistry,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::NotImplemented(
"PhysicalExtensionCodec is not provided".to_string(),
))
}

fn try_encode(
&self,
_node: Arc<dyn ExecutionPlan>,
_buf: &mut Vec<u8>,
) -> Result<()> {
Err(DataFusionError::NotImplemented(
"PhysicalExtensionCodec is not provided".to_string(),
))
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
42 changes: 41 additions & 1 deletion datafusion/proto/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,46 @@ pub fn str_to_byte(s: &String) -> Result<u8, DataFusionError> {
Ok(s.as_bytes()[0])
}

pub(crate) fn proto_error<S: Into<String>>(message: S) -> DataFusionError {
pub fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
let b = &[b];
let b = std::str::from_utf8(b)
.map_err(|_| DataFusionError::Internal("Invalid CSV delimiter".to_owned()))?;
Ok(b.to_owned())
}

#[macro_export]
macro_rules! convert_required {
($PB:expr) => {{
if let Some(field) = $PB.as_ref() {
Ok(field.try_into()?)
} else {
Err(proto_error("Missing required field in protobuf"))
}
}};
}

#[macro_export]
macro_rules! into_required {
($PB:expr) => {{
if let Some(field) = $PB.as_ref() {
Ok(field.into())
} else {
Err(proto_error("Missing required field in protobuf"))
}
}};
}

#[macro_export]
macro_rules! convert_box_required {
($PB:expr) => {{
if let Some(field) = $PB.as_ref() {
field.as_ref().try_into()
} else {
Err(proto_error("Missing required field in protobuf"))
}
}};
}

pub fn proto_error<S: Into<String>>(message: S) -> DataFusionError {
DataFusionError::Internal(message.into())
}
Loading

0 comments on commit a1a6368

Please sign in to comment.