Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add not_impl_err error macro #7340

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;

use datafusion::common::not_impl_err;
use datafusion::error::DataFusionError;
use datafusion::error::Result;
use datafusion::prelude::*;
Expand Down Expand Up @@ -117,9 +118,7 @@ impl ConvertOpt {
ctx.write_parquet(csv, output_path, Some(props)).await?
}
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid output format: {other}"
)));
return not_impl_err!("Invalid output format: {other}");
}
}
println!("Conversion completed in {} ms", start.elapsed().as_millis());
Expand All @@ -139,9 +138,7 @@ impl ConvertOpt {
"lz0" => Compression::LZO,
"zstd" => Compression::ZSTD(Default::default()),
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid compression format: {other}"
)));
return not_impl_err!("Invalid compression format: {other}");
}
})
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,13 @@ make_error!(plan_err, Plan);
// Exposes a macro to create `DataFusionError::Internal`
make_error!(internal_err, Internal);

// Exposes a macro to create `DataFusionError::NotImplemented`
make_error!(not_impl_err, NotImplemented);

// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;

#[cfg(test)]
mod test {
Expand Down
5 changes: 2 additions & 3 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
str::FromStr,
};

use crate::error::_not_impl_err;
use crate::{DataFusionError, Result};

/// Join type
Expand Down Expand Up @@ -81,9 +82,7 @@ impl FromStr for JoinType {
"RIGHTSEMI" => Ok(JoinType::RightSemi),
"LEFTANTI" => Ok(JoinType::LeftAnti),
"RIGHTANTI" => Ok(JoinType::RightAnti),
_ => Err(DataFusionError::NotImplemented(format!(
"The join type {s} does not exist or is not implemented"
))),
_ => _not_impl_err!("The join type {s} does not exist or is not implemented"),
}
}
}
Expand Down
30 changes: 14 additions & 16 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cast::{
as_fixed_size_binary_array, as_fixed_size_list_array, as_list_array, as_struct_array,
};
use crate::delta::shift_months;
use crate::error::{DataFusionError, Result, _internal_err};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use arrow::buffer::NullBuffer;
use arrow::compute::nullif;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
Expand Down Expand Up @@ -617,9 +617,7 @@ macro_rules! decimal_right {
-$TERM
};
($TERM:expr, /) => {
Err(DataFusionError::NotImplemented(format!(
"Decimal reciprocation not yet supported",
)))
_not_impl_err!("Decimal reciprocation not yet supported",)
};
}

Expand Down Expand Up @@ -1870,9 +1868,9 @@ impl ScalarValue {
ScalarValue::DurationNanosecond(None)
}
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a zero scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand All @@ -1892,9 +1890,9 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(1.0)),
DataType::Float64 => ScalarValue::Float64(Some(1.0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create an one scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand All @@ -1910,9 +1908,9 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(-1.0)),
DataType::Float64 => ScalarValue::Float64(Some(-1.0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a negative one scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand All @@ -1931,9 +1929,9 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(10.0)),
DataType::Float64 => ScalarValue::Float64(Some(10.0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a negative one scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand Down Expand Up @@ -3257,9 +3255,9 @@ impl ScalarValue {
}

other => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a scalar from array of type \"{other:?}\""
)));
);
}
})
}
Expand Down Expand Up @@ -3820,9 +3818,9 @@ impl TryFrom<&DataType> for ScalarValue {
DataType::Struct(fields) => ScalarValue::Struct(None, fields.clone()),
DataType::Null => ScalarValue::Null,
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand Down
10 changes: 3 additions & 7 deletions datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use datafusion_sql::{ResolvedTableReference, TableReference};

use crate::catalog::schema::SchemaProvider;
use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{not_impl_err, DataFusionError, Result};
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -125,9 +125,7 @@ pub trait CatalogProvider: Sync + Send {
// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
Err(DataFusionError::NotImplemented(
"Registering new schemas is not supported".to_string(),
))
not_impl_err!("Registering new schemas is not supported")
}

/// Removes a schema from this catalog. Implementations of this method should return
Expand All @@ -145,9 +143,7 @@ pub trait CatalogProvider: Sync + Send {
_name: &str,
_cascade: bool,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
Err(DataFusionError::NotImplemented(
"Deregistering new schemas is not supported".to_string(),
))
not_impl_err!("Deregistering new schemas is not supported")
}
}

