From 6f0805ede7dc473e6a8afd7a27b8eb94715203ee Mon Sep 17 00:00:00 2001 From: parkma99 Date: Wed, 26 Jul 2023 00:38:02 +0800 Subject: [PATCH 1/4] refactor byte_to_string and string_to_byte --- datafusion/proto/src/common.rs | 17 ++++------------- datafusion/proto/src/logical_plan/mod.rs | 12 ++++++------ datafusion/proto/src/physical_plan/mod.rs | 14 +++++++------- 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs index ed826f587413..5fdee158185a 100644 --- a/datafusion/proto/src/common.rs +++ b/datafusion/proto/src/common.rs @@ -17,26 +17,17 @@ use datafusion_common::{DataFusionError, Result}; -pub fn csv_delimiter_to_string(b: u8) -> Result { - let b = &[b]; - let b = std::str::from_utf8(b) - .map_err(|_| DataFusionError::Internal("Invalid CSV delimiter".to_owned()))?; - Ok(b.to_owned()) -} - -pub fn str_to_byte(s: &String) -> Result { +pub fn str_to_byte(s: &String, flag: &str) -> Result { if s.len() != 1 { - return Err(DataFusionError::Internal( - "Invalid CSV delimiter".to_owned(), - )); + return Err(DataFusionError::Internal(format!("Invalid CSV {flag}"))); } Ok(s.as_bytes()[0]) } -pub fn byte_to_string(b: u8) -> Result { +pub fn byte_to_string(b: u8, flag: &str) -> Result { let b = &[b]; let b = std::str::from_utf8(b) - .map_err(|_| DataFusionError::Internal("Invalid CSV delimiter".to_owned()))?; + .map_err(|_| DataFusionError::Internal(format!("Invalid CSV {flag}")))?; Ok(b.to_owned()) } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 27f9389678cf..831ac10197fd 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -353,10 +353,10 @@ impl AsLogicalPlan for LogicalPlanNode { }) => { let mut csv = CsvFormat::default() .with_has_header(*has_header) - .with_delimiter(str_to_byte(delimiter)?) - .with_quote(str_to_byte(quote)?); + .with_delimiter(str_to_byte(delimiter, "delimiter")?) + .with_quote(str_to_byte(quote, "quote")?); if let Some(protobuf::csv_format::OptionalEscape::Escape(escape)) = optional_escape { - csv = csv.with_quote(str_to_byte(escape)?); + csv = csv.with_quote(str_to_byte(escape, "escape")?); } Arc::new(csv)}, FileFormatType::Avro(..) => Arc::new(AvroFormat), @@ -850,12 +850,12 @@ impl AsLogicalPlan for LogicalPlanNode { FileFormatType::Parquet(protobuf::ParquetFormat {}) } else if let Some(csv) = any.downcast_ref::() { FileFormatType::Csv(protobuf::CsvFormat { - delimiter: byte_to_string(csv.delimiter())?, + delimiter: byte_to_string(csv.delimiter(), "delimiter")?, has_header: csv.has_header(), - quote: byte_to_string(csv.quote())?, + quote: byte_to_string(csv.quote(), "quote")?, optional_escape: if let Some(escape) = csv.escape() { Some(protobuf::csv_format::OptionalEscape::Escape( - byte_to_string(escape)?, + byte_to_string(escape, "escape")?, )) } else { None diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index cebe4eddcc75..e97a773d3472 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -51,8 +51,8 @@ use datafusion_common::{DataFusionError, Result}; use prost::bytes::BufMut; use prost::Message; +use crate::common::str_to_byte; use crate::common::{byte_to_string, proto_error}; -use crate::common::{csv_delimiter_to_string, str_to_byte}; use crate::physical_plan::from_proto::{ parse_physical_expr, parse_physical_sort_expr, parse_protobuf_file_scan_config, }; @@ -155,13 +155,13 @@ impl AsExecutionPlan for PhysicalPlanNode { registry, )?, scan.has_header, - str_to_byte(&scan.delimiter)?, - str_to_byte(&scan.quote)?, + str_to_byte(&scan.delimiter, "delimiter")?, + str_to_byte(&scan.quote, "quote")?, if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape( escape, )) = &scan.optional_escape { - Some(str_to_byte(escape)?) + Some(str_to_byte(escape, "escape")?) } else { None }, @@ -1079,11 +1079,11 @@ impl AsExecutionPlan for PhysicalPlanNode { protobuf::CsvScanExecNode { base_conf: Some(exec.base_config().try_into()?), has_header: exec.has_header(), - delimiter: csv_delimiter_to_string(exec.delimiter())?, - quote: byte_to_string(exec.quote())?, + delimiter: byte_to_string(exec.delimiter(), "delimiter")?, + quote: byte_to_string(exec.quote(), "quote")?, optional_escape: if let Some(escape) = exec.escape() { Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape( - byte_to_string(escape)?, + byte_to_string(escape, "escape")?, )) } else { None From e93f951ef54991cb106e1c9c6d20c88358df19ac Mon Sep 17 00:00:00 2001 From: parkma99 Date: Thu, 27 Jul 2023 13:17:42 +0800 Subject: [PATCH 2/4] update --- datafusion/proto/src/common.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs index 5fdee158185a..dca4519df5fe 100644 --- a/datafusion/proto/src/common.rs +++ b/datafusion/proto/src/common.rs @@ -17,17 +17,22 @@ use datafusion_common::{DataFusionError, Result}; -pub fn str_to_byte(s: &String, flag: &str) -> Result { +pub fn str_to_byte(s: &String, description: &str) -> Result { if s.len() != 1 { - return Err(DataFusionError::Internal(format!("Invalid CSV {flag}"))); + return Err(DataFusionError::Internal(format!( + "Invalid CSV {description}: expected single character, got {s}" + ))); } Ok(s.as_bytes()[0]) } -pub fn byte_to_string(b: u8, flag: &str) -> Result { +pub fn byte_to_string(b: u8, description: &str) -> Result { let b = &[b]; - let b = std::str::from_utf8(b) - .map_err(|_| DataFusionError::Internal(format!("Invalid CSV {flag}")))?; + let b = std::str::from_utf8(b).map_err(|_| { + DataFusionError::Internal(format!( + "Invalid CSV {description}: can not represent {b:0x?} as utf8" + )) + })?; Ok(b.to_owned()) } From d1a6352a60d940668a20040578f02d929e6251bf Mon Sep 17 00:00:00 2001 From: jakevin Date: Thu, 27 Jul 2023 13:57:23 +0800 Subject: [PATCH 3/4] use pub(crate) --- datafusion/proto/src/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs index dca4519df5fe..1ecddaeca1ee 100644 --- a/datafusion/proto/src/common.rs +++ b/datafusion/proto/src/common.rs @@ -17,7 +17,7 @@ use datafusion_common::{DataFusionError, Result}; -pub fn str_to_byte(s: &String, description: &str) -> Result { +pub(crate) fn str_to_byte(s: &String, description: &str) -> Result { if s.len() != 1 { return Err(DataFusionError::Internal(format!( "Invalid CSV {description}: expected single character, got {s}" From d27847b7cb93379494e52e667e48dc6109c65315 Mon Sep 17 00:00:00 2001 From: jakevin Date: Thu, 27 Jul 2023 13:57:49 +0800 Subject: [PATCH 4/4] use pub(crate) --- datafusion/proto/src/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs index 1ecddaeca1ee..cbbb469f0863 100644 --- a/datafusion/proto/src/common.rs +++ b/datafusion/proto/src/common.rs @@ -26,7 +26,7 @@ pub(crate) fn str_to_byte(s: &String, description: &str) -> Result { Ok(s.as_bytes()[0]) } -pub fn byte_to_string(b: u8, description: &str) -> Result { +pub(crate) fn byte_to_string(b: u8, description: &str) -> Result { let b = &[b]; let b = std::str::from_utf8(b).map_err(|_| { DataFusionError::Internal(format!(