feat: enable passing storage options to Delta table builder via DataFusion's CREATE EXTERNAL TABLE (delta-io#1043)

# Description
We've recently added Delta table support to
[Seafowl](https://github.com/splitgraph/seafowl) using delta-rs, making
use of the new `OPTIONS` clause in sqlparser/DataFusion. The clause
allows propagating a set of key/value pairs down to the
`DeltaTableBuilder`, which in turn can use them to instantiate a
corresponding object store client. This means a Delta table can now be
defined without relying on environment variables:
```sql
CREATE EXTERNAL TABLE my_delta
STORED AS DELTATABLE
OPTIONS ('AWS_ACCESS_KEY_ID' 'secret', 'AWS_SECRET_ACCESS_KEY' 'also_secret', 'AWS_REGION' 'eu-west-3') 
LOCATION 's3://my-bucket/my-delta-table/'
```
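
For reference, the same table can be opened programmatically via the new `open_table_with_storage_options` helper this PR adds (see the `rust/src/delta.rs` diff below). A minimal sketch, assuming the helper is re-exported at the crate root like `open_table`; the bucket, credentials, and region are placeholders:

```rust
use std::collections::HashMap;

use deltalake::open_table_with_storage_options;

#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // Same keys as in the OPTIONS clause above; the values are placeholders.
    let storage_options = HashMap::from([
        ("AWS_ACCESS_KEY_ID".to_string(), "secret".to_string()),
        ("AWS_SECRET_ACCESS_KEY".to_string(), "also_secret".to_string()),
        ("AWS_REGION".to_string(), "eu-west-3".to_string()),
    ]);

    // Equivalent to DeltaTableBuilder::from_uri(..).with_storage_options(..).load().
    let _table =
        open_table_with_storage_options("s3://my-bucket/my-delta-table/", storage_options)
            .await?;
    Ok(())
}
```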

I've also changed the existing DataFusion integration tests to use this
approach, so that the new code path gets exercised.

I'm not sure whether it makes sense to merge this upstream, but I'm
opening the PR just in case it does.

# Related Issue(s)
Didn't find any related issues.

# Documentation
gruuya authored and chitralverma committed Mar 17, 2023
1 parent 6109d4b commit 94423ba
Showing 7 changed files with 71 additions and 30 deletions.
13 changes: 13 additions & 0 deletions rust/src/delta.rs
@@ -1482,6 +1482,19 @@ pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
Ok(table)
}

/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
/// `StorageService`.
pub async fn open_table_with_storage_options(
table_uri: impl AsRef<str>,
storage_options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options)
.load()
.await?;
Ok(table)
}

/// Creates a DeltaTable from the given path and loads it with the metadata from the given version.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table_with_version(
8 changes: 6 additions & 2 deletions rust/src/delta_datafusion.rs
@@ -57,7 +57,7 @@ use object_store::{path::Path, ObjectMeta};
use url::Url;

use crate::Invariant;
use crate::{action, open_table};
use crate::{action, open_table, open_table_with_storage_options};
use crate::{schema, DeltaTableBuilder};
use crate::{DeltaTable, DeltaTableError};

@@ -866,7 +866,11 @@ impl TableProviderFactory for DeltaTableFactory {
_ctx: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion::error::Result<Arc<dyn TableProvider>> {
let provider = open_table(cmd.to_owned().location).await.unwrap();
let provider = if cmd.options.is_empty() {
open_table(cmd.to_owned().location).await?
} else {
open_table_with_storage_options(cmd.to_owned().location, cmd.to_owned().options).await?
};
Ok(Arc::new(provider))
}
}
2 changes: 1 addition & 1 deletion rust/src/test_utils.rs
@@ -12,7 +12,7 @@ pub type TestResult = Result<(), Box<dyn std::error::Error + 'static>>;

/// The IntegrationContext provides temporary resources to test against cloud storage services.
pub struct IntegrationContext {
integration: StorageIntegration,
pub integration: StorageIntegration,
bucket: String,
store: Arc<DynObjectStore>,
tmp_dir: TempDir,
16 changes: 16 additions & 0 deletions rust/tests/common/datafusion.rs
@@ -0,0 +1,16 @@
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::execution::context::SessionContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionConfig;
use deltalake::delta_datafusion::DeltaTableFactory;
use std::collections::HashMap;
use std::sync::Arc;

pub fn context_with_delta_table_factory() -> SessionContext {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
SessionContext::with_config_rt(ses, Arc::new(env))
}
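
As a usage sketch (a hypothetical test mirroring the reworked `test_datafusion_sql_registration` below), this helper lets a test register and query a Delta table purely through SQL:

```rust
use datafusion_common::Result;

#[tokio::test]
async fn delta_sql_registration_sketch() -> Result<()> {
    let ctx = context_with_delta_table_factory();
    // The DELTATABLE factory registered above resolves this statement.
    let _ = ctx
        .sql(
            "CREATE EXTERNAL TABLE sketch_table \
             STORED AS DELTATABLE \
             LOCATION './tests/data/delta-0.8.0-partitioned'",
        )
        .await?;
    // Query it like any other registered table.
    let batches = ctx
        .sql("SELECT * FROM sketch_table LIMIT 5")
        .await?
        .collect()
        .await?;
    assert!(!batches.is_empty());
    Ok(())
}
```
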
5 changes: 4 additions & 1 deletion rust/tests/common/mod.rs
@@ -1,4 +1,5 @@
#![deny(warnings)]
#![allow(dead_code)]
#![allow(unused_variables)]

use bytes::Bytes;
use deltalake::action::{self, Add, Remove};
@@ -15,6 +16,8 @@ use tempdir::TempDir;
#[cfg(feature = "azure")]
pub mod adls;
pub mod clock;
#[cfg(feature = "datafusion-ext")]
pub mod datafusion;
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
pub mod s3;
pub mod schemas;
22 changes: 8 additions & 14 deletions rust/tests/datafusion_test.rs
@@ -1,29 +1,28 @@
#![cfg(feature = "datafusion-ext")]

use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;

use arrow::array::*;
use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
use common::datafusion::context_with_delta_table_factory;
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{common, file_format::ParquetExec, metrics::Label};
use datafusion::physical_plan::{common::collect, file_format::ParquetExec, metrics::Label};
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use datafusion::prelude::SessionConfig;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::Expr;

use deltalake::action::SaveMode;
use deltalake::delta_datafusion::DeltaTableFactory;
use deltalake::{operations::DeltaOps, DeltaTable, Schema};

mod common;

fn get_scanned_files(node: &dyn ExecutionPlan) -> HashSet<Label> {
node.metrics()
.unwrap()
@@ -86,12 +85,7 @@ async fn prepare_table(

#[tokio::test]
async fn test_datafusion_sql_registration() -> Result<()> {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
let ctx = SessionContext::with_config_rt(ses, Arc::new(env));
let ctx = context_with_delta_table_factory();

let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("tests/data/delta-0.8.0-partitioned");
@@ -260,7 +254,7 @@ async fn test_files_scanned() -> Result<()> {
let plan = CoalescePartitionsExec::new(plan.clone());

let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let _ = common::collect(plan.execute(0, task_ctx)?).await?;
let _ = collect(plan.execute(0, task_ctx)?).await?;

let mut metrics = ExecutionMetricsCollector::default();
visit_execution_plan(&plan, &mut metrics).unwrap();
@@ -273,7 +267,7 @@ async fn test_files_scanned() -> Result<()> {

let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), None, &[filter], None).await?);
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let _result = common::collect(plan.execute(0, task_ctx)?).await?;
let _result = collect(plan.execute(0, task_ctx)?).await?;

let mut metrics = ExecutionMetricsCollector::default();
visit_execution_plan(&plan, &mut metrics).unwrap();
35 changes: 23 additions & 12 deletions rust/tests/integration_datafusion.rs
@@ -1,13 +1,13 @@
#![cfg(all(feature = "integration_test", feature = "datafusion-ext"))]

use arrow::array::Int64Array;
use datafusion::execution::context::SessionContext;
use common::datafusion::context_with_delta_table_factory;
use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
use deltalake::DeltaTableBuilder;
use maplit::hashmap;
use serial_test::serial;
use std::sync::Arc;

mod common;

#[tokio::test]
#[serial]
async fn test_datafusion_local() -> TestResult {
@@ -47,16 +47,27 @@ async fn test_datafusion(storage: StorageIntegration) -> TestResult {
async fn simple_query(context: &IntegrationContext) -> TestResult {
let table_uri = context.uri_for_table(TestTables::Simple);

let table = DeltaTableBuilder::from_uri(table_uri)
.with_allow_http(true)
.with_storage_options(hashmap! {
"DYNAMO_LOCK_OWNER_NAME".to_string() => "s3::deltars/simple".to_string(),
})
.load()
.await?;
let dynamo_lock_option = "'DYNAMO_LOCK_OWNER_NAME' 's3::deltars/simple'".to_string();
let options = match context.integration {
StorageIntegration::Amazon => format!("'AWS_STORAGE_ALLOW_HTTP' '1', {dynamo_lock_option}"),
StorageIntegration::Microsoft => {
format!("'AZURE_STORAGE_ALLOW_HTTP' '1', {dynamo_lock_option}")
}
_ => dynamo_lock_option,
};

let sql = format!(
"CREATE EXTERNAL TABLE demo \
STORED AS DELTATABLE \
OPTIONS ({options}) \
LOCATION '{table_uri}'",
);

let ctx = SessionContext::new();
ctx.register_table("demo", Arc::new(table))?;
let ctx = context_with_delta_table_factory();
let _ = ctx
.sql(sql.as_str())
.await
.expect("Failed to register table!");

let batches = ctx
.sql("SELECT id FROM demo WHERE id > 5 ORDER BY id ASC")
