Make it possible to compile datafusion-common without default features #7625

Merged · 4 commits · Sep 22, 2023 · Changes from all commits
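For context, this is roughly how a downstream crate consumes the result of this change (a sketch: the version number and the `parquet` feature name are assumptions based on this diff, not taken verbatim from the PR):

[dependencies]
# Compile datafusion-common without the default features (drops parquet support).
datafusion-common = { version = "32", default-features = false }
# Or opt back in to parquet alone:
# datafusion-common = { version = "32", default-features = false, features = ["parquet"] }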
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/file_type.rs
@@ -48,6 +48,7 @@ pub enum FileType {
/// Apache Avro file
AVRO,
/// Apache Parquet file
#[cfg(feature = "parquet")]
PARQUET,
/// CSV file
CSV,
@@ -60,6 +61,7 @@ impl GetExt for FileType {
match self {
FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
#[cfg(feature = "parquet")]
FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
@@ -72,6 +74,7 @@ impl Display for FileType {
let out = match self {
FileType::CSV => "csv",
FileType::JSON => "json",
#[cfg(feature = "parquet")]
FileType::PARQUET => "parquet",
FileType::AVRO => "avro",
FileType::ARROW => "arrow",
@@ -88,6 +91,7 @@ impl FromStr for FileType {
match s.as_str() {
"ARROW" => Ok(FileType::ARROW),
"AVRO" => Ok(FileType::AVRO),
#[cfg(feature = "parquet")]
"PARQUET" => Ok(FileType::PARQUET),
"CSV" => Ok(FileType::CSV),
"JSON" | "NDJSON" => Ok(FileType::JSON),
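A note for downstream users (not part of this diff): once the PARQUET variant is feature-gated, exhaustive matches on FileType need the same gate. A minimal sketch, assuming crate-root import paths and a downstream `parquet` feature that is forwarded to datafusion-common:

use datafusion_common::{FileType, GetExt};

fn describe(file_type: &FileType) -> String {
    match file_type {
        // This arm only exists when the forwarded `parquet` feature is enabled.
        #[cfg(feature = "parquet")]
        FileType::PARQUET => format!("parquet ({})", file_type.get_ext()),
        // Display and GetExt cover the remaining variants, as shown above.
        other => format!("{} ({})", other, other.get_ext()),
    }
}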
10 changes: 9 additions & 1 deletion datafusion/common/src/file_options/mod.rs
@@ -22,6 +22,7 @@ pub mod avro_writer;
pub mod csv_writer;
pub mod file_type;
pub mod json_writer;
#[cfg(feature = "parquet")]
pub mod parquet_writer;
pub(crate) mod parse_utils;

@@ -37,10 +38,12 @@ use crate::{
DataFusionError, FileType, Result,
};

#[cfg(feature = "parquet")]
use self::parquet_writer::ParquetWriterOptions;

use self::{
arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions,
csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions,
-    parquet_writer::ParquetWriterOptions,
};

/// Represents a single arbitrary setting in a
@@ -145,6 +148,7 @@ impl StatementOptions {
/// plus any DataFusion specific writing options (e.g. CSV compression)
#[derive(Clone, Debug)]
pub enum FileTypeWriterOptions {
#[cfg(feature = "parquet")]
Parquet(ParquetWriterOptions),
CSV(CsvWriterOptions),
JSON(JsonWriterOptions),
@@ -164,6 +168,7 @@ impl FileTypeWriterOptions {
let options = (config_defaults, statement_options);

let file_type_write_options = match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => {
FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?)
}
@@ -193,6 +198,7 @@ impl FileTypeWriterOptions {
let options = (config_defaults, &empty_statement);

let file_type_write_options = match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => {
FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?)
}
@@ -215,6 +221,7 @@ impl FileTypeWriterOptions {

/// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum.
/// Returns an error if a variant other than Parquet is set.
#[cfg(feature = "parquet")]
pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> {
match self {
FileTypeWriterOptions::Parquet(opt) => Ok(opt),
@@ -281,6 +288,7 @@ impl Display for FileTypeWriterOptions {
FileTypeWriterOptions::Avro(_) => "AvroWriterOptions",
FileTypeWriterOptions::CSV(_) => "CsvWriterOptions",
FileTypeWriterOptions::JSON(_) => "JsonWriterOptions",
#[cfg(feature = "parquet")]
FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions",
};
write!(f, "{}", name)
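Illustrative usage of the accessor above (not from the PR; import paths and the calling context are assumed):

use datafusion_common::file_options::FileTypeWriterOptions;
#[cfg(feature = "parquet")]
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::Result;

// Mirrors the cfg-gate on try_into_parquet itself.
#[cfg(feature = "parquet")]
fn parquet_opts(opts: &FileTypeWriterOptions) -> Result<&ParquetWriterOptions> {
    // Errors with a descriptive message if `opts` holds CSV/JSON/Avro/Arrow
    // options rather than parquet ones.
    opts.try_into_parquet()
}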
175 changes: 166 additions & 9 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -19,16 +19,15 @@

use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

-use crate::{
-    config::ConfigOptions,
-    file_options::parse_utils::{
-        parse_compression_string, parse_encoding_string, parse_statistics_string,
-        parse_version_string,
-    },
-    DataFusionError, Result,
-};
use crate::{config::ConfigOptions, DataFusionError, Result};

-use super::StatementOptions;

use super::{parse_utils::split_option_and_column_path, StatementOptions};
use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
schema::types::ColumnPath,
};

/// Options for writing parquet files
#[derive(Clone, Debug)]
@@ -214,3 +213,161 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
})
}
}

/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
pub(crate) fn parse_encoding_string(
str_setting: &str,
) -> Result<parquet::basic::Encoding> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
"plain" => Ok(parquet::basic::Encoding::PLAIN),
"plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
"rle" => Ok(parquet::basic::Encoding::RLE),
"bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
"delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
"delta_length_byte_array" => {
Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
}
"delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
"rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
"byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet encoding: \
{str_setting}. Valid values are: plain, plain_dictionary, rle, \
bit_packed, delta_binary_packed, delta_length_byte_array, \
delta_byte_array, rle_dictionary, and byte_stream_split."
))),
}
}
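A test-style sketch of the expected behavior (parse_encoding_string is pub(crate), so a check like this would live inside the crate; the assertions are illustrative):

fn encoding_examples() -> Result<()> {
    // Matching is case-insensitive thanks to the to_lowercase() above.
    assert_eq!(
        parse_encoding_string("PLAIN")?,
        parquet::basic::Encoding::PLAIN
    );
    assert!(parse_encoding_string("no_such_encoding").is_err());
    Ok(())
}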

/// Splits compression string into compression codec and optional compression_level
/// E.g. gzip(2) -> (gzip, Some(2))
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
// ignore string literal chars passed from sqlparser i.e. remove single quotes
let str_setting = str_setting.replace('\'', "");
let split_setting = str_setting.split_once('(');

match split_setting {
Some((codec, rh)) => {
let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
DataFusionError::Configuration(format!(
"Could not parse compression string. \
Got codec: {} and unknown level from {}",
codec, str_setting
))
})?;
Ok((codec.to_owned(), Some(*level)))
}
None => Ok((str_setting.to_owned(), None)),
}
}

/// Helper to ensure compression codecs which don't support levels
/// don't have one set. E.g. snappy(2) is invalid.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
if level.is_some() {
return Err(DataFusionError::Configuration(format!(
"Compression {codec} does not support specifying a level"
)));
}
Ok(())
}

/// Helper to ensure compression codecs which require a level
/// do have one set. E.g. zstd is invalid, zstd(3) is valid
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
level.ok_or(DataFusionError::Configuration(format!(
"{codec} compression requires specifying a level such as {codec}(4)"
)))
}

/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
pub(crate) fn parse_compression_string(
str_setting: &str,
) -> Result<parquet::basic::Compression> {
let str_setting_lower: &str = &str_setting.to_lowercase();
let (codec, level) = split_compression_string(str_setting_lower)?;
let codec = codec.as_str();
match codec {
"uncompressed" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::UNCOMPRESSED)
}
"snappy" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::SNAPPY)
}
"gzip" => {
let level = require_level(codec, level)?;
Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
level,
)?))
}
"lzo" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::LZO)
}
"brotli" => {
let level = require_level(codec, level)?;
Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
level,
)?))
}
"lz4" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::LZ4)
}
"zstd" => {
let level = require_level(codec, level)?;
Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
level as i32,
)?))
}
"lz4_raw" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::LZ4_RAW)
}
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet compression: \
{str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
lzo, brotli(level), lz4, zstd(level), and lz4_raw."
))),
}
}
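A test-style sketch tying the three helpers together (these functions are private to the module, so this would sit alongside them; the expectations follow directly from the code above):

fn compression_examples() -> Result<()> {
    // split_compression_string separates the codec from an optional level.
    assert_eq!(
        split_compression_string("zstd(5)")?,
        ("zstd".to_owned(), Some(5))
    );
    assert_eq!(
        split_compression_string("snappy")?,
        ("snappy".to_owned(), None)
    );
    // require_level and check_level_is_none enforce the per-codec rules.
    assert!(parse_compression_string("zstd").is_err()); // level required
    assert!(parse_compression_string("snappy(2)").is_err()); // level rejected
    assert_eq!(
        parse_compression_string("gzip(6)")?,
        parquet::basic::Compression::GZIP(GzipLevel::try_new(6)?)
    );
    Ok(())
}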

pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
"1.0" => Ok(WriterVersion::PARQUET_1_0),
"2.0" => Ok(WriterVersion::PARQUET_2_0),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet writer version {str_setting} \
valid options are '1.0' and '2.0'"
))),
}
}

pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
"none" => Ok(EnabledStatistics::None),
"chunk" => Ok(EnabledStatistics::Chunk),
"page" => Ok(EnabledStatistics::Page),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet statistics setting {str_setting} \
valid options are 'none', 'page', and 'chunk'"
))),
}
}
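The same pattern for the version and statistics parsers (illustrative):

fn version_and_statistics_examples() -> Result<()> {
    assert_eq!(parse_version_string("2.0")?, WriterVersion::PARQUET_2_0);
    // Case-insensitive, like the other parsers.
    assert_eq!(parse_statistics_string("PAGE")?, EnabledStatistics::Page);
    assert!(parse_version_string("3.0").is_err());
    Ok(())
}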

pub(crate) fn split_option_and_column_path(
str_setting: &str,
) -> (String, Option<ColumnPath>) {
match str_setting.replace('\'', "").split_once("::") {
Some((s1, s2)) => {
let col_path = ColumnPath::new(s2.split('.').map(|s| s.to_owned()).collect());
(s1.to_owned(), Some(col_path))
}
None => (str_setting.to_owned(), None),
}
}
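Finally, a sketch of the option::column.path convention handled by split_option_and_column_path (the option key is an assumed example):

fn split_examples() {
    // A column-qualified key splits into the option name and a ColumnPath.
    let (opt, path) =
        split_option_and_column_path("bloom_filter_enabled::col1.nested");
    assert_eq!(opt, "bloom_filter_enabled");
    assert_eq!(
        path,
        Some(ColumnPath::new(vec!["col1".to_owned(), "nested".to_owned()]))
    );

    // A bare key applies globally: no column path.
    let (opt, path) = split_option_and_column_path("max_row_group_size");
    assert_eq!(opt, "max_row_group_size");
    assert_eq!(path, None);
}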