From f55470b16031b95d25819cd568b99dfba8494b36 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 13 Aug 2024 09:38:33 +0200 Subject: [PATCH] support azdls --- Cargo.toml | 3 +- crates/iceberg/Cargo.toml | 4 +- crates/iceberg/src/io/file_io.rs | 6 +++ crates/iceberg/src/io/mod.rs | 5 +++ crates/iceberg/src/io/storage.rs | 21 ++++++++- crates/iceberg/src/io/storage_azdls.rs | 50 +++++++++++++++++++++ crates/iceberg/src/spec/view_metadata.rs | 56 +++++++++++++++++------- crates/iceberg/src/spec/view_version.rs | 42 +++++++++++++++++- 8 files changed, 166 insertions(+), 21 deletions(-) create mode 100644 crates/iceberg/src/io/storage_azdls.rs diff --git a/Cargo.toml b/Cargo.toml index cb00ad599..cc2f02ed2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ log = "^0.4" mockito = "^1" murmur3 = "0.5.2" once_cell = "1" -opendal = "0.48" +opendal = { git="https://github.com/twuebi/opendal.git", branch="tp/azdls-client-secret" } ordered-float = "4.0.0" parquet = "52" pilota = "0.11.2" @@ -85,6 +85,7 @@ serde_derive = "^1.0" serde_json = "^1.0" serde_repr = "0.1.16" serde_with = "3.4.0" +strum = "0.26.3" tempfile = "3.8" tokio = { version = "1", default-features = false } typed-builder = "^0.19" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index de5b7cdc5..dbda19b0f 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -30,11 +30,12 @@ keywords = ["iceberg"] [features] default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] -storage-all = ["storage-memory", "storage-fs", "storage-s3"] +storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-azdls"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] +storage-azdls = ["opendal/services-azdls"] async-std = ["dep:async-std"] tokio = ["dep:tokio"] @@ -73,6 +74,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } serde_repr = { workspace = true } serde_with = { workspace = true } +strum = { workspace = true } tokio = { workspace = true, optional = true } typed-builder = { workspace = true } url = { workspace = true } diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 9af398270..8f63a9acd 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -67,6 +67,12 @@ impl FileIO { Ok(op.delete(relative_path).await?) } + /// Deletes all files in the directory. + pub async fn remove_all(&self, path: impl AsRef) -> Result<()> { + let (op, relative_path) = self.inner.create_operator(&path)?; + Ok(op.remove_all(relative_path).await?) + } + /// Check file exists. pub async fn is_exist(&self, path: impl AsRef) -> Result { let (op, relative_path) = self.inner.create_operator(&path)?; diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 12ad8097e..9677f137c 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -80,5 +80,10 @@ mod storage_s3; pub use storage_s3::*; #[cfg(feature = "storage-fs")] mod storage_fs; +#[cfg(feature = "storage-azdls")] +mod storage_azdls; +#[cfg(feature = "storage-azdls")] +pub use storage_azdls::ConfigKeys as AzdlsConfigKeys; + #[cfg(feature = "storage-fs")] use storage_fs::*; diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 870e61ec6..dccd56a20 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -20,7 +20,10 @@ use std::sync::Arc; #[cfg(feature = "storage-s3")] use opendal::services::S3Config; use opendal::{Operator, Scheme}; - +#[cfg(feature = "storage-azdls")] +use opendal::services::AzdlsConfig; +#[cfg(feature = "storage-azdls")] +use super::storage_azdls; use super::FileIOBuilder; use crate::{Error, ErrorKind}; @@ -38,6 +41,10 @@ pub(crate) enum Storage { scheme_str: String, config: Arc, }, + #[cfg(feature = "storage-azdls")] + Azdls { + config: Arc + }, } impl Storage { @@ -56,6 +63,13 @@ impl Storage { scheme_str, config: super::s3_config_parse(props)?.into(), }), + #[cfg(feature = "storage-azdls")] + Scheme::Azdls => { + + Ok(Self::Azdls { + config: storage_azdls::azdls_config_parse(props)?.into(), + }) + } _ => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Constructing file io from scheme: {scheme} not supported now",), @@ -117,6 +131,10 @@ impl Storage { )) } } + #[cfg(feature = "storage-azdls")] + Storage::Azdls { config } => { + Ok((Operator::from_config(config.as_ref().clone())?.finish(), &path["azdls://".len()..])) + } #[cfg(all(not(feature = "storage-s3"), not(feature = "storage-fs")))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -131,6 +149,7 @@ impl Storage { "memory" => Ok(Scheme::Memory), "file" | "" => Ok(Scheme::Fs), "s3" | "s3a" => Ok(Scheme::S3), + "azdls" => Ok(Scheme::Azdls), s => Ok(s.parse::()?), } } diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs new file mode 100644 index 000000000..3878e2410 --- /dev/null +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; +use std::str::FromStr; +use opendal::services::AzdlsConfig; +use crate::{Error, ErrorKind, Result}; + +/// Azdls configuration keys with conversions to [`opendal::Operator`] configuration keys. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display)] +#[strum(serialize_all = "snake_case")] +pub enum ConfigKeys { + /// Az endpoint to use + Endpoint, + /// Az client id, used for client credential flow, created in microsoft app registration + ClientId, + /// Az client secret, used for client credential flow, created in microsoft app registration + ClientSecret, + /// Az tenant id, required for client credential flow + TenantId, + /// Az account key, used for shared key authentication + AccountKey, + /// Az storage account name + AccountName, + /// Az filesystem to use, also known as container + Filesystem, + /// Az authority host, used for client credential flow + AuthorityHost +} + +pub(crate) fn azdls_config_parse(m: HashMap) -> Result { + let mut cfg = AzdlsConfig::default(); + for (k, v) in m.into_iter() { + let config_key = ConfigKeys::from_str(k.as_str()).map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid azdls config key: {}", k), + ) + })?; + match config_key { + ConfigKeys::Endpoint => cfg.endpoint = Some(v), + ConfigKeys::ClientId => cfg.client_id = Some(v), + ConfigKeys::ClientSecret => cfg.client_secret = Some(v), + ConfigKeys::TenantId => cfg.tenant_id = Some(v), + ConfigKeys::AccountKey => cfg.account_key = Some(v), + ConfigKeys::AccountName => cfg.account_name = Some(v), + ConfigKeys::Filesystem => cfg.filesystem = v, + ConfigKeys::AuthorityHost => cfg.authority_host = Some(v) + } + } + + Ok(cfg) +} \ No newline at end of file diff --git a/crates/iceberg/src/spec/view_metadata.rs b/crates/iceberg/src/spec/view_metadata.rs index e0a08501f..44361f3aa 100644 --- a/crates/iceberg/src/spec/view_metadata.rs +++ b/crates/iceberg/src/spec/view_metadata.rs @@ -19,7 +19,7 @@ //! The main struct here is [ViewMetadata] which defines the data for a view. use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::sync::Arc; @@ -30,7 +30,7 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef}; -use super::{SchemaId, SchemaRef}; +use super::{Schema, SchemaId, SchemaRef, ViewRepresentation}; use crate::catalog::ViewCreation; use crate::error::{timestamp_ms_to_utc, Result}; @@ -58,12 +58,12 @@ pub struct ViewMetadata { /// The view's base location; used to create metadata file locations pub location: String, /// ID of the current version of the view (version-id) - pub(crate) current_version_id: ViewVersionId, + pub current_version_id: ViewVersionId, /// A list of known versions of the view - pub(crate) versions: HashMap, + pub versions: HashMap, /// A list of version log entries with the timestamp and version-id for every /// change to current-version-id - pub(crate) version_log: Vec, + pub version_log: Vec, /// A list of schemas, stored as objects with schema-id. pub schemas: HashMap, /// A string to string map of view properties. @@ -144,7 +144,7 @@ impl ViewMetadata { } /// Append view version to view - fn add_version(&mut self, view_version: ViewVersion) -> Result> { + fn add_version(&mut self, view_version: ViewVersion) -> Result> { if self.versions.contains_key(&view_version.version_id()) { return Err(crate::Error::new( crate::ErrorKind::DataInvalid, @@ -305,14 +305,14 @@ fn is_same_version(a: &ViewVersion, b: &ViewVersion) -> bool { } fn is_same_schema(a: &Schema, b: &Schema) -> bool { - a.as_struct() == b.as_struct() && a.identifier_field_ids() == b.identifier_field_ids() + a.as_struct() == b.as_struct() && a.identifier_field_ids().collect::>() == b.identifier_field_ids().collect::>() } /// Manipulating view metadata. pub struct ViewMetadataBuilder { previous: ViewVersionRef, metadata: ViewMetadata, - last_added_version: Option, + last_added_version: Option, last_added_schema: Option, added_versions: usize, // TODO: Update tracking needed? } @@ -350,7 +350,7 @@ impl ViewMetadataBuilder { } let (_, maybe_err) = version.as_ref().representations().iter().fold_while((HashSet::new(), None), |(mut dialects, _), r| match r { - ViewRepresentation::SqlViewRepresentation(sql) => { + ViewRepresentation::Sql(sql) => { if dialects.insert(sql.dialect.as_str()) { FoldWhile::Continue((dialects, None)) } else { @@ -404,7 +404,7 @@ impl ViewMetadataBuilder { } /// Sets the current version id. - pub fn set_current_version_id(mut self, current_version_id: i64) -> Result { + pub fn set_current_version_id(mut self, current_version_id: ViewVersionId) -> Result { if current_version_id == -1 { if let Some(last_added_version) = self.last_added_version { return self.set_current_version_id(last_added_version); @@ -572,7 +572,7 @@ fn sql_dialects_for(view_version: &ViewVersion) -> HashSet { .representations() .iter() .map(|repr| match repr { - ViewRepresentation::SqlViewRepresentation(sql) => sql.dialect.to_lowercase(), + ViewRepresentation::Sql(sql) => sql.dialect.to_lowercase(), }) .collect() } @@ -595,7 +595,7 @@ pub mod view_properties { pub const REPLACE_DROP_DIALECT_ALLOWED_DEFAULT: bool = false; } -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)] #[serde(rename_all = "kebab-case")] /// A log of when each snapshot was made. pub struct ViewVersionLog { @@ -606,18 +606,39 @@ pub struct ViewVersionLog { } impl ViewVersionLog { - /// Returns the last updated timestamp as a DateTime with millisecond precision - pub fn timestamp(&self) -> DateTime { - Utc.timestamp_millis_opt(self.timestamp_ms).unwrap() + #[inline] + /// Creates a new view version log. + pub fn new(version_id: ViewVersionId, timestamp: i64) -> Self { + Self { + version_id, + timestamp_ms: timestamp, + } } /// Returns a new ViewVersionLog with the current timestamp - pub fn now(version_id: i64) -> Self { + pub fn now(version_id: ViewVersionId) -> Self { Self { version_id, timestamp_ms: Utc::now().timestamp_millis(), } } + + /// Returns the version id. + #[inline] + pub fn version_id(&self) -> ViewVersionId { + self.version_id + } + + /// Returns the timestamp in milliseconds from epoch. + #[inline] + pub fn timestamp_ms(&self) -> i64 { + self.timestamp_ms + } + + /// Returns the last updated timestamp as a DateTime with millisecond precision. + pub fn timestamp(self) -> Result> { + timestamp_ms_to_utc(self.timestamp_ms) + } } pub(super) mod _serde { @@ -806,9 +827,10 @@ mod tests { use std::sync::Arc; use anyhow::Result; - use pretty_assertions::assert_eq; use uuid::Uuid; + use pretty_assertions::assert_eq; + use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog}; use crate::spec::{ NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type, ViewMetadata, diff --git a/crates/iceberg/src/spec/view_version.rs b/crates/iceberg/src/spec/view_version.rs index b30c10bde..d894222d7 100644 --- a/crates/iceberg/src/spec/view_version.rs +++ b/crates/iceberg/src/spec/view_version.rs @@ -130,10 +130,15 @@ impl ViewVersion { } /// A list of view representations. -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)] pub struct ViewRepresentations(pub(crate) Vec); impl ViewRepresentations { + /// Create a new list of view representations. + pub fn new(representations: Vec) -> Self { + Self(representations) + } + #[inline] /// Get the number of representations pub fn len(&self) -> usize { @@ -162,6 +167,41 @@ impl IntoIterator for ViewRepresentations { } } + +/// A builder for [`ViewRepresentations`]. +pub struct ViewRepresentationsBuilder(Vec); + +impl ViewRepresentationsBuilder { + /// Create a new builder. + pub fn new() -> Self { + Self(Vec::new()) + } + + /// Add a representation to the list. + pub fn add_representation(mut self, representation: ViewRepresentation) -> Self { + self.0.push(representation); + self + } + + /// Add a SQL representation to the list. + pub fn add_sql_representation(self, sql: String, dialect: String) -> Self { + self.add_representation(ViewRepresentation::Sql( + SqlViewRepresentation { sql, dialect }, + )) + } + + /// Build the list of representations. + pub fn build(self) -> ViewRepresentations { + ViewRepresentations(self.0) + } +} + +impl Default for ViewRepresentationsBuilder { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(tag = "type")] /// View definitions can be represented in multiple ways.