DataFrameOptions and clean up
devinjdangelo committed Aug 4, 2023
1 parent eb1fac7 commit 41698bb
Showing 12 changed files with 115 additions and 68 deletions.
32 changes: 29 additions & 3 deletions datafusion/core/src/dataframe.rs
@@ -25,7 +25,6 @@ use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::{DataFusionError, SchemaError};
use futures::StreamExt;
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -54,6 +53,33 @@ use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::prelude::SessionContext;

/// Contains options that control how data is
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
/// Controls if existing data should be overwritten
overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
// settings such as LOCATION and FILETYPE can be set here
// e.g. add location: Option<Path>
}

impl DataFrameWriteOptions {
/// Create a new DataFrameWriteOptions with default values
pub fn new() -> Self {
DataFrameWriteOptions { overwrite: false }
}
/// Set the overwrite option to true or false
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = overwrite;
self
}
}

impl Default for DataFrameWriteOptions {
fn default() -> Self {
Self::new()
}
}

/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
/// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html)
@@ -934,14 +960,14 @@ impl DataFrame {
pub async fn write_table(
self,
table_name: &str,
overwrite: bool,
write_options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let arrow_schema = Schema::from(self.schema());
let plan = LogicalPlanBuilder::insert_into(
self.plan,
table_name.to_owned(),
&arrow_schema,
overwrite,
write_options.overwrite,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
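
A minimal usage sketch (not from this commit) of the reworked write_table API together with the new DataFrameWriteOptions builder; the table name "my_table", the query, and the import paths are illustrative assumptions:

// Hypothetical usage: append the rows of a DataFrame into an already
// registered, insert-capable table. The DataFrame schema must match the
// target table schema.
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

async fn append_example(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let df = ctx.sql("SELECT 1 AS a").await?;
    // Defaults to overwrite = false (plain INSERT INTO); with_overwrite(true)
    // would plan an InsertOverwrite instead.
    let _ = df
        .write_table("my_table", DataFrameWriteOptions::new())
        .await?;
    Ok(())
}
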
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -27,7 +27,6 @@ use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
use chrono::{DateTime, NaiveDate, Utc};
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
@@ -270,8 +269,10 @@ impl FileFormat for CsvFormat {
_state: &SessionState,
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite{
return Err(DataFusionError::NotImplemented("Overwrites are not implemented yet for CSV".into()))
if conf.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented yet for CSV".into(),
));
}
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(
@@ -639,6 +640,8 @@ impl DataSink for CsvSink {
// Map errors to DatafusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
// TODO parallelize serialization across partitions and batches within partitions
// see: https://github.com/apache/arrow-datafusion/issues/7079
for idx in 0..num_partitions {
while let Some(maybe_batch) = data[idx].next().await {
// Write data to files in a round robin fashion:
10 changes: 6 additions & 4 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -30,7 +30,7 @@ use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::listing::{ListingTableUrl, ListingTableInsertMode};
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
use crate::datasource::{
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
@@ -179,12 +179,14 @@ impl<'a> CsvReadOptions<'a> {
self
}

pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>)->Self{
/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}

pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self{
/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
@@ -195,7 +197,7 @@ impl<'a> CsvReadOptions<'a> {
/// Note this structure is supplied when a datasource is created and
/// can not vary from statement to statement. For settings that
/// can vary statement to statement see
/// [`ConfigOptions`](crate::config::ConfigO ptions).
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct ParquetReadOptions<'a> {
/// File extension; only files with this extension are selected for data input.
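
A hedged sketch of combining the new CsvReadOptions knobs when registering a CSV-backed table (it mirrors the test added below in listing/table.rs; the path and schema here are illustrative assumptions, not code from this commit):

// Hypothetical usage: register a CSV directory so INSERT statements create
// new files in the directory rather than appending to an existing file.
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::ListingTableInsertMode;
use datafusion::prelude::*;

async fn register_example(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    ctx.register_csv(
        "t",
        "/tmp/csv_table_dir", // assumed path
        CsvReadOptions::new()
            .schema(&schema)
            .insert_mode(ListingTableInsertMode::AppendNewFiles),
    )
    .await?;
    Ok(())
}
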
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
@@ -31,7 +31,9 @@ use std::pin::Pin;
use std::sync::Arc;

pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode};
pub use table::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode,
};

/// Stream of files get listed from object store
pub type PartitionedFileStream =
80 changes: 44 additions & 36 deletions datafusion/core/src/datasource/listing/table.rs
@@ -444,7 +444,8 @@ impl ListingOptions {
self
}

pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self{
/// Configure how insertions to this table should be handled.
pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
@@ -823,17 +824,27 @@ impl TableProvider for ListingTable {
//we can append to that file. Otherwise, we can write new files into the directory
//adding new files to the listing table in order to insert to the table.
let input_partitions = input.output_partitioning().partition_count();
match self.options.insert_mode{
match self.options.insert_mode {
ListingTableInsertMode::AppendToFile => {
if input_partitions > file_groups.len(){
return Err(DataFusionError::Plan(format!("Cannot append {input_partitions} partitions to {} files!", file_groups.len())))
if input_partitions > file_groups.len() {
return Err(DataFusionError::Plan(format!(
"Cannot append {input_partitions} partitions to {} files!",
file_groups.len()
)));
}
writer_mode = crate::datasource::file_format::FileWriterMode::Append;
},
ListingTableInsertMode::AppendNewFiles => writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart,
ListingTableInsertMode::Error => return Err(DataFusionError::Plan("Invalid plan attempting write to table with TableWriteMode::Error!".into())),
}
ListingTableInsertMode::AppendNewFiles => {
writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart
}
ListingTableInsertMode::Error => {
return Err(DataFusionError::Plan(
"Invalid plan attempting write to table with TableWriteMode::Error!"
.into(),
))
}
}

// Sink related option, apart from format
let config = FileSinkConfig {
object_store_url: self.table_paths()[0].object_store(),
@@ -1432,8 +1443,8 @@ mod tests {
let table_path = ListingTableUrl::parse(temp_path).unwrap();

let file_format = CsvFormat::default();
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_insert_mode(insert_mode);
let listing_options =
ListingOptions::new(Arc::new(file_format)).with_insert_mode(insert_mode);

let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
@@ -1583,8 +1594,11 @@ mod tests {
let tmp_dir = TempDir::new()?;
let path = tmp_dir.path().join(filename);

let initial_table =
load_empty_schema_csv_table(schema.clone(), path.to_str().unwrap(), ListingTableInsertMode::AppendToFile)?;
let initial_table = load_empty_schema_csv_table(
schema.clone(),
path.to_str().unwrap(),
ListingTableInsertMode::AppendToFile,
)?;
session_ctx.register_table("t", initial_table)?;
// Create and register the source table with the provided schema and inserted data
let source_table = Arc::new(MemTable::try_new(
@@ -1712,9 +1726,6 @@ mod tests {

#[tokio::test]
async fn test_append_new_files_to_csv_table() -> Result<()> {
let file_type = FileType::CSV;
let file_compression_type = FileCompressionType::UNCOMPRESSED;

// Create the initial context, schema, and batch.
let session_ctx = SessionContext::new();
// Create a new schema with one field called "a" of type Int32
@@ -1730,28 +1741,17 @@
vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
)?;

// Filename with extension
let filename = format!(
"path{}",
file_type
.to_owned()
.get_ext_with_compression(file_compression_type.clone())
.unwrap()
);

// Define batch size for file reader
let batch_size = batch.num_rows();

// Create a temporary directory and a CSV file within it.
let tmp_dir = TempDir::new()?;
let path = tmp_dir.path().join(filename);

session_ctx.register_csv("t", tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new()
.insert_mode(ListingTableInsertMode::AppendNewFiles)
.schema(schema.as_ref()))
session_ctx
.register_csv(
"t",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new()
.insert_mode(ListingTableInsertMode::AppendNewFiles)
.schema(schema.as_ref()),
)
.await?;
let csv_table = session_ctx.table_provider("t").await?;
// Create and register the source table with the provided schema and inserted data
let source_table = Arc::new(MemTable::try_new(
schema.clone(),
@@ -1788,7 +1788,11 @@
assert_batches_eq!(expected, &res);

// Read the records in the table
let batches = session_ctx.sql("select count(*) from t").await?.collect().await?;
let batches = session_ctx
.sql("select count(*) from t")
.await?
.collect()
.await?;
let expected = vec![
"+----------+",
"| COUNT(*) |",
Expand Down Expand Up @@ -1825,7 +1829,11 @@ mod tests {
assert_batches_eq!(expected, &res);

// Read the contents of the table
let batches = session_ctx.sql("select count(*) from t").await?.collect().await?;
let batches = session_ctx
.sql("select count(*) from t")
.await?
.collect()
.await?;

// Define the expected result after the second append.
let expected = vec![
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/memory.rs
@@ -213,6 +213,11 @@ impl TableProvider for MemTable {
"Inserting query must have the same schema with the table."
);
}
if overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrite not implemented for MemoryTable yet".into(),
));
}
let sink = Arc::new(MemSink::new(self.batches.clone()));
Ok(Arc::new(InsertExec::new(input, sink, self.schema.clone())))
}
@@ -264,8 +269,9 @@ impl DataSink for MemSink {
let mut i = 0;
let mut row_count = 0;
let num_parts = data.len();
for idx in 0..num_parts {
while let Some(batch) = data[idx].next().await.transpose()? {
// TODO parallelize outer and inner loops
for data_part in data.iter_mut().take(num_parts) {
while let Some(batch) = data_part.next().await.transpose()? {
row_count += batch.num_rows();
new_batches[i].push(batch);
i = (i + 1) % num_partitions;
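
For reference, a standalone sketch of the round-robin pattern the reworked MemSink loop uses to spread incoming batches over the output partitions; the helper function below is illustrative only, not code from this commit:

// Hypothetical helper mirroring `i = (i + 1) % num_partitions` above:
// deal items across `num_partitions` buckets in round-robin order.
// Assumes num_partitions > 0.
fn round_robin<T>(items: Vec<T>, num_partitions: usize) -> Vec<Vec<T>> {
    let mut buckets: Vec<Vec<T>> = (0..num_partitions).map(|_| Vec::new()).collect();
    let mut i = 0;
    for item in items {
        buckets[i].push(item);
        i = (i + 1) % num_partitions;
    }
    buckets
}
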
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
@@ -333,7 +333,7 @@ pub struct FileSinkConfig {
/// A writer mode that determines how data is written to the file
pub writer_mode: FileWriterMode,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool
pub overwrite: bool,
}

impl FileSinkConfig {
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/provider.rs
@@ -132,7 +132,6 @@ pub trait TableProvider: Sync + Send {
let msg = "Insert into not implemented for this table".to_owned();
Err(DataFusionError::NotImplemented(msg))
}

}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_plan/insert.rs
@@ -36,7 +36,6 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::Distribution;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;

@@ -232,8 +231,10 @@ impl ExecutionPlan for InsertExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if partition!=0{
return Err(DataFusionError::Internal("InsertExec can only be called on partition 0!".into()))
if partition != 0 {
return Err(DataFusionError::Internal(
"InsertExec can only be called on partition 0!".into(),
));
}
let data = self.make_all_input_streams(context.clone())?;

1 change: 0 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -17,7 +17,6 @@

//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
use crate::datasource::listing::ListingTableInsertMode;
use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
13 changes: 7 additions & 6 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -239,12 +239,13 @@ impl LogicalPlanBuilder {
overwrite: bool,
) -> Result<Self> {
let table_schema = table_schema.clone().to_dfschema_ref()?;
let op;
if overwrite{
op = WriteOp::InsertOverwrite;
} else{
op = WriteOp::InsertInto;
}

let op = if overwrite {
WriteOp::InsertOverwrite
} else {
WriteOp::InsertInto
};

Ok(Self::from(LogicalPlan::Dml(DmlStatement {
table_name: table_name.into(),
table_schema,
14 changes: 7 additions & 7 deletions datafusion/sql/src/statement.rs
@@ -1024,13 +1024,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
})
.collect::<Result<Vec<datafusion_expr::Expr>>>()?;
let source = project(source, exprs)?;
let op;
if overwrite{
op = WriteOp::InsertOverwrite
} else{
op = WriteOp::InsertInto
}

let op = if overwrite {
WriteOp::InsertOverwrite
} else {
WriteOp::InsertInto
};

let plan = LogicalPlan::Dml(DmlStatement {
table_name,
table_schema: Arc::new(table_schema),