feat: Datafusion file sink #2155

Merged: 11 commits, Jan 31, 2025
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -34,6 +34,7 @@ log = { workspace = true }
moka = { workspace = true, features = ["future", "sync"] }
object_store = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/execution.rs
@@ -131,8 +131,8 @@ impl ExecutionPlan for VortexExec {
object_store,
projection,
self.predicate.clone(),
arrow_schema,
self.initial_read_cache.clone(),
self.file_scan_config.clone(),
)?;
let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;

83 changes: 55 additions & 28 deletions vortex-datafusion/src/persistent/format.rs
@@ -10,11 +10,13 @@ use datafusion::execution::SessionState;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
not_impl_err, ColumnStatistics, DataFusionError, GetExt, Result as DFResult, ScalarValue,
Statistics,
config_datafusion_err, not_impl_err, ColumnStatistics, DataFusionError, GetExt,
Result as DFResult, ScalarValue, Statistics,
};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::Expr;
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::insert::DataSinkExec;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;
use futures::{stream, StreamExt as _, TryStreamExt as _};
@@ -30,6 +32,7 @@ use vortex_scalar::Scalar;

use super::cache::FileLayoutCache;
use super::execution::VortexExec;
use super::sink::VortexSink;
use crate::can_be_pushed_down;

/// Vortex implementation of a DataFusion [`FileFormat`].
@@ -88,8 +91,14 @@ impl FileFormatFactory for VortexFormatFactory {
fn create(
&self,
_state: &SessionState,
_format_options: &std::collections::HashMap<String, String>,
format_options: &std::collections::HashMap<String, String>,
) -> DFResult<Arc<dyn FileFormat>> {
if !format_options.is_empty() {
return Err(config_datafusion_err!(
"Vortex tables don't support any options"
));
}

Ok(Arc::new(VortexFormat::new(self.context.clone())))
}

@@ -282,12 +291,24 @@ impl FileFormat for VortexFormat {

async fn create_writer_physical_plan(
&self,
_input: Arc<dyn ExecutionPlan>,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_conf: FileSinkConfig,
_order_requirements: Option<LexRequirement>,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not yet implemented for Vortex");
}

let sink_schema = conf.output_schema().clone();
let sink = Arc::new(VortexSink::new(conf));

Ok(Arc::new(DataSinkExec::new(
input,
sink,
sink_schema,
order_requirements,
)) as _)
}

fn supports_filters_pushdown(
@@ -310,45 +331,26 @@

#[cfg(test)]
mod tests {
use datafusion::datasource::provider::DefaultTableFactory;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use tempfile::TempDir;

use super::*;

/// Utility function to register Vortex with a [`SessionStateBuilder`]
fn register_vortex_format_factory(
factory: VortexFormatFactory,
session_state_builder: &mut SessionStateBuilder,
) {
if let Some(table_factories) = session_state_builder.table_factories() {
table_factories.insert(
factory.get_ext().to_uppercase(), // Has to be uppercase
Arc::new(DefaultTableFactory::new()),
);
}

if let Some(file_formats) = session_state_builder.file_formats() {
file_formats.push(Arc::new(factory));
}
}
use crate::persistent::register_vortex_format_factory;

#[tokio::test]
async fn create_table() {
let dir = TempDir::new().unwrap();

let factory = VortexFormatFactory::default_config();
let mut session_state_builder = SessionStateBuilder::new().with_default_features();

register_vortex_format_factory(factory, &mut session_state_builder);

let session = SessionContext::new_with_state(session_state_builder.build());

let df = session
.sql(&format!(
"CREATE EXTERNAL TABLE my_tbl \
(c1 VARCHAR NOT NULL, c2 INT NOT NULL)
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS vortex LOCATION '{}'",
dir.path().to_str().unwrap()
))
@@ -357,4 +359,29 @@ mod tests {

assert_eq!(df.count().await.unwrap(), 0);
}

#[tokio::test]
#[should_panic]
async fn fail_table_config() {
let dir = TempDir::new().unwrap();

let factory = VortexFormatFactory::default_config();
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
register_vortex_format_factory(factory, &mut session_state_builder);
let session = SessionContext::new_with_state(session_state_builder.build());

session
.sql(&format!(
"CREATE EXTERNAL TABLE my_tbl \
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS vortex LOCATION '{}' \
OPTIONS( some_key 'value' );",
dir.path().to_str().unwrap()
))
.await
.unwrap()
.collect()
.await
.unwrap();
}
}
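
Note on the create_writer_physical_plan change above: DataSinkExec drives a DataSink implementation by streaming the child plan's record batches into it. The actual VortexSink added by this PR lives in sink.rs, which is not shown in this excerpt; the skeleton below is only a hedged illustration of the contract DataSinkExec expects (method names per the DataFusion release current at the time; some versions also require extra methods such as schema()). SkeletonSink is a made-up name, not the PR's implementation.

use std::any::Any;
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_common::Result as DFResult;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::insert::DataSink;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use futures::StreamExt;

#[derive(Debug)]
struct SkeletonSink;

impl DisplayAs for SkeletonSink {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "SkeletonSink")
    }
}

#[async_trait]
impl DataSink for SkeletonSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    // DataSinkExec calls this with the input plan's output stream; the
    // return value is the number of rows written.
    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> DFResult<u64> {
        let mut rows = 0u64;
        while let Some(batch) = data.next().await {
            rows += batch?.num_rows() as u64;
            // A real sink (e.g. VortexSink) would encode and append the batch
            // to an output file here instead of discarding it.
        }
        Ok(rows)
    }
}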
19 changes: 19 additions & 0 deletions vortex-datafusion/src/persistent/mod.rs
@@ -4,5 +4,24 @@ mod config;
mod execution;
mod format;
mod opener;
mod sink;

pub use format::{VortexFormat, VortexFormatFactory, VortexFormatOptions};

#[cfg(test)]
/// Utility function to register Vortex with a [`SessionStateBuilder`]
fn register_vortex_format_factory(
factory: VortexFormatFactory,
session_state_builder: &mut datafusion::execution::SessionStateBuilder,
) {
if let Some(table_factories) = session_state_builder.table_factories() {
table_factories.insert(
datafusion_common::GetExt::get_ext(&factory).to_uppercase(), // Has to be uppercase
std::sync::Arc::new(datafusion::datasource::provider::DefaultTableFactory::new()),
);
}

if let Some(file_formats) = session_state_builder.file_formats() {
file_formats.push(std::sync::Arc::new(factory));
}
}
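
For completeness, a hedged end-to-end sketch of the new write path, mirroring the test-only helper above: register the Vortex format with a session, create an external table, and run an INSERT, which DataFusion now plans through create_writer_physical_plan into a DataSinkExec wrapping the VortexSink. The table name, column names, and the write_via_sql wrapper are invented for illustration, and the snippet assumes it runs where the test-only helper and VortexFormatFactory are visible.

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_common::Result as DFResult;

async fn write_via_sql(table_dir: &str) -> DFResult<()> {
    let factory = VortexFormatFactory::default_config();
    let mut builder = SessionStateBuilder::new().with_default_features();
    register_vortex_format_factory(factory, &mut builder);
    let ctx = SessionContext::new_with_state(builder.build());

    // Register an empty Vortex-backed table at `table_dir`.
    ctx.sql(&format!(
        "CREATE EXTERNAL TABLE events (id INT NOT NULL, name VARCHAR NOT NULL) \
         STORED AS vortex LOCATION '{table_dir}'"
    ))
    .await?
    .collect()
    .await?;

    // INSERT ... VALUES is planned as an Append, so create_writer_physical_plan
    // wraps the input plan in DataSinkExec with the new VortexSink.
    ctx.sql("INSERT INTO events VALUES (1, 'a'), (2, 'b')")
        .await?
        .collect()
        .await?;

    Ok(())
}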
18 changes: 12 additions & 6 deletions vortex-datafusion/src/persistent/opener.rs
@@ -1,13 +1,12 @@
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig};
use datafusion_common::Result as DFResult;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use futures::{FutureExt as _, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use tokio::runtime::Handle;
use vortex_array::array::StructArray;
use vortex_array::arrow::FromArrowType;
use vortex_array::ContextRef;
use vortex_dtype::{DType, FieldNames};
@@ -27,6 +26,7 @@ pub(crate) struct VortexFileOpener {
pub projection: ExprRef,
pub filter: Option<ExprRef>,
pub(crate) file_layout_cache: FileLayoutCache,
pub config: FileScanConfig,
}

impl VortexFileOpener {
@@ -35,10 +35,10 @@ impl VortexFileOpener {
object_store: Arc<dyn ObjectStore>,
projection: Option<FieldNames>,
predicate: Option<Arc<dyn PhysicalExpr>>,
arrow_schema: SchemaRef,
file_layout_cache: FileLayoutCache,
config: FileScanConfig,
) -> VortexResult<Self> {
let dtype = DType::from_arrow(arrow_schema);
let dtype = DType::from_arrow(config.file_schema.clone());
let filter = predicate
.as_ref()
// If we cannot convert an expr to a vortex expr, we run no filter, since datafusion
@@ -67,6 +67,7 @@ impl VortexFileOpener {
projection,
filter,
file_layout_cache,
config,
})
}
}
@@ -92,9 +93,14 @@ impl FileOpener for VortexFileOpener {
.open(read_at)
.await?;

let (projected_arrow_schema, ..) = this.config.project();

Ok(vxf
.scan(Scan::new(this.projection.clone()).with_some_filter(this.filter.clone()))?
.map_ok(RecordBatch::try_from)
.map_ok(move |array| {
let st = StructArray::try_from(array)?;
st.into_record_batch_with_schema(projected_arrow_schema.as_ref())
})
.map(|r| r.and_then(|inner| inner))
.map_err(|e| e.into())
.boxed())
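
A brief note on the opener change above: each scan chunk is now rebuilt against the schema returned by FileScanConfig::project(), so the RecordBatches handed to DataFusion match the projected schema (field order, nullability, metadata) rather than whatever the Vortex file advertises. Below is a minimal sketch of that per-batch step, with hypothetical names and the return type of into_record_batch_with_schema inferred from the diff.

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use vortex_array::array::StructArray;
use vortex_error::{VortexError, VortexResult};

// `chunk` is one array produced by the Vortex scan; `projected` is the schema
// from FileScanConfig::project(). Generic over the chunk type to avoid pinning
// a specific vortex-array type name here.
fn align_batch<A>(chunk: A, projected: &Schema) -> VortexResult<RecordBatch>
where
    StructArray: TryFrom<A, Error = VortexError>,
{
    // Re-interpret the chunk as a struct-of-columns, then convert to Arrow
    // against the projected schema.
    let st = StructArray::try_from(chunk)?;
    st.into_record_batch_with_schema(projected)
}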