Unify DataFrame and SQL (Insert Into) Write Methods #7141

Merged
merged 10 commits on Aug 5, 2023
48 changes: 48 additions & 0 deletions datafusion/core/src/dataframe.rs
@@ -53,6 +53,33 @@ use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::prelude::SessionContext;

/// Contains options that control how data is
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
/// Controls if existing data should be overwritten
overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
// settings such as LOCATION and FILETYPE can be set here
// e.g. add location: Option<Path>
}

impl DataFrameWriteOptions {
/// Create a new DataFrameWriteOptions with default values
pub fn new() -> Self {
DataFrameWriteOptions { overwrite: false }
}
/// Set the overwrite option to true or false
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = overwrite;
self
}
}

impl Default for DataFrameWriteOptions {
fn default() -> Self {
Self::new()
}
}

/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
/// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html)
@@ -925,6 +952,27 @@ impl DataFrame {
))
}

/// Write this DataFrame to the referenced table.
/// This method uses the same underlying implementation
/// as the SQL `INSERT INTO` statement.
/// Unlike most other DataFrame methods, this method executes
/// eagerly: it writes the data and returns the count of rows written.
pub async fn write_table(
self,
table_name: &str,
write_options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let arrow_schema = Schema::from(self.schema());
let plan = LogicalPlanBuilder::insert_into(
self.plan,
table_name.to_owned(),
&arrow_schema,
write_options.overwrite,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Write a `DataFrame` to a CSV file.
pub async fn write_csv(self, path: &str) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
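For context, a minimal usage sketch of the new write path (the table name "my_table", its prior registration, and the import path for DataFrameWriteOptions are assumptions for illustration, not part of this diff):

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

async fn insert_example(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // Assumes "my_table" was registered earlier, e.g. via ctx.register_csv(...).
    let df = ctx.sql("SELECT 1 AS a").await?;
    // Executes eagerly: builds an insert-into plan, writes the rows, and
    // returns record batches reporting the row count.
    let _count_batches = df
        .write_table("my_table", DataFrameWriteOptions::new())
        .await?;
    Ok(())
}

The same underlying plan backs INSERT INTO my_table SELECT 1 issued through ctx.sql(...), which is the unification this PR targets.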
116 changes: 82 additions & 34 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -53,6 +53,7 @@ use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use rand::distributions::{Alphanumeric, DistString};

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -268,6 +269,11 @@ impl FileFormat for CsvFormat {
_state: &SessionState,
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented yet for CSV".into(),
));
}
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(
conf,
@@ -560,10 +566,10 @@ impl CsvSink {
impl DataSink for CsvSink {
async fn write_all(
&self,
mut data: SendableRecordBatchStream,
mut data: Vec<SendableRecordBatchStream>,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = self.config.file_groups.len();
let num_partitions = data.len();

let object_store = context
.runtime_env()
@@ -572,44 +578,86 @@ impl DataSink for CsvSink {
// Construct serializer and writer for each file group
let mut serializers = vec![];
let mut writers = vec![];
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let header = self.has_header
&& (!matches!(&self.config.writer_mode, FileWriterMode::Append)
|| file_group.object_meta.size == 0);
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);

let file = file_group.clone();
let writer = self
.create_writer(file.object_meta.clone().into(), object_store.clone())
.await?;
writers.push(writer);
match self.config.writer_mode {
FileWriterMode::Append => {
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let header = self.has_header
&& (!matches!(&self.config.writer_mode, FileWriterMode::Append)
|| file_group.object_meta.size == 0);
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);

let file = file_group.clone();
let writer = self
.create_writer(
file.object_meta.clone().into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
FileWriterMode::Put => {
return Err(DataFusionError::NotImplemented(
"Put Mode is not implemented for CSV Sink yet".into(),
))
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not Hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
// Uniquely identify this batch of files with a random string, so that name collisions do not overwrite existing files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = true;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = self
.create_writer(object_meta.into(), object_store.clone())
.await?;
writers.push(writer);
}
}
}

let mut idx = 0;
let mut row_count = 0;
// Map errors to DataFusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
while let Some(maybe_batch) = data.next().await {
// Write data to files in a round robin fashion:
idx = (idx + 1) % num_partitions;
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
// TODO parallelize serialization across partitions and batches within partitions

Contributor: This TODO can probably be done with some fancy async stream stuff -- however, I got a little hung up on how to handle the abort case. I'll try and think on it some more.

Contributor (Author): Nested join sets / task spawning seems to work well based on a quick test which I described here. I haven't thoroughly thought through or tested how aborts are handled with this code yet, though. Parquet is also going to be more challenging to parallelize in this way since the serializer is stateful. This will be the fun part to experiment with!

// see: https://github.com/apache/arrow-datafusion/issues/7079
for idx in 0..num_partitions {
while let Some(maybe_batch) = data[idx].next().await {
// Write each input partition's batches to its corresponding file:
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers)
.await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
}
}
// Perform cleanup:
let n_writers = writers.len();
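As a side note, a minimal sketch of the file-naming scheme used by the PutMultipart branch above (the helper function and its name are hypothetical; only the Alphanumeric write id and the {write_id}_{part_idx}.csv pattern come from the diff):

use rand::distributions::{Alphanumeric, DistString};

// Hypothetical helper: one output file name per input partition, all sharing a
// random write id so that concurrent writes do not collide with each other.
fn partition_file_names(num_partitions: usize) -> Vec<String> {
    let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    (0..num_partitions)
        .map(|part_idx| format!("{}_{}.csv", write_id, part_idx))
        .collect()
}

One file per partition follows from the new write_all signature, which now receives one SendableRecordBatchStream per input partition instead of a single merged stream.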
25 changes: 22 additions & 3 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -30,7 +30,7 @@ use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
use crate::datasource::{
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
@@ -39,6 +39,7 @@ use crate::datasource::{
};
use crate::error::Result;
use crate::execution::context::{SessionConfig, SessionState};
use crate::logical_expr::Expr;

/// Options that control the reading of CSV files.
///
@@ -73,6 +74,10 @@ pub struct CsvReadOptions<'a> {
pub file_compression_type: FileCompressionType,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Controls how inserts to this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for CsvReadOptions<'a> {
@@ -95,6 +100,8 @@ impl<'a> CsvReadOptions<'a> {
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendToFile,
}
}

@@ -171,6 +178,18 @@ impl<'a> CsvReadOptions<'a> {
self.file_compression_type = file_compression_type;
self
}

/// Configure a known sort order for the file, if any
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

/// Options that control the reading of Parquet files.
Expand Down Expand Up @@ -461,9 +480,9 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
// TODO: Add file sort order into CsvReadOptions and introduce here.
.with_file_sort_order(vec![])
.with_file_sort_order(self.file_sort_order.clone())
.with_infinite_source(self.infinite)
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
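For illustration, a hedged sketch of how the new CsvReadOptions fields might be used when registering a CSV-backed table (the table name, file path, column "a", and the has_header call are assumptions; AppendToFile is the default shown above):

use datafusion::datasource::listing::ListingTableInsertMode;
use datafusion::prelude::*;

async fn register_example(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let options = CsvReadOptions::new()
        .has_header(true)
        // Declare that the file is already sorted by column "a" ascending, nulls last.
        .file_sort_order(vec![vec![col("a").sort(true, false)]])
        // Route INSERT INTO statements for this table to append to the existing file.
        .insert_mode(ListingTableInsertMode::AppendToFile);
    ctx.register_csv("my_table", "data.csv", options).await?;
    Ok(())
}

With insert_mode exposed on the read options, SQL INSERT INTO and DataFrame::write_table go through the same ListingTable insert path.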
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
@@ -31,7 +31,9 @@ use std::pin::Pin;
use std::sync::Arc;

pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
pub use table::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode,
};

/// Stream of files that get listed from an object store
pub type PartitionedFileStream =