From 5d99efe273569f6cbd26a4f4a0e0c1e590e74b02 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 15:20:00 +0100 Subject: [PATCH 01/11] feat: improve table_uri handling --- rust/src/builder.rs | 256 ++++++++----------------- rust/src/delta.rs | 11 ++ rust/src/delta_datafusion.rs | 27 +-- rust/src/operations/create.rs | 40 ++-- rust/src/operations/load.rs | 8 +- rust/src/storage/config.rs | 344 ++++++++++++++++------------------ rust/src/storage/mod.rs | 264 ++++++++++++++------------ rust/src/storage/s3.rs | 2 +- rust/src/storage/utils.rs | 8 + 9 files changed, 441 insertions(+), 519 deletions(-) diff --git a/rust/src/builder.rs b/rust/src/builder.rs index 2e6213db50..028becaa7b 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -5,26 +5,14 @@ use std::sync::Arc; use crate::delta::{DeltaResult, DeltaTable, DeltaTableError}; use crate::schema::DeltaDataTypeVersion; -use crate::storage::config::{StorageLocation, StorageOptions}; -use crate::storage::file::FileStorageBackend; +use crate::storage::config::StorageOptions; use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use chrono::{DateTime, FixedOffset, Utc}; -use object_store::memory::InMemory; -use object_store::path::Path; -use object_store::{DynObjectStore, Error as ObjectStoreError, Result as ObjectStoreResult}; +use object_store::DynObjectStore; use serde::{Deserialize, Serialize}; use url::Url; -#[cfg(any(feature = "s3", feature = "s3-rustls"))] -use crate::storage::s3::{S3StorageBackend, S3StorageOptions}; -#[cfg(any(feature = "s3", feature = "s3-rustls"))] -use object_store::aws::AmazonS3Builder; -#[cfg(feature = "azure")] -use object_store::azure::MicrosoftAzureBuilder; -#[cfg(feature = "gcs")] -use object_store::gcp::GoogleCloudStorageBuilder; - #[allow(dead_code)] #[derive(Debug, thiserror::Error)] enum BuilderError { @@ -38,6 +26,8 @@ enum BuilderError { Decode(String), #[error("Delta-rs must be build with feature '{feature}' to support url: {url}.")] MissingFeature { feature: &'static str, url: String }, + #[error("Failed to parse table uri")] + TableUri(#[from] url::ParseError), } impl From for DeltaTableError { @@ -98,7 +88,7 @@ pub struct DeltaTableLoadOptions { /// table root uri pub table_uri: String, /// backend to access storage system - pub storage_backend: Option<(Arc, Path)>, + pub storage_backend: Option<(Arc, Url)>, /// specify the version we are going to load: a time stamp, a version, or just the newest /// available version pub version: DeltaVersion, @@ -168,7 +158,7 @@ impl DeltaTableBuilder { } /// specify the timestamp given as ISO-8601/RFC-3339 timestamp - pub fn with_datestring(self, date_string: impl AsRef) -> Result { + pub fn with_datestring(self, date_string: impl AsRef) -> DeltaResult { let datetime = DateTime::::from(DateTime::::parse_from_rfc3339( date_string.as_ref(), )?); @@ -183,13 +173,14 @@ impl DeltaTableBuilder { /// Set the storage backend. /// - /// `table_root` denotes the [object_store::path::Path] within the store to the root of the delta. - /// This is required since we cannot infer the relative location of the table from the `table_uri` - /// For non-standard object store implementations. - /// /// If a backend is not provided then it is derived from `table_uri`. 
- pub fn with_storage_backend(mut self, storage: Arc, table_root: &Path) -> Self { - self.options.storage_backend = Some((storage, table_root.clone())); + /// + /// # Arguments + /// + /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `location` - A url corresponding to the storagle location of `storage`. + pub fn with_storage_backend(mut self, storage: Arc, location: Url) -> Self { + self.options.storage_backend = Some((storage, location)); self } @@ -214,53 +205,46 @@ impl DeltaTableBuilder { self } - /// Build a delta storage backend for the given config - pub fn build_storage(self) -> Result { - let (storage, storage_url) = match self.options.storage_backend { - // Some(storage) => storage, - None => get_storage_backend( - &self.options.table_uri, - self.storage_options, - self.allow_http, - )?, - _ => todo!(), + /// Storage options for configuring backend object store + pub fn storage_options(&self) -> StorageOptions { + let mut storage_options = self.storage_options.clone().unwrap_or_default(); + if let Some(allow) = self.allow_http { + storage_options.insert( + "allow_http".into(), + if allow { "true" } else { "false" }.into(), + ); }; - let object_store = Arc::new(DeltaObjectStore::new(storage_url, storage)); - Ok(object_store) + storage_options.into() + } + + /// Build a delta storage backend for the given config + pub fn build_storage(self) -> DeltaResult { + match self.options.storage_backend { + Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new(storage, location))), + None => { + let location = ensure_table_uri(&self.options.table_uri)?; + Ok(Arc::new(DeltaObjectStore::try_new( + location, + self.storage_options(), + )?)) + } + } } /// Build the [`DeltaTable`] from specified options. /// /// This will not load the log, i.e. the table is not initialized. 
To get an initialized /// table use the `load` function - pub fn build(self) -> Result { - let (storage, storage_url) = match self.options.storage_backend { - Some((store, path)) => { - let mut uri = self.options.table_uri + path.as_ref(); - if !uri.contains(':') { - uri = format!("file://{}", uri); - } - let url = Url::parse(uri.as_str()) - .map_err(|_| DeltaTableError::Generic(format!("Can't parse uri: {}", uri)))?; - let url = StorageLocation::new(url); - (store, url) - } - None => get_storage_backend( - &self.options.table_uri, - self.storage_options, - self.allow_http, - )?, - }; + pub fn build(self) -> DeltaResult { let config = DeltaTableConfig { require_tombstones: self.options.require_tombstones, require_files: self.options.require_files, }; - let object_store = Arc::new(DeltaObjectStore::new(storage_url, storage)); - Ok(DeltaTable::new(object_store, config)) + Ok(DeltaTable::new(self.build_storage()?, config)) } /// Build the [`DeltaTable`] and load its state - pub async fn load(self) -> Result { + pub async fn load(self) -> DeltaResult { let version = self.options.version.clone(); let mut table = self.build()?; match version { @@ -272,132 +256,6 @@ impl DeltaTableBuilder { } } -enum ObjectStoreKind { - Local, - InMemory, - S3, - Google, - Azure, -} - -impl ObjectStoreKind { - pub fn parse_url(url: &Url) -> ObjectStoreResult { - match url.scheme() { - "file" => Ok(ObjectStoreKind::Local), - "memory" => Ok(ObjectStoreKind::InMemory), - "az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => Ok(ObjectStoreKind::Azure), - "s3" | "s3a" => Ok(ObjectStoreKind::S3), - "gs" => Ok(ObjectStoreKind::Google), - "https" => { - let host = url.host_str().unwrap_or_default(); - if host.contains("amazonaws.com") { - Ok(ObjectStoreKind::S3) - } else if host.contains("dfs.core.windows.net") - || host.contains("blob.core.windows.net") - { - Ok(ObjectStoreKind::Azure) - } else { - Err(ObjectStoreError::NotImplemented) - } - } - _ => Err(ObjectStoreError::NotImplemented), - } - } -} - -/// Create a new storage backend used in Delta table -pub(crate) fn get_storage_backend( - table_uri: impl AsRef, - // annotation needed for some feature builds - #[allow(unused_variables)] options: Option>, - #[allow(unused_variables)] allow_http: Option, -) -> DeltaResult<(Arc, StorageLocation)> { - let storage_url = StorageLocation::parse(table_uri)?; - let mut options = options.unwrap_or_default(); - if let Some(allow) = allow_http { - options.insert( - "allow_http".into(), - if allow { "true" } else { "false" }.into(), - ); - } - let _options = StorageOptions::new(options); - - match ObjectStoreKind::parse_url(&storage_url.url)? { - ObjectStoreKind::Local => Ok((Arc::new(FileStorageBackend::new()), storage_url)), - ObjectStoreKind::InMemory => Ok((Arc::new(InMemory::new()), storage_url)), - #[cfg(any(feature = "s3", feature = "s3-rustls"))] - ObjectStoreKind::S3 => { - let store = AmazonS3Builder::new() - .with_url(storage_url.as_ref()) - .try_with_options(&_options.as_s3_options())? - .with_allow_http(_options.allow_http()) - .build() - .or_else(|_| { - AmazonS3Builder::from_env() - .with_url(storage_url.as_ref()) - .try_with_options(&_options.as_s3_options())? 
- .with_allow_http(_options.allow_http()) - .build() - })?; - Ok(( - Arc::new(S3StorageBackend::try_new( - Arc::new(store), - S3StorageOptions::from_map(&_options.0), - )?), - storage_url, - )) - } - #[cfg(not(any(feature = "s3", feature = "s3-rustls")))] - ObjectStoreKind::S3 => Err(BuilderError::MissingFeature { - feature: "s3", - url: storage_url.as_ref().into(), - } - .into()), - #[cfg(feature = "azure")] - ObjectStoreKind::Azure => { - let store = MicrosoftAzureBuilder::new() - .with_url(storage_url.as_ref()) - .try_with_options(&_options.as_azure_options())? - .with_allow_http(_options.allow_http()) - .build() - .or_else(|_| { - MicrosoftAzureBuilder::from_env() - .with_url(storage_url.as_ref()) - .try_with_options(&_options.as_azure_options())? - .with_allow_http(_options.allow_http()) - .build() - })?; - Ok((Arc::new(store), storage_url)) - } - #[cfg(not(feature = "azure"))] - ObjectStoreKind::Azure => Err(BuilderError::MissingFeature { - feature: "azure", - url: storage_url.as_ref().into(), - } - .into()), - #[cfg(feature = "gcs")] - ObjectStoreKind::Google => { - let store = GoogleCloudStorageBuilder::new() - .with_url(storage_url.as_ref()) - .try_with_options(&_options.as_gcs_options())? - .build() - .or_else(|_| { - GoogleCloudStorageBuilder::from_env() - .with_url(storage_url.as_ref()) - .try_with_options(&_options.as_gcs_options())? - .build() - })?; - Ok((Arc::new(store), storage_url)) - } - #[cfg(not(feature = "gcs"))] - ObjectStoreKind::Google => Err(BuilderError::MissingFeature { - feature: "gcs", - url: storage_url.as_ref().into(), - } - .into()), - } -} - /// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions]. /// The same key should be used whether passing a key in the hashmap or setting it as an environment variable. /// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename. @@ -432,7 +290,7 @@ pub mod s3_storage_options { /// Hence, the `connection closed before message completed` could occur. /// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise. pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS"; - /// The `pool_idle_timeout` for the as3_storage_optionsws sts client. See + /// The `pool_idle_timeout` for the as3_storage_options sts client. See /// the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`. pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS"; /// The number of retries for S3 GET requests failed with 500 Internal Server Error. @@ -488,3 +346,37 @@ pub(crate) fn str_option(map: &HashMap, key: &str) -> Option) -> DeltaResult { + let table_uri = table_uri.as_ref(); + if let Ok(path) = std::fs::canonicalize(table_uri) { + return Ok(Url::from_directory_path(path) + .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?); + } + if let Ok(url) = Url::parse(table_uri) { + return Ok(url); + } + // The table uri still might be a relative paths that does not exist. 
+ std::fs::create_dir_all(table_uri) + .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?; + let path = std::fs::canonicalize(table_uri) + .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?; + Url::from_directory_path(path) + .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string())) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ensure_table_uri() { + // parse an exisiting relative directory + let uri = ensure_table_uri("."); + assert!(uri.is_ok()); + let _uri = ensure_table_uri("./nonexistent"); + assert!(uri.is_ok()); + let uri = ensure_table_uri("s3://container/path"); + assert!(uri.is_ok()); + } +} diff --git a/rust/src/delta.rs b/rust/src/delta.rs index e290886479..c26d69b5a1 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -199,6 +199,17 @@ pub enum DeltaTableError { /// Error returned when user attempts to commit actions that don't belong to the next version. #[error("Delta transaction failed, version {0} does not follow {1}")] VersionMismatch(DeltaDataTypeVersion, DeltaDataTypeVersion), + /// A Feature is missing to perform operation + #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")] + MissingFeature { + /// Name of the missiing feature + feature: &'static str, + /// Storage location url + url: String, + }, + /// A Feature is missing to perform operation + #[error("Cannot infer storage location from: {0}")] + InvalidTableLocation(String), /// Generic Delta Table error #[error("Generic DeltaTable error: {0}")] Generic(String), diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index ab25bd66b9..e415293d8f 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -56,6 +56,7 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec; use object_store::{path::Path, ObjectMeta}; use url::Url; +use crate::builder::ensure_table_uri; use crate::Invariant; use crate::{action, open_table, open_table_with_storage_options}; use crate::{schema, DeltaTableBuilder}; @@ -410,13 +411,11 @@ impl TableProvider for DeltaTable { filters, ) .await?; - let mut url = self.table_uri(); - if url.ends_with(':') { - url += "//"; // table_uri() trims slashes from `memory://` so add them back - } - let delta_scan = DeltaScan { url, parquet_scan }; - Ok(Arc::new(delta_scan)) + Ok(Arc::new(DeltaScan { + url: ensure_table_uri(&self.table_uri())?.as_str().into(), + parquet_scan, + })) } fn as_any(&self) -> &dyn Any { @@ -467,15 +466,19 @@ impl ExecutionPlan for DeltaScan { partition: usize, context: Arc, ) -> DataFusionResult { - let url = self.url.as_str(); - let url = ListingTableUrl::parse(url)?; - let storage = context.runtime_env().object_store_registry.get_by_url(url); - let mut table = DeltaTableBuilder::from_uri(self.url.clone()); + let df_url = ListingTableUrl::parse(self.url.as_str())?; + let storage = context + .runtime_env() + .object_store_registry + .get_by_url(df_url); + let mut table = DeltaTableBuilder::from_uri(&self.url); if let Ok(storage) = storage { // When running in ballista, the store will be deserialized and re-created // When testing with a MemoryStore, it will already be present and we should re-use it - let path = &Path::parse("")?; - table = table.with_storage_backend(storage, path); + table = table.with_storage_backend( + storage, + Url::parse(&self.url).map_err(|err| DataFusionError::Internal(err.to_string()))?, + ); } let table = table.build()?; register_store(&table, context.runtime_env()); 
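Note: the builder changes above replace the old `StorageLocation` parsing with `ensure_table_uri`, which canonicalizes local paths into `file://` URLs and passes through anything that already parses as a URL (the DataFusion scan above reuses the same helper). The following is a minimal standalone sketch of that resolution idea, assuming only the `url` crate; the helper name `normalize_uri_sketch` is ours, and the patch's `create_dir_all` fallback for not-yet-existing relative paths is deliberately omitted.

```rust
use url::Url;

/// Resolve a table reference: canonicalize existing local paths into
/// absolute `file://` URLs, otherwise pass through well-formed URLs.
fn normalize_uri_sketch(table_uri: &str) -> Option<Url> {
    if let Ok(path) = std::fs::canonicalize(table_uri) {
        // `from_directory_path` requires an absolute path, which canonicalize provides.
        return Url::from_directory_path(path).ok();
    }
    Url::parse(table_uri).ok()
}

fn main() {
    // An existing relative directory resolves to a file:// URL ...
    assert_eq!(normalize_uri_sketch(".").unwrap().scheme(), "file");
    // ... while fully qualified object store URLs pass through unchanged.
    assert_eq!(
        normalize_uri_sketch("s3://bucket/table").unwrap().as_str(),
        "s3://bucket/table"
    );
}
```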
diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index ff646698bf..0842d1628d 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -1,20 +1,18 @@ //! Command for creating a new delta table // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala -use std::collections::HashMap; -use std::sync::Arc; - -use crate::{DeltaTableBuilder, DeltaTableMetaData}; - use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode}; +use crate::builder::ensure_table_uri; use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct}; -use crate::storage::config::StorageLocation; use crate::storage::DeltaObjectStore; use crate::{DeltaResult, DeltaTable, DeltaTableError}; +use crate::{DeltaTableBuilder, DeltaTableMetaData}; use futures::future::BoxFuture; use serde_json::{Map, Value}; +use std::collections::HashMap; +use std::sync::Arc; #[derive(thiserror::Error, Debug)] enum CreateError { @@ -209,20 +207,20 @@ impl CreateBuilder { return Err(CreateError::MissingSchema.into()); } - let (table, storage_url) = if let Some(object_store) = self.object_store { - let storage_url = StorageLocation::parse(object_store.root_uri())?; + let (storage_url, table) = if let Some(object_store) = self.object_store { ( + ensure_table_uri(object_store.root_uri())? + .as_str() + .to_string(), DeltaTable::new(object_store, Default::default()), - storage_url, ) } else { - let storage_url = - StorageLocation::parse(self.location.ok_or(CreateError::MissingLocation)?)?; + let storage_url = ensure_table_uri(self.location.ok_or(CreateError::MissingLocation)?)?; ( + storage_url.as_str().to_string(), DeltaTableBuilder::from_uri(&storage_url) .with_storage_options(self.storage_options.unwrap_or_default()) .build()?, - storage_url, ) }; @@ -253,7 +251,7 @@ impl CreateBuilder { let operation = DeltaOperation::Create { mode: self.mode.clone(), metadata: metadata.clone(), - location: storage_url.to_string(), + location: storage_url, protocol: protocol.clone(), }; @@ -331,6 +329,22 @@ mod tests { assert_eq!(table.get_metadata().unwrap().schema, table_schema) } + #[tokio::test] + async fn test_create_local_relative_path() { + let table_schema = get_delta_schema(); + + let table = DeltaOps::try_from_uri("./table") + .await + .unwrap() + .create() + .with_columns(table_schema.get_fields().clone()) + .with_save_mode(SaveMode::Ignore) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_metadata().unwrap().schema, table_schema) + } + #[tokio::test] async fn test_create_table_metadata() { let schema = get_delta_schema(); diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index 70f825f994..f49929ca49 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use crate::storage::DeltaObjectStore; -use crate::{DeltaResult, DeltaTable}; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; use datafusion::datasource::TableProvider; use datafusion::execution::context::{SessionContext, TaskContext}; @@ -77,8 +77,10 @@ impl std::future::IntoFuture for LoadBuilder { Box::pin(async move { let object_store = this.object_store.unwrap(); - let scheme = object_store.root_uri(); // underlying store, i.e. 
`memory:` - let scheme = scheme.trim_end_matches(':'); + let url = url::Url::parse(&object_store.root_uri()) + .map_err(|_| DeltaTableError::InvalidTableLocation(object_store.root_uri()))?; + let scheme = url.scheme(); + let store = object_store.storage_backend().clone(); let mut table = DeltaTable::new(object_store, Default::default()); table.load().await?; diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index 8451984d08..87a25b067f 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -1,19 +1,24 @@ //! Configuration handling for defining Storage backends for DeltaTables. +use super::file::FileStorageBackend; +use super::utils::str_is_truthy; +use crate::{DeltaResult, DeltaTableError}; +use object_store::memory::InMemory; use object_store::path::Path; -use object_store::{Error as ObjectStoreError, Result as ObjectStoreResult}; -use serde::de::{Error, SeqAccess, Visitor}; -use serde::ser::SerializeSeq; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use object_store::prefix::PrefixObjectStore; +use object_store::DynObjectStore; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::fmt; +use std::sync::Arc; use url::Url; #[cfg(any(feature = "s3", feature = "s3-rustls"))] -use object_store::aws::AmazonS3ConfigKey; +use super::s3::{S3StorageBackend, S3StorageOptions}; +#[cfg(any(feature = "s3", feature = "s3-rustls"))] +use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey}; #[cfg(feature = "azure")] -use object_store::azure::AzureConfigKey; +use object_store::azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder}; #[cfg(feature = "gcs")] -use object_store::gcp::GoogleConfigKey; +use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder, GoogleConfigKey}; #[cfg(any( feature = "s3", feature = "s3-rustls", @@ -23,6 +28,7 @@ use object_store::gcp::GoogleConfigKey; use std::str::FromStr; /// Options used for configuring backend storage +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct StorageOptions(pub HashMap); impl StorageOptions { @@ -41,7 +47,7 @@ impl StorageOptions { Self(options) } - /// Denotes if unsecure connections are configures to be allowed + /// Denotes if unsecure connections via http are allowed pub fn allow_http(&self) -> bool { self.0.iter().any(|(key, value)| { key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value) @@ -85,197 +91,165 @@ impl StorageOptions { } } -/// A parsed URL identifying a storage location -/// for more information on the supported expressions -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct StorageLocation { - /// A URL that identifies a file or directory to list files from - pub(crate) url: Url, - /// The path prefix - pub(crate) prefix: Path, +impl From> for StorageOptions { + fn from(value: HashMap) -> Self { + Self::new(value) + } } -impl StorageLocation { - /// Parse a provided string as a `StorageLocation` - /// - /// # Paths without a Scheme - /// - /// If no scheme is provided, or the string is an absolute filesystem path - /// as determined [`std::path::Path::is_absolute`], the string will be - /// interpreted as a path on the local filesystem using the operating - /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix. 
- /// - /// Otherwise, the path will be resolved to an absolute path, returning - /// an error if it does not exist, and converted to a [file URI] - /// - /// If you wish to specify a path that does not exist on the local - /// machine you must provide it as a fully-qualified [file URI] - /// e.g. `file:///myfile.txt` - /// - /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme - /// - /// # Well-known formats - /// - /// The lists below enumerates some well known uris, that are understood by the - /// parse function. We parse uris to refer to a specific storage location, which - /// is accessed using the internal storage backends. - /// - /// ## Azure - /// - /// URIs according to : - /// - /// * az:/// - /// * adl:/// - /// * abfs(s):/// - /// - /// URIs according to : - /// - /// * abfs(s)://@.dfs.core.windows.net/ - /// - /// and a custom one - /// - /// * azure:/// - /// - /// ## S3 - /// * s3:/// - /// * s3a:/// - /// - /// ## GCS - /// * gs:/// - pub fn parse(s: impl AsRef) -> ObjectStoreResult { - let s = s.as_ref(); - - // This is necessary to handle the case of a path starting with a drive letter - if std::path::Path::new(s).is_absolute() { - return Self::parse_path(s); - } +pub(crate) enum ObjectStoreImpl { + Local(FileStorageBackend), + InMemory(InMemory), + #[cfg(any(feature = "s3", feature = "s3-rustls"))] + S3(S3StorageBackend), + #[cfg(feature = "gcs")] + Google(GoogleCloudStorage), + #[cfg(feature = "azure")] + Azure(MicrosoftAzure), +} - match Url::parse(s) { - Ok(url) => Ok(Self::new(url)), - Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s), - Err(e) => Err(ObjectStoreError::Generic { - store: "DeltaObjectStore", - source: Box::new(e), - }), +impl ObjectStoreImpl { + pub(crate) fn into_prefix(self, prefix: Path) -> Arc { + match self { + ObjectStoreImpl::Local(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + ObjectStoreImpl::InMemory(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + #[cfg(feature = "azure")] + ObjectStoreImpl::Azure(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + #[cfg(any(feature = "s3", feature = "s3-rustls"))] + ObjectStoreImpl::S3(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + #[cfg(feature = "gcs")] + ObjectStoreImpl::Google(store) => Arc::new(PrefixObjectStore::new(store, prefix)), } } - /// Creates a new [`StorageLocation`] from an url - pub fn new(url: Url) -> Self { - let prefix = Path::parse(url.path()).expect("should be URL safe"); - Self { url, prefix } - } - - /// Creates a new [`StorageUrl`] interpreting `s` as a filesystem path - fn parse_path(s: &str) -> ObjectStoreResult { - let path = - std::path::Path::new(s) - .canonicalize() - .map_err(|e| ObjectStoreError::Generic { - store: "DeltaObjectStore", - source: Box::new(e), - })?; - let url = match path.is_file() { - true => Url::from_file_path(path).unwrap(), - false => Url::from_directory_path(path).unwrap(), - }; - - Ok(Self::new(url)) - } - - /// Returns the URL scheme - pub fn scheme(&self) -> &str { - self.url.scheme() - } - - /// Create the full path from a path relative to prefix - pub fn full_path(&self, location: &Path) -> Path { - self.prefix.parts().chain(location.parts()).collect() - } - - /// Strip the constant prefix from a given path - pub fn strip_prefix(&self, path: &Path) -> Option { - Some(path.prefix_match(&self.prefix)?.collect()) - } - - /// convert a table [Path] to a fully qualified uri - pub fn to_uri(&self, location: &Path) -> String { - let uri = match self.scheme() { - "file" | "" => 
{ - // On windows the drive (e.g. 'c:') is part of root and must not be prefixed. - #[cfg(windows)] - let os_uri = format!("{}/{}", self.prefix, location.as_ref()); - #[cfg(unix)] - let os_uri = format!("/{}/{}", self.prefix, location.as_ref()); - os_uri - } - _ => format!("{}/{}", self.as_ref(), location.as_ref()), - }; - uri.trim_end_matches('/').into() - } -} - -impl AsRef for StorageLocation { - fn as_ref(&self) -> &str { - self.url.as_ref() + pub(crate) fn into_store(self) -> Arc { + match self { + ObjectStoreImpl::Local(store) => Arc::new(store), + ObjectStoreImpl::InMemory(store) => Arc::new(store), + #[cfg(feature = "azure")] + ObjectStoreImpl::Azure(store) => Arc::new(store), + #[cfg(any(feature = "s3", feature = "s3-rustls"))] + ObjectStoreImpl::S3(store) => Arc::new(store), + #[cfg(feature = "gcs")] + ObjectStoreImpl::Google(store) => Arc::new(store), + } } } -impl std::fmt::Display for StorageLocation { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.as_ref().fmt(f) - } +pub(crate) enum ObjectStoreKind { + Local, + InMemory, + S3, + Google, + Azure, } -impl Serialize for StorageLocation { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut seq = serializer.serialize_seq(None)?; - seq.serialize_element(self.url.as_str())?; - seq.serialize_element(&self.prefix.to_string())?; - seq.end() +impl ObjectStoreKind { + pub fn parse_url(url: &Url) -> DeltaResult { + match url.scheme() { + "file" => Ok(ObjectStoreKind::Local), + "memory" => Ok(ObjectStoreKind::InMemory), + "az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => Ok(ObjectStoreKind::Azure), + "s3" | "s3a" => Ok(ObjectStoreKind::S3), + "gs" => Ok(ObjectStoreKind::Google), + "https" => { + let host = url.host_str().unwrap_or_default(); + if host.contains("amazonaws.com") { + Ok(ObjectStoreKind::S3) + } else if host.contains("dfs.core.windows.net") + || host.contains("blob.core.windows.net") + { + Ok(ObjectStoreKind::Azure) + } else { + Err(DeltaTableError::Generic(format!( + "unsupported url: {}", + url.as_str() + ))) + } + } + _ => Err(DeltaTableError::Generic(format!( + "unsupported url: {}", + url.as_str() + ))), + } } -} - -impl<'de> Deserialize<'de> for StorageLocation { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct StorageLocationVisitor {} - impl<'de> Visitor<'de> for StorageLocationVisitor { - type Value = StorageLocation; - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("struct StorageUrl") + pub fn into_impl( + self, + storage_url: impl AsRef, + options: impl Into, + ) -> DeltaResult { + let _options = options.into(); + match self { + ObjectStoreKind::Local => Ok(ObjectStoreImpl::Local(FileStorageBackend::new())), + ObjectStoreKind::InMemory => Ok(ObjectStoreImpl::InMemory(InMemory::new())), + #[cfg(any(feature = "s3", feature = "s3-rustls"))] + ObjectStoreKind::S3 => { + let store = AmazonS3Builder::new() + .with_url(storage_url.as_ref()) + .try_with_options(&_options.as_s3_options())? + .with_allow_http(_options.allow_http()) + .build() + .or_else(|_| { + AmazonS3Builder::from_env() + .with_url(storage_url.as_ref()) + .try_with_options(&_options.as_s3_options())? + .with_allow_http(_options.allow_http()) + .build() + })?; + Ok(ObjectStoreImpl::S3(S3StorageBackend::try_new( + Arc::new(store), + S3StorageOptions::from_map(&_options.0), + )?)) } - - fn visit_seq(self, mut seq: V) -> Result - where - V: SeqAccess<'de>, - { - let url = seq - .next_element()? 
- .ok_or_else(|| V::Error::invalid_length(0, &self))?; - let prefix: &str = seq - .next_element()? - .ok_or_else(|| V::Error::invalid_length(1, &self))?; - let url = Url::parse(url).map_err(|_| V::Error::missing_field("url"))?; - let prefix = Path::parse(prefix).map_err(|_| V::Error::missing_field("prefix"))?; - let url = StorageLocation { url, prefix }; - Ok(url) + #[cfg(not(any(feature = "s3", feature = "s3-rustls")))] + ObjectStoreKind::S3 => Err(DeltaTableError::MissingFeature { + feature: "s3", + url: storage_url.as_ref().into(), + } + .into()), + #[cfg(feature = "azure")] + ObjectStoreKind::Azure => { + let store = MicrosoftAzureBuilder::new() + .with_url(storage_url.as_ref()) + .try_with_options(&_options.as_azure_options())? + .with_allow_http(_options.allow_http()) + .build() + .or_else(|_| { + MicrosoftAzureBuilder::from_env() + .with_url(storage_url.as_ref()) + .try_with_options(&_options.as_azure_options())? + .with_allow_http(_options.allow_http()) + .build() + })?; + Ok(ObjectStoreImpl::Azure(store)) + } + #[cfg(not(feature = "azure"))] + ObjectStoreKind::Azure => Err(DeltaTableError::MissingFeature { + feature: "azure", + url: storage_url.as_ref().into(), } + .into()), + #[cfg(feature = "gcs")] + ObjectStoreKind::Google => { + let store = GoogleCloudStorageBuilder::new() + .with_url(storage_url.as_ref()) + .try_with_options(&_options.as_gcs_options())? + .build() + .or_else(|_| { + GoogleCloudStorageBuilder::from_env() + .with_url(storage_url.as_ref()) + .try_with_options(&_options.as_gcs_options())? + .build() + })?; + Ok(ObjectStoreImpl::Google(store)) + } + #[cfg(not(feature = "gcs"))] + ObjectStoreKind::Google => Err(DeltaTableError::MissingFeature { + feature: "gcs", + url: storage_url.as_ref().into(), + } + .into()), } - deserializer.deserialize_seq(StorageLocationVisitor {}) } } - -pub(crate) fn str_is_truthy(val: &str) -> bool { - val.eq_ignore_ascii_case("1") - | val.eq_ignore_ascii_case("true") - | val.eq_ignore_ascii_case("on") - | val.eq_ignore_ascii_case("yes") - | val.eq_ignore_ascii_case("y") -} diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index d5e1ef1457..12a416c8f7 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -4,27 +4,33 @@ pub mod config; pub mod file; pub mod utils; -#[cfg(any(feature = "s3", feature = "s3-rustls"))] -pub mod s3; +use self::config::{ObjectStoreKind, StorageOptions}; +use crate::DeltaResult; -use crate::storage::config::StorageLocation; use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use futures::{stream::BoxStream, StreamExt}; use lazy_static::lazy_static; -pub use object_store::{ - path::{Path, DELIMITER}, - DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, Result as ObjectStoreResult, -}; +use serde::de::{Error, SeqAccess, Visitor}; +use serde::ser::SerializeSeq; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::collections::HashMap; +use std::fmt; use std::ops::Range; use std::sync::Arc; use tokio::io::AsyncWrite; +use url::Url; + +#[cfg(any(feature = "s3", feature = "s3-rustls"))] +pub mod s3; -use crate::get_storage_backend; #[cfg(feature = "datafusion")] use datafusion::datasource::object_store::ObjectStoreUrl; -use serde::de::Error; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +pub use object_store::path::{Path, DELIMITER}; +pub use object_store::{ + DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, + ObjectStore, 
Result as ObjectStoreResult, +}; lazy_static! { static ref DELTA_LOG_PATH: Path = Path::from("_delta_log"); @@ -43,37 +49,11 @@ pub type ObjectStoreRef = Arc; /// All [Path] are reported relative to the table root. #[derive(Debug, Clone)] pub struct DeltaObjectStore { - storage: Arc, - location: StorageLocation, -} - -impl Serialize for DeltaObjectStore { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - self.location.serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for DeltaObjectStore { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let config = StorageLocation::deserialize(deserializer)?; - let (storage, storage_url) = get_storage_backend( - config.clone(), - None, // TODO: config options - Some(true), // TODO: this isn't preserved after builder stage - ) - .map_err(|_| D::Error::missing_field("storage"))?; - let storage = Arc::new(DeltaObjectStore::new(storage_url, storage)); - Ok(DeltaObjectStore { - storage, - location: config, - }) - } + storage: Arc, + location: Url, + options: StorageOptions, + #[allow(unused)] + prefix: Path, } impl std::fmt::Display for DeltaObjectStore { @@ -83,22 +63,56 @@ impl std::fmt::Display for DeltaObjectStore { } impl DeltaObjectStore { - /// Create new DeltaObjectStore - pub fn new(storage_url: StorageLocation, storage: Arc) -> Self { + /// Create a new instance of [`DeltaObjectStore`] + /// + /// # Arguemnts + /// + /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `location` - A url corresponding to the storagle location of `storage`. + pub fn new(storage: Arc, location: Url) -> Self { Self { storage, - location: storage_url, + location, + prefix: Path::from("/"), + options: HashMap::new().into(), } } + /// Try creating a new instance of [`DeltaObjectStore`] + /// + /// # Arguments + /// + /// * `location` - + pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { + let prefix = Path::from(location.path()); + let root_store = + ObjectStoreKind::parse_url(&location)?.into_impl(location.as_ref(), options.clone())?; + let storage = if prefix != Path::from("/") { + root_store.into_prefix(prefix.clone()) + } else { + root_store.into_store() + }; + Ok(Self { + storage, + location, + prefix, + options: options.into(), + }) + } + /// Get a reference to the underlying storage backend pub fn storage_backend(&self) -> Arc { self.storage.clone() } + /// Storage options used to intialize storage backend + pub fn storage_options(&self) -> &StorageOptions { + &self.options + } + /// Get fully qualified uri for table root pub fn root_uri(&self) -> String { - self.location.to_uri(&Path::from("")) + self.to_uri(&Path::from("")) } #[cfg(feature = "datafusion")] @@ -110,8 +124,7 @@ impl DeltaObjectStore { "delta-rs://{}", // NOTE We need to also replace colons, but its fine, since it just needs // to be a unique-ish identifier for the object store in datafusion - self.location - .prefix + self.prefix .as_ref() .replace(DELIMITER, "-") .replace(':', "-") @@ -126,7 +139,26 @@ impl DeltaObjectStore { /// [Path] to Delta log pub fn to_uri(&self, location: &Path) -> String { - self.location.to_uri(location) + match self.location.scheme() { + "file" => { + #[cfg(windows)] + let uri = format!( + "{}/{}", + self.location.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file:///", ""); + #[cfg(unix)] + let uri = format!( + "{}/{}", + 
self.location.as_ref().trim_end_matches('/'), + location.as_ref() + ) + .replace("file://", ""); + uri + } + _ => format!("{}/{}", self.location.as_ref(), location.as_ref()), + } } /// Deletes object by `paths`. @@ -161,40 +193,28 @@ impl DeltaObjectStore { impl ObjectStore for DeltaObjectStore { /// Save the provided bytes to the specified location. async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> { - let full_path = self.location.full_path(location); - self.storage.put(&full_path, bytes).await + self.storage.put(location, bytes).await } /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> ObjectStoreResult { - let full_path = self.location.full_path(location); - self.storage.get(&full_path).await + self.storage.get(location).await } /// Return the bytes that are stored at the specified location /// in the given byte range async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { - let full_path = self.location.full_path(location); - object_store::ObjectStore::get_range(self.storage.as_ref(), &full_path, range).await + self.storage.get_range(location, range).await } /// Return the metadata for the specified location async fn head(&self, location: &Path) -> ObjectStoreResult { - let full_path = self.location.full_path(location); - self.storage.head(&full_path).await.map(|meta| ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: self - .location - .strip_prefix(&meta.location) - .unwrap_or(meta.location), - }) + self.storage.head(location).await } /// Delete the object at the specified location. async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { - let full_path = self.location.full_path(location); - self.storage.delete(&full_path).await + self.storage.delete(location).await } /// List all the objects with the given prefix. @@ -205,22 +225,7 @@ impl ObjectStore for DeltaObjectStore { &self, prefix: Option<&Path>, ) -> ObjectStoreResult>> { - let prefix = prefix.map(|p| self.location.full_path(p)); - Ok(self - .storage - .list(Some( - &prefix.unwrap_or_else(|| self.location.prefix.clone()), - )) - .await? - .map_ok(|meta| ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: self - .location - .strip_prefix(&meta.location) - .unwrap_or(meta.location), - }) - .boxed()) + self.storage.list(prefix).await } /// List objects with the given prefix and an implementation specific @@ -230,68 +235,35 @@ impl ObjectStore for DeltaObjectStore { /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of /// `foo/bar_baz/x`. async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { - let prefix = prefix.map(|p| self.location.full_path(p)); - self.storage - .list_with_delimiter(Some( - &prefix.unwrap_or_else(|| self.location.prefix.clone()), - )) - .await - .map(|lst| ListResult { - common_prefixes: lst - .common_prefixes - .iter() - .map(|p| self.location.strip_prefix(p).unwrap_or_else(|| p.clone())) - .collect(), - objects: lst - .objects - .iter() - .map(|meta| ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: self - .location - .strip_prefix(&meta.location) - .unwrap_or_else(|| meta.location.clone()), - }) - .collect(), - }) + self.storage.list_with_delimiter(prefix).await } /// Copy an object from one path to another in the same object store. /// /// If there exists an object at the destination, it will be overwritten. 
async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - let full_from = self.location.full_path(from); - let full_to = self.location.full_path(to); - self.storage.copy(&full_from, &full_to).await + self.storage.copy(from, to).await } /// Copy an object from one path to another, only if destination is empty. /// /// Will return an error if the destination already has an object. async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - let full_from = self.location.full_path(from); - let full_to = self.location.full_path(to); - self.storage.copy_if_not_exists(&full_from, &full_to).await + self.storage.copy_if_not_exists(from, to).await } /// Move an object from one path to another in the same object store. /// /// Will return an error if the destination already has an object. async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - let full_from = self.location.full_path(from); - let full_to = self.location.full_path(to); - self.storage - .rename_if_not_exists(&full_from, &full_to) - .await + self.storage.rename_if_not_exists(from, to).await } async fn put_multipart( &self, location: &Path, ) -> ObjectStoreResult<(MultipartId, Box)> { - let full_path = self.location.full_path(location); - self.storage.put_multipart(&full_path).await + self.storage.put_multipart(location).await } async fn abort_multipart( @@ -299,7 +271,53 @@ impl ObjectStore for DeltaObjectStore { location: &Path, multipart_id: &MultipartId, ) -> ObjectStoreResult<()> { - let full_path = self.location.full_path(location); - self.storage.abort_multipart(&full_path, multipart_id).await + self.storage.abort_multipart(location, multipart_id).await + } +} + +impl Serialize for DeltaObjectStore { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut seq = serializer.serialize_seq(None)?; + seq.serialize_element(&self.location.to_string())?; + seq.serialize_element(&self.options.0)?; + seq.end() + } +} + +impl<'de> Deserialize<'de> for DeltaObjectStore { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct DeltaObjectStoreVisitor {} + + impl<'de> Visitor<'de> for DeltaObjectStoreVisitor { + type Value = DeltaObjectStore; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> fmt::Result { + formatter.write_str("struct DeltaObjectStore") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let location_str: String = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let options: HashMap = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let location = Url::parse(&location_str).unwrap(); + let table = DeltaObjectStore::try_new(location, options) + .map_err(|_| A::Error::custom("Failed deserializing DeltaObjectStore"))?; + Ok(table) + } + } + + deserializer.deserialize_seq(DeltaObjectStoreVisitor {}) } } diff --git a/rust/src/storage/s3.rs b/rust/src/storage/s3.rs index da82e01191..ad5b739a23 100644 --- a/rust/src/storage/s3.rs +++ b/rust/src/storage/s3.rs @@ -1,6 +1,6 @@ //! AWS S3 storage backend. 
-use super::config::str_is_truthy; +use super::utils::str_is_truthy; use crate::builder::{s3_storage_options, str_option}; use bytes::Bytes; use dynamodb_lock::{DynamoError, LockClient, LockItem, DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS}; diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index c80a114087..0ccc6b480e 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -56,3 +56,11 @@ pub async fn flatten_list_stream( .try_collect::>() .await } + +pub(crate) fn str_is_truthy(val: &str) -> bool { + val.eq_ignore_ascii_case("1") + | val.eq_ignore_ascii_case("true") + | val.eq_ignore_ascii_case("on") + | val.eq_ignore_ascii_case("yes") + | val.eq_ignore_ascii_case("y") +} From 232bf63c76232aeb80e8846c3cc74bb832904fdd Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 16:13:16 +0100 Subject: [PATCH 02/11] test: create table at local path --- rust/src/operations/create.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 0842d1628d..93a90a5a6e 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -345,6 +345,17 @@ mod tests { assert_eq!(table.get_metadata().unwrap().schema, table_schema) } + #[tokio::test] + async fn test_create_table_local_path() { + let schema = get_delta_schema(); + let table = CreateBuilder::new() + .with_location("./new-table") + .with_columns(schema.get_fields().clone()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + } + #[tokio::test] async fn test_create_table_metadata() { let schema = get_delta_schema(); From 24c2a42effc71009bea9a1c03232337e4cb10034 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 16:25:02 +0100 Subject: [PATCH 03/11] chore: clippy with fixes --- rust/src/builder.rs | 4 ++-- rust/src/delta_datafusion.rs | 6 +++--- rust/src/operations/transaction.rs | 2 +- rust/tests/add_actions_test.rs | 26 +++++++++----------------- rust/tests/checkpoint_writer.rs | 2 +- rust/tests/datafusion_test.rs | 2 +- rust/tests/fs_common/mod.rs | 2 +- 7 files changed, 18 insertions(+), 26 deletions(-) diff --git a/rust/src/builder.rs b/rust/src/builder.rs index 028becaa7b..47aae806fb 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -350,8 +350,8 @@ pub(crate) fn str_option(map: &HashMap, key: &str) -> Option) -> DeltaResult { let table_uri = table_uri.as_ref(); if let Ok(path) = std::fs::canonicalize(table_uri) { - return Ok(Url::from_directory_path(path) - .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?); + return Url::from_directory_path(path) + .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string())); } if let Ok(url) = Url::parse(table_uri) { return Ok(url); diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index e415293d8f..6934195263 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -413,7 +413,7 @@ impl TableProvider for DeltaTable { .await?; Ok(Arc::new(DeltaScan { - url: ensure_table_uri(&self.table_uri())?.as_str().into(), + url: ensure_table_uri(self.table_uri())?.as_str().into(), parquet_scan, })) } @@ -1006,8 +1006,8 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(arrow::array::StringArray::from_slice(&["a", "b", "c", "d"])), - Arc::new(arrow::array::Int32Array::from_slice(&[1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from_slice(["a", "b", "c", "d"])), + Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), 
], ) .unwrap(); diff --git a/rust/src/operations/transaction.rs b/rust/src/operations/transaction.rs index 9b06e48088..64a70b5827 100644 --- a/rust/src/operations/transaction.rs +++ b/rust/src/operations/transaction.rs @@ -159,7 +159,7 @@ mod tests { #[tokio::test] async fn test_commits_writes_file() { - let metadata = get_delta_metadata(&vec![]); + let metadata = get_delta_metadata(&[]); let operation = DeltaOperation::Create { mode: SaveMode::Append, location: "memory://".into(), diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index 0ba51b6bbb..3197663625 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -353,12 +353,10 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&vec![ - "null_count", + .get_field_at_path(&["null_count", "nested_struct", "struct_element", - "nested_struct_element" - ]) + "nested_struct_element"]) .unwrap() .as_any() .downcast_ref::() @@ -368,12 +366,10 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&vec![ - "min", + .get_field_at_path(&["min", "nested_struct", "struct_element", - "nested_struct_element" - ]) + "nested_struct_element"]) .unwrap() .as_any() .downcast_ref::() @@ -383,12 +379,10 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&vec![ - "max", + .get_field_at_path(&["max", "nested_struct", "struct_element", - "nested_struct_element" - ]) + "nested_struct_element"]) .unwrap() .as_any() .downcast_ref::() @@ -398,11 +392,9 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&vec![ - "null_count", + .get_field_at_path(&["null_count", "struct_of_array_of_map", - "struct_element" - ]) + "struct_element"]) .unwrap() .as_any() .downcast_ref::() @@ -412,7 +404,7 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&vec!["tags", "OPTIMIZE_TARGET_SIZE"]) + .get_field_at_path(&["tags", "OPTIMIZE_TARGET_SIZE"]) .unwrap() .as_any() .downcast_ref::() diff --git a/rust/tests/checkpoint_writer.rs b/rust/tests/checkpoint_writer.rs index 015710c1a0..78c1e2ac2b 100644 --- a/rust/tests/checkpoint_writer.rs +++ b/rust/tests/checkpoint_writer.rs @@ -109,7 +109,7 @@ mod delete_expired_delta_log_in_checkpoint { let set_file_last_modified = |version: usize, last_modified_millis: i64| { let last_modified_secs = last_modified_millis / 1000; let path = format!("{}/_delta_log/{:020}.json", &table_path, version); - utime::set_file_times(&path, last_modified_secs, last_modified_secs).unwrap(); + utime::set_file_times(path, last_modified_secs, last_modified_secs).unwrap(); }; // create 2 commits diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index 7e1534ceec..894a2dd86c 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -61,7 +61,7 @@ async fn prepare_table( let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path(); let table_uri = table_path.to_str().unwrap().to_string(); - let table_schema: Schema = batches[0].schema().clone().try_into().unwrap(); + let table_schema: Schema = batches[0].schema().try_into().unwrap(); let mut table = DeltaOps::try_from_uri(table_uri) .await diff --git a/rust/tests/fs_common/mod.rs b/rust/tests/fs_common/mod.rs index de21ff89c3..7d1383afd9 100644 --- a/rust/tests/fs_common/mod.rs +++ b/rust/tests/fs_common/mod.rs @@ -15,7 +15,7 @@ pub fn cleanup_dir_except>(path: P, ignore_files: Vec) { let name = d.path().file_name().unwrap().to_str().unwrap().to_string(); if 
!ignore_files.contains(&name) && !name.starts_with('.') { - fs::remove_file(&path).unwrap(); + fs::remove_file(path).unwrap(); } } } From 7807b290f84f9f55a035febc728ecbaad27e7191 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 16:26:48 +0100 Subject: [PATCH 04/11] chore: fmt --- rust/tests/add_actions_test.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index 3197663625..b787868051 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -353,10 +353,12 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&["null_count", + .get_field_at_path(&[ + "null_count", "nested_struct", "struct_element", - "nested_struct_element"]) + "nested_struct_element" + ]) .unwrap() .as_any() .downcast_ref::() @@ -366,10 +368,12 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&["min", + .get_field_at_path(&[ + "min", "nested_struct", "struct_element", - "nested_struct_element"]) + "nested_struct_element" + ]) .unwrap() .as_any() .downcast_ref::() @@ -379,10 +383,12 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&["max", + .get_field_at_path(&[ + "max", "nested_struct", "struct_element", - "nested_struct_element"]) + "nested_struct_element" + ]) .unwrap() .as_any() .downcast_ref::() @@ -392,9 +398,7 @@ async fn test_only_struct_stats() { assert_eq!( actions - .get_field_at_path(&["null_count", - "struct_of_array_of_map", - "struct_element"]) + .get_field_at_path(&["null_count", "struct_of_array_of_map", "struct_element"]) .unwrap() .as_any() .downcast_ref::() From 8ca88e017e6fb54d4c34c0f700adabce1b4b7477 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 16:31:36 +0100 Subject: [PATCH 05/11] fix: remove unused intos --- rust/src/storage/config.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index 87a25b067f..cc6833daa4 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -206,8 +206,7 @@ impl ObjectStoreKind { ObjectStoreKind::S3 => Err(DeltaTableError::MissingFeature { feature: "s3", url: storage_url.as_ref().into(), - } - .into()), + }), #[cfg(feature = "azure")] ObjectStoreKind::Azure => { let store = MicrosoftAzureBuilder::new() @@ -228,8 +227,7 @@ impl ObjectStoreKind { ObjectStoreKind::Azure => Err(DeltaTableError::MissingFeature { feature: "azure", url: storage_url.as_ref().into(), - } - .into()), + }), #[cfg(feature = "gcs")] ObjectStoreKind::Google => { let store = GoogleCloudStorageBuilder::new() @@ -248,8 +246,7 @@ impl ObjectStoreKind { ObjectStoreKind::Google => Err(DeltaTableError::MissingFeature { feature: "gcs", url: storage_url.as_ref().into(), - } - .into()), + }), } } } From 9c54d20dcb0a7849919c1db941b2e26d783e8405 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 16:56:45 +0100 Subject: [PATCH 06/11] fix: normalize_table_uri --- rust/src/delta.rs | 18 ++++++++++++++++-- rust/src/storage/mod.rs | 2 ++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/rust/src/delta.rs b/rust/src/delta.rs index c26d69b5a1..474c4f82c9 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1522,10 +1522,10 @@ mod tests { #[cfg(any(feature = "s3", feature = "s3-rustls"))] #[test] - fn normalize_table_uri() { + fn normalize_table_uri_s3() { for table_uri in [ "s3://tests/data/delta-0.8.0/", - // 
"s3://tests/data/delta-0.8.0//", + "s3://tests/data/delta-0.8.0//", "s3://tests/data/delta-0.8.0", ] .iter() @@ -1535,6 +1535,20 @@ mod tests { } } + #[test] + fn normalize_table_uri() { + for table_uri in [ + "file:///tests/data/delta-0.8.0/", + "file:///tests/data/delta-0.8.0//", + "file:///tests/data/delta-0.8.0", + ] + .iter() + { + let table = DeltaTableBuilder::from_uri(table_uri).build().unwrap(); + assert_eq!(table.table_uri(), "/tests/data/delta-0.8.0"); + } + } + async fn create_test_table() -> (DeltaTableMetaData, Protocol, DeltaTable, TempDir) { // Setup let test_schema = Schema::new(vec![ diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 12a416c8f7..cbbacf1f9a 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -159,6 +159,8 @@ impl DeltaObjectStore { } _ => format!("{}/{}", self.location.as_ref(), location.as_ref()), } + .trim_end_matches('/') + .into() } /// Deletes object by `paths`. From bea4a1c8a90d77184d1ea4b85a7df4b6dd675cb0 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 17:28:25 +0100 Subject: [PATCH 07/11] fix: normalize_table_uri again --- rust/src/builder.rs | 15 ++++++++++++++- rust/src/delta.rs | 14 -------------- rust/src/storage/mod.rs | 2 -- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/rust/src/builder.rs b/rust/src/builder.rs index 47aae806fb..bd43928f0d 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -354,7 +354,14 @@ pub(crate) fn ensure_table_uri(table_uri: impl AsRef) -> DeltaResult { .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string())); } if let Ok(url) = Url::parse(table_uri) { - return Ok(url); + return Ok(match url.scheme() { + "file" => url, + _ => { + let mut new_url = url.clone(); + new_url.set_path(url.path().trim_end_matches('/')); + new_url + } + }); } // The table uri still might be a relative paths that does not exist. std::fs::create_dir_all(table_uri) @@ -378,5 +385,11 @@ mod tests { assert!(uri.is_ok()); let uri = ensure_table_uri("s3://container/path"); assert!(uri.is_ok()); + let uri = ensure_table_uri("file:///").unwrap(); + assert_eq!("file:///", uri.as_str()); + let uri = ensure_table_uri("memory://").unwrap(); + assert_eq!("memory://", uri.as_str()); + let uri = ensure_table_uri("s3://tests/data/delta-0.8.0//").unwrap(); + assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str()) } } diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 474c4f82c9..71bbda6bd1 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1535,20 +1535,6 @@ mod tests { } } - #[test] - fn normalize_table_uri() { - for table_uri in [ - "file:///tests/data/delta-0.8.0/", - "file:///tests/data/delta-0.8.0//", - "file:///tests/data/delta-0.8.0", - ] - .iter() - { - let table = DeltaTableBuilder::from_uri(table_uri).build().unwrap(); - assert_eq!(table.table_uri(), "/tests/data/delta-0.8.0"); - } - } - async fn create_test_table() -> (DeltaTableMetaData, Protocol, DeltaTable, TempDir) { // Setup let test_schema = Schema::new(vec![ diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index cbbacf1f9a..12a416c8f7 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -159,8 +159,6 @@ impl DeltaObjectStore { } _ => format!("{}/{}", self.location.as_ref(), location.as_ref()), } - .trim_end_matches('/') - .into() } /// Deletes object by `paths`. 
From d844b9a543e930c917e0e1731599939efdade444 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 18:11:39 +0100 Subject: [PATCH 08/11] fix: slash trailing slashes --- rust/src/builder.rs | 9 +++++++-- rust/src/delta.rs | 1 + rust/src/storage/mod.rs | 8 +++++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/rust/src/builder.rs b/rust/src/builder.rs index bd43928f0d..072403e687 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -220,7 +220,10 @@ impl DeltaTableBuilder { /// Build a delta storage backend for the given config pub fn build_storage(self) -> DeltaResult { match self.options.storage_backend { - Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new(storage, location))), + Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new( + storage, + ensure_table_uri(location.as_str())?, + ))), None => { let location = ensure_table_uri(&self.options.table_uri)?; Ok(Arc::new(DeltaObjectStore::try_new( @@ -389,7 +392,9 @@ mod tests { assert_eq!("file:///", uri.as_str()); let uri = ensure_table_uri("memory://").unwrap(); assert_eq!("memory://", uri.as_str()); - let uri = ensure_table_uri("s3://tests/data/delta-0.8.0//").unwrap(); + let uri = ensure_table_uri("s3://tests/data/delta-0.8.0/").unwrap(); + assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str()); + let _uri = ensure_table_uri("s3://tests/data/delta-0.8.0//").unwrap(); assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str()) } } diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 71bbda6bd1..e214292227 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1523,6 +1523,7 @@ mod tests { #[cfg(any(feature = "s3", feature = "s3-rustls"))] #[test] fn normalize_table_uri_s3() { + std::env::set_var("AWS_DEFAULT_REGION", "us-east-1"); for table_uri in [ "s3://tests/data/delta-0.8.0/", "s3://tests/data/delta-0.8.0//", diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 12a416c8f7..55e58e8501 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -157,7 +157,13 @@ impl DeltaObjectStore { .replace("file://", ""); uri } - _ => format!("{}/{}", self.location.as_ref(), location.as_ref()), + _ => { + if location.as_ref().is_empty() || location.as_ref() == "/" { + format!("{}", self.location.as_ref()) + } else { + format!("{}/{}", self.location.as_ref(), location.as_ref()) + } + } } } From 97ed00aefd21c662384d6297950ccfdcbe593afb Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 18:15:33 +0100 Subject: [PATCH 09/11] fix: clippy --- rust/src/storage/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 55e58e8501..52dce17251 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -159,7 +159,7 @@ impl DeltaObjectStore { } _ => { if location.as_ref().is_empty() || location.as_ref() == "/" { - format!("{}", self.location.as_ref()) + self.location.as_ref().to_string() } else { format!("{}/{}", self.location.as_ref(), location.as_ref()) } From 8436e6723a037f4cff0cc295eba4c741489e27e2 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 18:37:15 +0100 Subject: [PATCH 10/11] fix: randomize test table names --- rust/src/operations/create.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 93a90a5a6e..c592cf6963 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -314,6 +314,7 @@ mod tests { use crate::operations::DeltaOps; 
use crate::table_properties::APPEND_ONLY; use crate::writer::test_utils::get_delta_schema; + use rand::distributions::{Alphanumeric, DistString}; #[tokio::test] async fn test_create() { @@ -332,8 +333,8 @@ mod tests { #[tokio::test] async fn test_create_local_relative_path() { let table_schema = get_delta_schema(); - - let table = DeltaOps::try_from_uri("./table") + let name = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + let table = DeltaOps::try_from_uri(format!("./{}", name)) .await .unwrap() .create() @@ -348,8 +349,9 @@ mod tests { #[tokio::test] async fn test_create_table_local_path() { let schema = get_delta_schema(); + let name = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); let table = CreateBuilder::new() - .with_location("./new-table") + .with_location(format!("./{}", name)) .with_columns(schema.get_fields().clone()) .await .unwrap(); From 0c68eb302a3cefc9e5662d4d1175cc08bdf894e4 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 14 Jan 2023 20:53:28 +0100 Subject: [PATCH 11/11] chore: cleanup --- rust/src/operations/load.rs | 4 +--- rust/src/storage/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index f49929ca49..3d1ec00702 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -79,8 +79,6 @@ impl std::future::IntoFuture for LoadBuilder { let object_store = this.object_store.unwrap(); let url = url::Url::parse(&object_store.root_uri()) .map_err(|_| DeltaTableError::InvalidTableLocation(object_store.root_uri()))?; - let scheme = url.scheme(); - let store = object_store.storage_backend().clone(); let mut table = DeltaTable::new(object_store, Default::default()); table.load().await?; @@ -88,7 +86,7 @@ impl std::future::IntoFuture for LoadBuilder { let ctx = SessionContext::new(); ctx.state() .runtime_env - .register_object_store(scheme, "", store); + .register_object_store(url.scheme(), "", store); let scan_plan = table.scan(&ctx.state(), None, &[], None).await?; let plan = CoalescePartitionsExec::new(scan_plan); let task_ctx = Arc::new(TaskContext::from(&ctx.state())); diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 52dce17251..14a8240729 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -82,7 +82,8 @@ impl DeltaObjectStore { /// /// # Arguments /// - /// * `location` - + /// * `location` - A url pointing to the root of the delta table. + /// * `options` - Options passed to underlying builders. See [`with_storage_options`](crate::builder::DeltaTableBuilder::with_storage_options) pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { let prefix = Path::from(location.path()); let root_store =