feat: Datafusion file sink (#2155)
This writer is very simple compared to the built-in ones: it doesn't
support as many features, and it generates one file per COPY/INSERT
operation.

Closes #2148.

---------

Co-authored-by: Andrew Duffy <[email protected]>
AdamGS and a10y authored Jan 31, 2025
1 parent 46c770b commit f7b5d56
Showing 8 changed files with 299 additions and 95 deletions.
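
The sink itself is added in vortex-datafusion/src/persistent/sink.rs, one of the changed files not rendered below. For orientation only, a one-file-per-write sink implementing DataFusion's DataSink trait could take roughly the following shape; this is a sketch, not the commit's actual code, and the write_single_vortex_file helper in particular is a made-up stand-in:

use std::any::Any;
use std::fmt;
use std::sync::Arc;

use arrow_array::RecordBatch;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::FileSinkConfig;
use datafusion_common::Result as DFResult;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::insert::DataSink;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use futures::StreamExt;

#[derive(Debug)]
struct VortexSink {
    config: FileSinkConfig,
}

impl VortexSink {
    fn new(config: FileSinkConfig) -> Self {
        Self { config }
    }
}

impl DisplayAs for VortexSink {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "VortexSink")
    }
}

/// Hypothetical stand-in for the actual Vortex encoding/writing code.
async fn write_single_vortex_file(
    _config: &FileSinkConfig,
    _batches: Vec<RecordBatch>,
) -> DFResult<()> {
    unimplemented!("the real sink writes via Vortex's file writer")
}

#[async_trait]
impl DataSink for VortexSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> DFResult<u64> {
        // Drain the whole input stream, counting rows as we go; everything is
        // buffered so that exactly one file is produced per COPY/INSERT.
        let mut batches = Vec::new();
        let mut row_count = 0u64;
        while let Some(batch) = data.next().await {
            let batch = batch?;
            row_count += batch.num_rows() as u64;
            batches.push(batch);
        }
        write_single_vortex_file(&self.config, batches).await?;
        Ok(row_count)
    }
}

DataSinkExec, wired up in format.rs below, drives this trait: it executes the input plan and hands the resulting record-batch stream to write_all.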
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -34,6 +34,7 @@ log = { workspace = true }
 moka = { workspace = true, features = ["future", "sync"] }
 object_store = { workspace = true }
 pin-project = { workspace = true }
+rand = { workspace = true }
 tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
 vortex-array = { workspace = true }
 vortex-buffer = { workspace = true }
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/execution.rs
@@ -131,8 +131,8 @@ impl ExecutionPlan for VortexExec {
             object_store,
             projection,
             self.predicate.clone(),
-            arrow_schema,
             self.initial_read_cache.clone(),
+            self.file_scan_config.clone(),
         )?;
         let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;
 
83 changes: 55 additions & 28 deletions vortex-datafusion/src/persistent/format.rs
@@ -10,11 +10,13 @@ use datafusion::execution::SessionState;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    not_impl_err, ColumnStatistics, DataFusionError, GetExt, Result as DFResult, ScalarValue,
-    Statistics,
+    config_datafusion_err, not_impl_err, ColumnStatistics, DataFusionError, GetExt,
+    Result as DFResult, ScalarValue, Statistics,
 };
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::Expr;
 use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
+use datafusion_physical_plan::insert::DataSinkExec;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::ExecutionPlan;
 use futures::{stream, StreamExt as _, TryStreamExt as _};
@@ -30,6 +32,7 @@ use vortex_scalar::Scalar;
 
 use super::cache::FileLayoutCache;
 use super::execution::VortexExec;
+use super::sink::VortexSink;
 use crate::can_be_pushed_down;
 
 /// Vortex implementation of a DataFusion [`FileFormat`].
@@ -88,8 +91,14 @@ impl FileFormatFactory for VortexFormatFactory {
     fn create(
         &self,
         _state: &SessionState,
-        _format_options: &std::collections::HashMap<String, String>,
+        format_options: &std::collections::HashMap<String, String>,
     ) -> DFResult<Arc<dyn FileFormat>> {
+        if !format_options.is_empty() {
+            return Err(config_datafusion_err!(
+                "Vortex tables don't support any options"
+            ));
+        }
+
         Ok(Arc::new(VortexFormat::new(self.context.clone())))
     }
 
@@ -282,12 +291,24 @@ impl FileFormat for VortexFormat {
 
     async fn create_writer_physical_plan(
         &self,
-        _input: Arc<dyn ExecutionPlan>,
+        input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
-        _conf: FileSinkConfig,
-        _order_requirements: Option<LexRequirement>,
+        conf: FileSinkConfig,
+        order_requirements: Option<LexRequirement>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        not_impl_err!("Writer not implemented for this format")
+        if conf.insert_op != InsertOp::Append {
+            return not_impl_err!("Overwrites are not implemented yet for Vortex");
+        }
+
+        let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(VortexSink::new(conf));
+
+        Ok(Arc::new(DataSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
     }
 
     fn supports_filters_pushdown(
@@ -310,45 +331,26 @@ impl FileFormat for VortexFormat {
 
 #[cfg(test)]
 mod tests {
-    use datafusion::datasource::provider::DefaultTableFactory;
     use datafusion::execution::SessionStateBuilder;
     use datafusion::prelude::SessionContext;
     use tempfile::TempDir;
 
     use super::*;
 
-    /// Utility function to register Vortex with a [`SessionStateBuilder`]
-    fn register_vortex_format_factory(
-        factory: VortexFormatFactory,
-        session_state_builder: &mut SessionStateBuilder,
-    ) {
-        if let Some(table_factories) = session_state_builder.table_factories() {
-            table_factories.insert(
-                factory.get_ext().to_uppercase(), // Has to be uppercase
-                Arc::new(DefaultTableFactory::new()),
-            );
-        }
-
-        if let Some(file_formats) = session_state_builder.file_formats() {
-            file_formats.push(Arc::new(factory));
-        }
-    }
+    use crate::persistent::register_vortex_format_factory;
 
     #[tokio::test]
     async fn create_table() {
         let dir = TempDir::new().unwrap();
 
         let factory = VortexFormatFactory::default_config();
         let mut session_state_builder = SessionStateBuilder::new().with_default_features();
-
         register_vortex_format_factory(factory, &mut session_state_builder);
-
         let session = SessionContext::new_with_state(session_state_builder.build());
-
         let df = session
             .sql(&format!(
                 "CREATE EXTERNAL TABLE my_tbl \
-                (c1 VARCHAR NOT NULL, c2 INT NOT NULL)
+                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                 STORED AS vortex LOCATION '{}'",
                 dir.path().to_str().unwrap()
             ))
@@ -357,4 +359,29 @@ mod tests {
 
         assert_eq!(df.count().await.unwrap(), 0);
     }
+
+    #[tokio::test]
+    #[should_panic]
+    async fn fail_table_config() {
+        let dir = TempDir::new().unwrap();
+
+        let factory = VortexFormatFactory::default_config();
+        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
+        register_vortex_format_factory(factory, &mut session_state_builder);
+        let session = SessionContext::new_with_state(session_state_builder.build());
+
+        session
+            .sql(&format!(
+                "CREATE EXTERNAL TABLE my_tbl \
+                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
+                STORED AS vortex LOCATION '{}' \
+                OPTIONS( some_key 'value' );",
+                dir.path().to_str().unwrap()
+            ))
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+    }
 }
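
For context (not part of the diff): with the factory registered as in the tests above, the new write path can be exercised end-to-end. A sketch, assuming the test-only registration helper and made-up table names and paths:

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_common::Result as DFResult;

use crate::persistent::register_vortex_format_factory;

async fn write_roundtrip() -> DFResult<()> {
    // Mirrors the registration pattern used in the tests above.
    let factory = VortexFormatFactory::default_config();
    let mut builder = SessionStateBuilder::new().with_default_features();
    register_vortex_format_factory(factory, &mut builder);
    let session = SessionContext::new_with_state(builder.build());

    session
        .sql(
            "CREATE EXTERNAL TABLE t (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
             STORED AS vortex LOCATION '/tmp/t/'",
        )
        .await?
        .collect()
        .await?;

    // An append INSERT takes the create_writer_physical_plan path above and,
    // per the commit message, writes one new file per operation.
    session
        .sql("INSERT INTO t VALUES ('a', 1), ('b', 2)")
        .await?
        .collect()
        .await?;

    Ok(())
}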
19 changes: 19 additions & 0 deletions vortex-datafusion/src/persistent/mod.rs
@@ -4,5 +4,24 @@ mod config;
 mod execution;
 mod format;
 mod opener;
+mod sink;
 
 pub use format::{VortexFormat, VortexFormatFactory, VortexFormatOptions};
+
+#[cfg(test)]
+/// Utility function to register Vortex with a [`SessionStateBuilder`]
+fn register_vortex_format_factory(
+    factory: VortexFormatFactory,
+    session_state_builder: &mut datafusion::execution::SessionStateBuilder,
+) {
+    if let Some(table_factories) = session_state_builder.table_factories() {
+        table_factories.insert(
+            datafusion_common::GetExt::get_ext(&factory).to_uppercase(), // Has to be uppercase
+            std::sync::Arc::new(datafusion::datasource::provider::DefaultTableFactory::new()),
+        );
+    }
+
+    if let Some(file_formats) = session_state_builder.file_formats() {
+        file_formats.push(std::sync::Arc::new(factory));
+    }
+}
18 changes: 12 additions & 6 deletions vortex-datafusion/src/persistent/opener.rs
@@ -1,13 +1,12 @@
 use std::sync::Arc;
 
-use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
-use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
+use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig};
 use datafusion_common::Result as DFResult;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use futures::{FutureExt as _, StreamExt, TryStreamExt};
 use object_store::ObjectStore;
 use tokio::runtime::Handle;
+use vortex_array::array::StructArray;
 use vortex_array::arrow::FromArrowType;
 use vortex_array::ContextRef;
 use vortex_dtype::{DType, FieldNames};
@@ -27,6 +26,7 @@ pub(crate) struct VortexFileOpener {
     pub projection: ExprRef,
     pub filter: Option<ExprRef>,
     pub(crate) file_layout_cache: FileLayoutCache,
+    pub config: FileScanConfig,
 }
 
 impl VortexFileOpener {
@@ -35,10 +35,10 @@ impl VortexFileOpener {
         object_store: Arc<dyn ObjectStore>,
         projection: Option<FieldNames>,
         predicate: Option<Arc<dyn PhysicalExpr>>,
-        arrow_schema: SchemaRef,
         file_layout_cache: FileLayoutCache,
+        config: FileScanConfig,
     ) -> VortexResult<Self> {
-        let dtype = DType::from_arrow(arrow_schema);
+        let dtype = DType::from_arrow(config.file_schema.clone());
         let filter = predicate
             .as_ref()
             // If we cannot convert an expr to a vortex expr, we run no filter, since datafusion
@@ -67,6 +67,7 @@ impl VortexFileOpener {
             projection,
             filter,
             file_layout_cache,
+            config,
         })
     }
 }
@@ -92,9 +93,14 @@ impl FileOpener for VortexFileOpener {
             .open(read_at)
             .await?;
 
+        let (projected_arrow_schema, ..) = this.config.project();
+
         Ok(vxf
             .scan(Scan::new(this.projection.clone()).with_some_filter(this.filter.clone()))?
-            .map_ok(RecordBatch::try_from)
+            .map_ok(move |array| {
+                let st = StructArray::try_from(array)?;
+                st.into_record_batch_with_schema(projected_arrow_schema.as_ref())
+            })
             .map(|r| r.and_then(|inner| inner))
             .map_err(|e| e.into())
             .boxed())