feat: improve storage location handling #1065

Merged · 11 commits · Jan 15, 2023
274 changes: 92 additions & 182 deletions rust/src/builder.rs
@@ -5,26 +5,14 @@ use std::sync::Arc;

use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
use crate::schema::DeltaDataTypeVersion;
use crate::storage::config::{StorageLocation, StorageOptions};
use crate::storage::file::FileStorageBackend;
use crate::storage::config::StorageOptions;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};

use chrono::{DateTime, FixedOffset, Utc};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{DynObjectStore, Error as ObjectStoreError, Result as ObjectStoreResult};
use object_store::DynObjectStore;
use serde::{Deserialize, Serialize};
use url::Url;

#[cfg(any(feature = "s3", feature = "s3-rustls"))]
use crate::storage::s3::{S3StorageBackend, S3StorageOptions};
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
use object_store::aws::AmazonS3Builder;
#[cfg(feature = "azure")]
use object_store::azure::MicrosoftAzureBuilder;
#[cfg(feature = "gcs")]
use object_store::gcp::GoogleCloudStorageBuilder;

#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
enum BuilderError {
@@ -38,6 +26,8 @@ enum BuilderError {
Decode(String),
#[error("Delta-rs must be build with feature '{feature}' to support url: {url}.")]
MissingFeature { feature: &'static str, url: String },
#[error("Failed to parse table uri")]
TableUri(#[from] url::ParseError),
}
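The new `TableUri` variant derives `From<url::ParseError>`, so parse failures can bubble up with `?`. A minimal sketch, assuming a hypothetical helper inside this module:

    fn parse_location(raw: &str) -> Result<Url, BuilderError> {
        // url::ParseError is converted into BuilderError::TableUri via #[from].
        Ok(Url::parse(raw)?)
    }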

impl From<BuilderError> for DeltaTableError {
@@ -98,7 +88,7 @@ pub struct DeltaTableLoadOptions {
/// table root uri
pub table_uri: String,
/// backend to access storage system
pub storage_backend: Option<(Arc<DynObjectStore>, Path)>,
pub storage_backend: Option<(Arc<DynObjectStore>, Url)>,
/// specify the version we are going to load: a time stamp, a version, or just the newest
/// available version
pub version: DeltaVersion,
@@ -168,7 +158,7 @@ impl DeltaTableBuilder {
}

/// specify the timestamp given as ISO-8601/RFC-3339 timestamp
pub fn with_datestring(self, date_string: impl AsRef<str>) -> Result<Self, DeltaTableError> {
pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
date_string.as_ref(),
)?);
@@ -183,13 +173,14 @@

/// Set the storage backend.
///
/// `table_root` denotes the [object_store::path::Path] within the store to the root of the delta.
/// This is required since we cannot infer the relative location of the table from the `table_uri`
/// For non-standard object store implementations.
///
/// If a backend is not provided then it is derived from `table_uri`.
pub fn with_storage_backend(mut self, storage: Arc<DynObjectStore>, table_root: &Path) -> Self {
self.options.storage_backend = Some((storage, table_root.clone()));
///
/// # Arguments
///
/// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `location` - A URL corresponding to the storage location of `storage`.
pub fn with_storage_backend(mut self, storage: Arc<DynObjectStore>, location: Url) -> Self {
self.options.storage_backend = Some((storage, location));
self
}
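A usage sketch for the new signature — the in-memory store and `memory:///` location are illustrative assumptions, and the surrounding function is assumed to return `DeltaResult`:

    use std::sync::Arc;
    use object_store::memory::InMemory;
    use url::Url;

    // Any ObjectStore whose root "/" is the table root works; InMemory is
    // the simplest stand-in for a custom implementation.
    let store: Arc<DynObjectStore> = Arc::new(InMemory::new());
    let location = Url::parse("memory:///").expect("valid url");
    let table = DeltaTableBuilder::from_uri("memory:///")
        .with_storage_backend(store, location)
        .build()?;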

@@ -214,53 +205,49 @@ impl DeltaTableBuilder {
self
}

/// Build a delta storage backend for the given config
pub fn build_storage(self) -> Result<ObjectStoreRef, DeltaTableError> {
let (storage, storage_url) = match self.options.storage_backend {
// Some(storage) => storage,
None => get_storage_backend(
&self.options.table_uri,
self.storage_options,
self.allow_http,
)?,
_ => todo!(),
};
let object_store = Arc::new(DeltaObjectStore::new(storage_url, storage));
Ok(object_store)
}
/// Storage options for configuring backend object store
pub fn storage_options(&self) -> StorageOptions {
let mut storage_options = self.storage_options.clone().unwrap_or_default();
if let Some(allow) = self.allow_http {
storage_options.insert(
"allow_http".into(),
if allow { "true" } else { "false" }.into(),
);
};
storage_options.into()
}

/// Build a delta storage backend for the given config
pub fn build_storage(self) -> DeltaResult<ObjectStoreRef> {
match self.options.storage_backend {
Some((storage, location)) => Ok(Arc::new(DeltaObjectStore::new(
storage,
ensure_table_uri(location.as_str())?,
))),
None => {
let location = ensure_table_uri(&self.options.table_uri)?;
Ok(Arc::new(DeltaObjectStore::try_new(
location,
self.storage_options(),
)?))
}
}
}
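For the common path with no custom backend, everything is resolved from the table URI plus the collected options. A minimal sketch — the bucket name is an assumption, and `with_storage_options` is taken to be the builder's option-map setter:

    use std::collections::HashMap;

    let store = DeltaTableBuilder::from_uri("s3://my-bucket/my-table")
        .with_storage_options(HashMap::from([(
            "AWS_REGION".to_string(),
            "us-east-1".to_string(),
        )]))
        .build_storage()?;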

/// Build the [`DeltaTable`] from specified options.
///
/// This will not load the log, i.e. the table is not initialized. To get an initialized
/// table use the `load` function
pub fn build(self) -> Result<DeltaTable, DeltaTableError> {
let (storage, storage_url) = match self.options.storage_backend {
Some((store, path)) => {
let mut uri = self.options.table_uri + path.as_ref();
if !uri.contains(':') {
uri = format!("file://{}", uri);
}
let url = Url::parse(uri.as_str())
.map_err(|_| DeltaTableError::Generic(format!("Can't parse uri: {}", uri)))?;
let url = StorageLocation::new(url);
(store, url)
}
None => get_storage_backend(
&self.options.table_uri,
self.storage_options,
self.allow_http,
)?,
};
pub fn build(self) -> DeltaResult<DeltaTable> {
let config = DeltaTableConfig {
require_tombstones: self.options.require_tombstones,
require_files: self.options.require_files,
};
let object_store = Arc::new(DeltaObjectStore::new(storage_url, storage));
Ok(DeltaTable::new(object_store, config))
Ok(DeltaTable::new(self.build_storage()?, config))
}

/// Build the [`DeltaTable`] and load its state
pub async fn load(self) -> Result<DeltaTable, DeltaTableError> {
pub async fn load(self) -> DeltaResult<DeltaTable> {
let version = self.options.version.clone();
let mut table = self.build()?;
match version {
@@ -272,132 +259,6 @@ impl DeltaTableBuilder {
}
}
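As a usage sketch of `load` (the local path is an assumption; an async context returning `DeltaResult` is required):

    let table = DeltaTableBuilder::from_uri("./data/my-table")
        .with_version(3)
        .load()
        .await?;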

enum ObjectStoreKind {
Local,
InMemory,
S3,
Google,
Azure,
}

impl ObjectStoreKind {
pub fn parse_url(url: &Url) -> ObjectStoreResult<Self> {
match url.scheme() {
"file" => Ok(ObjectStoreKind::Local),
"memory" => Ok(ObjectStoreKind::InMemory),
"az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => Ok(ObjectStoreKind::Azure),
"s3" | "s3a" => Ok(ObjectStoreKind::S3),
"gs" => Ok(ObjectStoreKind::Google),
"https" => {
let host = url.host_str().unwrap_or_default();
if host.contains("amazonaws.com") {
Ok(ObjectStoreKind::S3)
} else if host.contains("dfs.core.windows.net")
|| host.contains("blob.core.windows.net")
{
Ok(ObjectStoreKind::Azure)
} else {
Err(ObjectStoreError::NotImplemented)
}
}
_ => Err(ObjectStoreError::NotImplemented),
}
}
}

/// Create a new storage backend used in Delta table
pub(crate) fn get_storage_backend(
table_uri: impl AsRef<str>,
// annotation needed for some feature builds
#[allow(unused_variables)] options: Option<HashMap<String, String>>,
#[allow(unused_variables)] allow_http: Option<bool>,
) -> DeltaResult<(Arc<DynObjectStore>, StorageLocation)> {
let storage_url = StorageLocation::parse(table_uri)?;
let mut options = options.unwrap_or_default();
if let Some(allow) = allow_http {
options.insert(
"allow_http".into(),
if allow { "true" } else { "false" }.into(),
);
}
let _options = StorageOptions::new(options);

match ObjectStoreKind::parse_url(&storage_url.url)? {
ObjectStoreKind::Local => Ok((Arc::new(FileStorageBackend::new()), storage_url)),
ObjectStoreKind::InMemory => Ok((Arc::new(InMemory::new()), storage_url)),
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
ObjectStoreKind::S3 => {
let store = AmazonS3Builder::new()
.with_url(storage_url.as_ref())
.try_with_options(&_options.as_s3_options())?
.with_allow_http(_options.allow_http())
.build()
.or_else(|_| {
AmazonS3Builder::from_env()
.with_url(storage_url.as_ref())
.try_with_options(&_options.as_s3_options())?
.with_allow_http(_options.allow_http())
.build()
})?;
Ok((
Arc::new(S3StorageBackend::try_new(
Arc::new(store),
S3StorageOptions::from_map(&_options.0),
)?),
storage_url,
))
}
#[cfg(not(any(feature = "s3", feature = "s3-rustls")))]
ObjectStoreKind::S3 => Err(BuilderError::MissingFeature {
feature: "s3",
url: storage_url.as_ref().into(),
}
.into()),
#[cfg(feature = "azure")]
ObjectStoreKind::Azure => {
let store = MicrosoftAzureBuilder::new()
.with_url(storage_url.as_ref())
.try_with_options(&_options.as_azure_options())?
.with_allow_http(_options.allow_http())
.build()
.or_else(|_| {
MicrosoftAzureBuilder::from_env()
.with_url(storage_url.as_ref())
.try_with_options(&_options.as_azure_options())?
.with_allow_http(_options.allow_http())
.build()
})?;
Ok((Arc::new(store), storage_url))
}
#[cfg(not(feature = "azure"))]
ObjectStoreKind::Azure => Err(BuilderError::MissingFeature {
feature: "azure",
url: storage_url.as_ref().into(),
}
.into()),
#[cfg(feature = "gcs")]
ObjectStoreKind::Google => {
let store = GoogleCloudStorageBuilder::new()
.with_url(storage_url.as_ref())
.try_with_options(&_options.as_gcs_options())?
.build()
.or_else(|_| {
GoogleCloudStorageBuilder::from_env()
.with_url(storage_url.as_ref())
.try_with_options(&_options.as_gcs_options())?
.build()
})?;
Ok((Arc::new(store), storage_url))
}
#[cfg(not(feature = "gcs"))]
ObjectStoreKind::Google => Err(BuilderError::MissingFeature {
feature: "gcs",
url: storage_url.as_ref().into(),
}
.into()),
}
}

/// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions].
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
/// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename.
@@ -432,7 +293,7 @@ pub mod s3_storage_options {
/// Hence, the `connection closed before message completed` could occur.
/// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise.
pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS";
/// The `pool_idle_timeout` for the as3_storage_optionsws sts client. See
/// The `pool_idle_timeout` for the as3_storage_options sts client. See
/// the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`.
pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS";
/// The number of retries for S3 GET requests failed with 500 Internal Server Error.
@@ -488,3 +349,52 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String>
map.get(key)
.map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned()))
}
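The lookup order `str_option` implements — explicit map entry first, then an environment variable of the same name — in a small illustrative snippet (crate-internal, since the function is `pub(crate)`):

    use std::collections::HashMap;

    std::env::set_var("AWS_REGION", "us-east-1");
    let map = HashMap::from([("AWS_ACCESS_KEY_ID".to_string(), "my-key".to_string())]);

    // An explicit entry wins over the environment.
    assert_eq!(str_option(&map, "AWS_ACCESS_KEY_ID").as_deref(), Some("my-key"));
    // Absent from the map, so the environment variable is consulted.
    assert_eq!(str_option(&map, "AWS_REGION").as_deref(), Some("us-east-1"));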

pub(crate) fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
let table_uri = table_uri.as_ref();
if let Ok(path) = std::fs::canonicalize(table_uri) {
return Url::from_directory_path(path)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()));
}
if let Ok(url) = Url::parse(table_uri) {
return Ok(match url.scheme() {
"file" => url,
_ => {
let mut new_url = url.clone();
new_url.set_path(url.path().trim_end_matches('/'));
new_url
}
});
}
// The table uri might still be a relative path that does not exist.
std::fs::create_dir_all(table_uri)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
let path = std::fs::canonicalize(table_uri)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
Url::from_directory_path(path)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_ensure_table_uri() {
// parse an existing relative directory
let uri = ensure_table_uri(".");
assert!(uri.is_ok());
let _uri = ensure_table_uri("./nonexistent");
assert!(uri.is_ok());
let uri = ensure_table_uri("s3://container/path");
assert!(uri.is_ok());
let uri = ensure_table_uri("file:///").unwrap();
assert_eq!("file:///", uri.as_str());
let uri = ensure_table_uri("memory://").unwrap();
assert_eq!("memory://", uri.as_str());
let uri = ensure_table_uri("s3://tests/data/delta-0.8.0/").unwrap();
assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str());
let _uri = ensure_table_uri("s3://tests/data/delta-0.8.0//").unwrap();
assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str())
}
}
16 changes: 14 additions & 2 deletions rust/src/delta.rs
@@ -199,6 +199,17 @@ pub enum DeltaTableError {
/// Error returned when user attempts to commit actions that don't belong to the next version.
#[error("Delta transaction failed, version {0} does not follow {1}")]
VersionMismatch(DeltaDataTypeVersion, DeltaDataTypeVersion),
/// A Feature is missing to perform operation
#[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
MissingFeature {
/// Name of the missing feature
feature: &'static str,
/// Storage location url
url: String,
},
/// Error returned when the storage location cannot be inferred
#[error("Cannot infer storage location from: {0}")]
InvalidTableLocation(String),
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),
@@ -1511,10 +1522,11 @@ mod tests {

#[cfg(any(feature = "s3", feature = "s3-rustls"))]
#[test]
fn normalize_table_uri() {
fn normalize_table_uri_s3() {
std::env::set_var("AWS_DEFAULT_REGION", "us-east-1");
for table_uri in [
"s3://tests/data/delta-0.8.0/",
// "s3://tests/data/delta-0.8.0//",
"s3://tests/data/delta-0.8.0//",
"s3://tests/data/delta-0.8.0",
]
.iter()