Expand Down
16 changes: 5 additions & 11 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_common::{not_impl_err, DataFusionError};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;

Expand Down Expand Up @@ -267,15 +267,11 @@ impl FileFormat for CsvFormat {
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented yet for CSV".into(),
));
return not_impl_err!("Overwrites are not implemented yet for CSV");
}

if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
return Err(DataFusionError::NotImplemented(
"Inserting compressed CSV is not implemented yet.".into(),
));
return not_impl_err!("Inserting compressed CSV is not implemented yet.");
}

let sink_schema = conf.output_schema().clone();
Expand Down Expand Up @@ -512,7 +508,7 @@ impl DataSink for CsvSink {
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for CsvSink in Append mode".into()));
return not_impl_err!("per_thread_output=false is not implemented for CsvSink in Append mode");
}
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
Expand All @@ -538,9 +534,7 @@ impl DataSink for CsvSink {
}
}
FileWriterMode::Put => {
return Err(DataFusionError::NotImplemented(
"Put Mode is not implemented for CSV Sink yet".into(),
))
return not_impl_err!("Put Mode is not implemented for CSV Sink yet")
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
Expand Down
22 changes: 6 additions & 16 deletions datafusion/core/src/datasource/file_format/file_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! File type abstraction
use crate::common::internal_err;
use crate::common::{internal_err, not_impl_err};
use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
Expand Down Expand Up @@ -144,9 +144,7 @@ impl FileCompressionType {
.boxed(),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => s.boxed(),
})
Expand All @@ -169,9 +167,7 @@ impl FileCompressionType {
ZSTD => Box::new(ZstdEncoder::new(w)),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => w,
})
Expand Down Expand Up @@ -201,9 +197,7 @@ impl FileCompressionType {
.boxed(),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => s.boxed(),
})
Expand All @@ -228,9 +222,7 @@ impl FileCompressionType {
},
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => Box::new(r),
})
Expand Down Expand Up @@ -275,9 +267,7 @@ impl FromStr for FileType {
"PARQUET" => Ok(FileType::PARQUET),
"CSV" => Ok(FileType::CSV),
"JSON" | "NDJSON" => Ok(FileType::JSON),
_ => Err(DataFusionError::NotImplemented(format!(
"Unknown FileType: {s}"
))),
_ => not_impl_err!("Unknown FileType: {s}"),
}
}
}
Expand Down
15 changes: 5 additions & 10 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::any::Any;

use bytes::Bytes;
use datafusion_common::not_impl_err;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
use rand::distributions::Alphanumeric;
Expand Down Expand Up @@ -174,15 +175,11 @@ impl FileFormat for JsonFormat {
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented yet for Json".into(),
));
return not_impl_err!("Overwrites are not implemented yet for Json");
}

if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
return Err(DataFusionError::NotImplemented(
"Inserting compressed JSON is not implemented yet.".into(),
));
return not_impl_err!("Inserting compressed JSON is not implemented yet.");
}
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
Expand Down Expand Up @@ -281,7 +278,7 @@ impl DataSink for JsonSink {
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for JsonSink in Append mode".into()));
return not_impl_err!("per_thread_output=false is not implemented for JsonSink in Append mode");
}
for file_group in &self.config.file_groups {
let serializer = JsonSerializer::new();
Expand All @@ -299,9 +296,7 @@ impl DataSink for JsonSink {
}
}
FileWriterMode::Put => {
return Err(DataFusionError::NotImplemented(
"Put Mode is not implemented for Json Sink yet".into(),
))
return not_impl_err!("Put Mode is not implemented for Json Sink yet")
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use datafusion_common::DataFusionError;
use datafusion_common::{not_impl_err, DataFusionError};
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
Expand Down Expand Up @@ -100,8 +100,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
_state: &SessionState,
_conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let msg = "Writer not implemented for this format".to_owned();
Err(DataFusionError::NotImplemented(msg))
not_impl_err!("Writer not implemented for this format")
}
}

Expand Down
Loading