Add write_csv to DataFrame (#1922)
* Add write_csv to DataFrame

* Cleanup

* Update write_csv signature
matthewmturner authored Mar 6, 2022
1 parent dd94fcf commit 0b9b30a
Showing 5 changed files with 148 additions and 35 deletions.
3 changes: 3 additions & 0 deletions datafusion/src/dataframe.rs
@@ -405,4 +405,7 @@ pub trait DataFrame: Send + Sync {
/// # }
/// ```
fn except(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>>;

/// Write a `DataFrame` to a CSV file.
async fn write_csv(&self, path: &str) -> Result<()>;
}
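
For orientation, a minimal usage sketch of the new trait method; the table name, input file, and output directory below are hypothetical:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
        // hypothetical input file and table name
        ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
            .await?;
        let df = ctx.sql("SELECT * FROM example").await?;
        // writes one CSV file per output partition into the given directory
        df.write_csv("/tmp/example_out").await?;
        Ok(())
    }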
35 changes: 3 additions & 32 deletions datafusion/src/execution/context.rs
@@ -49,7 +49,7 @@ use std::{fs, path::PathBuf};
use futures::{StreamExt, TryStreamExt};
use tokio::task::{self, JoinHandle};

use arrow::{csv, datatypes::SchemaRef};
use arrow::datatypes::SchemaRef;

use crate::catalog::{
catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -80,6 +80,7 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -717,37 +718,7 @@ impl ExecutionContext {
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
let path = path.as_ref();
// create directory to contain the CSV files (one per partition)
let fs_path = Path::new(path);
let runtime = self.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
let plan = plan.clone();
let filename = format!("part-{}.csv", i);
let path = fs_path.join(&filename);
let file = fs::File::create(path)?;
let mut writer = csv::Writer::new(file);
let stream = plan.execute(i, runtime.clone()).await?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(&batch?))
.try_collect()
.await
.map_err(DataFusionError::from)
});
tasks.push(handle);
}
futures::future::join_all(tasks).await;
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
"Could not create directory {}: {:?}",
path, e
))),
}
plan_to_csv(self, plan, path).await
}

/// Executes a query and writes the results to a partitioned Parquet file.
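Since the method keeps its public signature, existing callers are unaffected; a short sketch of driving it directly with a physical plan (the function name and output path here are illustrative):

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::execution::context::ExecutionContext;
    use datafusion::physical_plan::ExecutionPlan;

    // `plan` would typically come from ctx.create_physical_plan(&logical_plan).
    async fn dump_to_csv(ctx: &ExecutionContext, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
        // internally this now just forwards to plan_to_csv
        ctx.write_csv(plan, "/tmp/query_out").await
    }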
8 changes: 8 additions & 0 deletions datafusion/src/execution/dataframe_impl.rs
@@ -39,6 +39,7 @@ use crate::{
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
@@ -313,6 +314,13 @@ impl DataFrame for DataFrameImpl {
&LogicalPlanBuilder::except(left_plan, right_plan, true)?,
)))
}

async fn write_csv(&self, path: &str) -> Result<()> {
let plan = self.create_physical_plan().await?;
let state = self.ctx_state.lock().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
plan_to_csv(&ctx, plan, path).await
}
}

#[cfg(test)]
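Note that the implementation clones the context state out of the mutex before any await, so no lock guard is held across a suspension point. A generic sketch of that pattern, with std's Mutex standing in for whichever lock type the crate actually uses:

    use std::sync::{Arc, Mutex};

    // Snapshot the shared state and drop the guard immediately, so the
    // surrounding future never awaits while holding the lock.
    fn snapshot<T: Clone>(state: &Arc<Mutex<T>>) -> T {
        state.lock().unwrap().clone()
    }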
136 changes: 133 additions & 3 deletions datafusion/src/physical_plan/file_format/csv.rs
@@ -18,18 +18,22 @@
//! Execution plan for reading CSV files
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};

use crate::execution::runtime_env::RuntimeEnv;
use arrow::csv;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use std::any::Any;
use std::fs;
use std::path::Path;
use std::sync::Arc;

use crate::execution::runtime_env::RuntimeEnv;
use async_trait::async_trait;
use tokio::task::{self, JoinHandle};

use super::file_stream::{BatchIter, FileStream};
use super::FileScanConfig;
@@ -176,16 +180,59 @@ impl ExecutionPlan for CsvExec {
}
}

pub async fn plan_to_csv(
context: &ExecutionContext,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
let path = path.as_ref();
// create directory to contain the CSV files (one per partition)
let fs_path = Path::new(path);
let runtime = context.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
let plan = plan.clone();
let filename = format!("part-{}.csv", i);
let path = fs_path.join(&filename);
let file = fs::File::create(path)?;
let mut writer = csv::Writer::new(file);
let stream = plan.execute(i, runtime.clone()).await?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(&batch?))
.try_collect()
.await
.map_err(DataFusionError::from)
});
tasks.push(handle);
}
futures::future::join_all(tasks).await;
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
"Could not create directory {}: {:?}",
path, e
))),
}
}
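
plan_to_csv writes one part-{i}.csv file per output partition into the target directory, which can then be re-registered as a single table. A sketch, assuming /tmp/query_out was produced by the function above (the helper and table name are hypothetical):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    // read the partitioned output back as one table
    async fn read_back(ctx: &mut ExecutionContext) -> Result<()> {
        ctx.register_csv("query_out", "/tmp/query_out", CsvReadOptions::new())
            .await?;
        let batches = ctx.sql("SELECT * FROM query_out").await?.collect().await?;
        println!("read back {} batches", batches.len());
        Ok(())
    }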

#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::*;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
test_util::aggr_test_schema,
};
use arrow::datatypes::*;
use futures::StreamExt;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;

#[tokio::test]
async fn csv_exec_with_projection() -> Result<()> {
@@ -376,4 +423,87 @@ mod tests {
crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
Ok(())
}

/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
partition_count: usize,
file_extension: &str,
) -> Result<SchemaRef> {
// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
Field::new("c3", DataType::Boolean, false),
]));

// generate a partitioned file
for partition in 0..partition_count {
let filename = format!("partition-{}.{}", partition, file_extension);
let file_path = tmp_dir.path().join(&filename);
let mut file = File::create(file_path)?;

// generate some data
for i in 0..=10 {
let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
file.write_all(data.as_bytes())?;
}
}

Ok(schema)
}

#[tokio::test]
async fn write_csv_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_target_partitions(8),
);

let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?;

// register csv file with the execution context
ctx.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().schema(&schema),
)
.await?;

// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
df.write_csv(&out_dir).await?;

// create a new context and verify that the results were saved to a partitioned csv file
let mut ctx = ExecutionContext::new();

let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
]));

// register each partition as well as the top level dir
let csv_read_option = CsvReadOptions::new().schema(&schema);
ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
.await?;
ctx.register_csv("allparts", &out_dir, csv_read_option)
.await?;

let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
let allparts = ctx
.sql("SELECT c1, c2 FROM allparts")
.await?
.collect()
.await?;

let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0[0].schema(), allparts[0].schema());

assert_eq!(allparts_count, 80);

Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -32,6 +32,7 @@ use arrow::{
record_batch::RecordBatch,
};
pub use avro::AvroExec;
pub(crate) use csv::plan_to_csv;
pub use csv::CsvExec;
pub use json::NdJsonExec;

