Skip to content

Commit

Permalink
support azdls
Browse files Browse the repository at this point in the history
  • Loading branch information
twuebi committed Aug 13, 2024
1 parent 47626ad commit f55470b
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 21 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ log = "^0.4"
mockito = "^1"
murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.48"
opendal = { git="https://github.com/twuebi/opendal.git", branch="tp/azdls-client-secret" }
ordered-float = "4.0.0"
parquet = "52"
pilota = "0.11.2"
Expand All @@ -85,6 +85,7 @@ serde_derive = "^1.0"
serde_json = "^1.0"
serde_repr = "0.1.16"
serde_with = "3.4.0"
strum = "0.26.3"
tempfile = "3.8"
tokio = { version = "1", default-features = false }
typed-builder = "^0.19"
Expand Down
4 changes: 3 additions & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ keywords = ["iceberg"]

[features]
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
storage-all = ["storage-memory", "storage-fs", "storage-s3"]
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-azdls"]

storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
storage-s3 = ["opendal/services-s3"]
storage-azdls = ["opendal/services-azdls"]

async-std = ["dep:async-std"]
tokio = ["dep:tokio"]
Expand Down Expand Up @@ -73,6 +74,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tokio = { workspace = true, optional = true }
typed-builder = { workspace = true }
url = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ impl FileIO {
Ok(op.delete(relative_path).await?)
}

/// Deletes all files in the directory at `path`.
///
/// `path` is resolved to a storage operator together with an
/// operator-relative path, and the operator's `remove_all` is invoked on it.
///
/// # Errors
///
/// Fails if no operator can be created for `path`, or if the operator's
/// `remove_all` call itself returns an error.
pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
    let (operator, target) = self.inner.create_operator(&path)?;
    operator.remove_all(target).await?;
    Ok(())
}

/// Check file exists.
pub async fn is_exist(&self, path: impl AsRef<str>) -> Result<bool> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Expand Down
5 changes: 5 additions & 0 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,10 @@ mod storage_s3;
pub use storage_s3::*;
#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-azdls")]
mod storage_azdls;
#[cfg(feature = "storage-azdls")]
pub use storage_azdls::ConfigKeys as AzdlsConfigKeys;

#[cfg(feature = "storage-fs")]
use storage_fs::*;
21 changes: 20 additions & 1 deletion crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use std::sync::Arc;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};

#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;
#[cfg(feature = "storage-azdls")]
use super::storage_azdls;
use super::FileIOBuilder;
use crate::{Error, ErrorKind};

Expand All @@ -38,6 +41,10 @@ pub(crate) enum Storage {
scheme_str: String,
config: Arc<S3Config>,
},
#[cfg(feature = "storage-azdls")]
Azdls {
config: Arc<AzdlsConfig>
},
}

impl Storage {
Expand All @@ -56,6 +63,13 @@ impl Storage {
scheme_str,
config: super::s3_config_parse(props)?.into(),
}),
#[cfg(feature = "storage-azdls")]
Scheme::Azdls => {

Ok(Self::Azdls {
config: storage_azdls::azdls_config_parse(props)?.into(),
})
}
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Constructing file io from scheme: {scheme} not supported now",),
Expand Down Expand Up @@ -117,6 +131,10 @@ impl Storage {
))
}
}
#[cfg(feature = "storage-azdls")]
Storage::Azdls { config } => {
Ok((Operator::from_config(config.as_ref().clone())?.finish(), &path["azdls://".len()..]))
}
#[cfg(all(not(feature = "storage-s3"), not(feature = "storage-fs")))]
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
Expand All @@ -131,6 +149,7 @@ impl Storage {
"memory" => Ok(Scheme::Memory),
"file" | "" => Ok(Scheme::Fs),
"s3" | "s3a" => Ok(Scheme::S3),
"azdls" => Ok(Scheme::Azdls),
s => Ok(s.parse::<Scheme>()?),
}
}
Expand Down
50 changes: 50 additions & 0 deletions crates/iceberg/src/io/storage_azdls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::collections::HashMap;
use std::str::FromStr;
use opendal::services::AzdlsConfig;
use crate::{Error, ErrorKind, Result};

/// Azdls configuration keys with conversions to [`opendal::Operator`] configuration keys.
///
/// Variants are parsed from and displayed as their `snake_case` names
/// (e.g. `ClientId` corresponds to the string `client_id`).
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display,
)]
#[strum(serialize_all = "snake_case")]
pub enum ConfigKeys {
    /// Endpoint of the Azure storage service to talk to.
    Endpoint,
    /// Client id for the client credential flow; created in a Microsoft app registration.
    ClientId,
    /// Client secret for the client credential flow; created in a Microsoft app registration.
    ClientSecret,
    /// Tenant id; required for the client credential flow.
    TenantId,
    /// Account key, used for shared key authentication.
    AccountKey,
    /// Name of the Azure storage account.
    AccountName,
    /// Filesystem to use (also known as container).
    Filesystem,
    /// Authority host, used for the client credential flow.
    AuthorityHost,
}

pub(crate) fn azdls_config_parse(m: HashMap<String, String>) -> Result<AzdlsConfig> {
let mut cfg = AzdlsConfig::default();
for (k, v) in m.into_iter() {
let config_key = ConfigKeys::from_str(k.as_str()).map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid azdls config key: {}", k),
)
})?;
match config_key {
ConfigKeys::Endpoint => cfg.endpoint = Some(v),
ConfigKeys::ClientId => cfg.client_id = Some(v),
ConfigKeys::ClientSecret => cfg.client_secret = Some(v),
ConfigKeys::TenantId => cfg.tenant_id = Some(v),
ConfigKeys::AccountKey => cfg.account_key = Some(v),
ConfigKeys::AccountName => cfg.account_name = Some(v),
ConfigKeys::Filesystem => cfg.filesystem = v,
ConfigKeys::AuthorityHost => cfg.authority_host = Some(v)
}
}

Ok(cfg)
}
56 changes: 39 additions & 17 deletions crates/iceberg/src/spec/view_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! The main struct here is [ViewMetadata] which defines the data for a view.
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::sync::Arc;

Expand All @@ -30,7 +30,7 @@ use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;

use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef};
use super::{SchemaId, SchemaRef};
use super::{Schema, SchemaId, SchemaRef, ViewRepresentation};
use crate::catalog::ViewCreation;
use crate::error::{timestamp_ms_to_utc, Result};

Expand Down Expand Up @@ -58,12 +58,12 @@ pub struct ViewMetadata {
/// The view's base location; used to create metadata file locations
pub location: String,
/// ID of the current version of the view (version-id)
pub(crate) current_version_id: ViewVersionId,
pub current_version_id: ViewVersionId,
/// A list of known versions of the view
pub(crate) versions: HashMap<ViewVersionId, ViewVersionRef>,
pub versions: HashMap<ViewVersionId, ViewVersionRef>,
/// A list of version log entries with the timestamp and version-id for every
/// change to current-version-id
pub(crate) version_log: Vec<ViewVersionLog>,
pub version_log: Vec<ViewVersionLog>,
/// A list of schemas, stored as objects with schema-id.
pub schemas: HashMap<i32, SchemaRef>,
/// A string to string map of view properties.
Expand Down Expand Up @@ -144,7 +144,7 @@ impl ViewMetadata {
}

/// Append view version to view
fn add_version(&mut self, view_version: ViewVersion) -> Result<AddedOrPresent<i64>> {
fn add_version(&mut self, view_version: ViewVersion) -> Result<AddedOrPresent<ViewVersionId>> {
if self.versions.contains_key(&view_version.version_id()) {
return Err(crate::Error::new(
crate::ErrorKind::DataInvalid,
Expand Down Expand Up @@ -305,14 +305,14 @@ fn is_same_version(a: &ViewVersion, b: &ViewVersion) -> bool {
}

fn is_same_schema(a: &Schema, b: &Schema) -> bool {
a.as_struct() == b.as_struct() && a.identifier_field_ids() == b.identifier_field_ids()
a.as_struct() == b.as_struct() && a.identifier_field_ids().collect::<HashSet<_>>() == b.identifier_field_ids().collect::<HashSet<_>>()
}

/// Manipulating view metadata.
pub struct ViewMetadataBuilder {
previous: ViewVersionRef,
metadata: ViewMetadata,
last_added_version: Option<i64>,
last_added_version: Option<ViewVersionId>,
last_added_schema: Option<i32>,
added_versions: usize, // TODO: Update tracking needed?
}
Expand Down Expand Up @@ -350,7 +350,7 @@ impl ViewMetadataBuilder {
}

let (_, maybe_err) = version.as_ref().representations().iter().fold_while((HashSet::new(), None), |(mut dialects, _), r| match r {
ViewRepresentation::SqlViewRepresentation(sql) => {
ViewRepresentation::Sql(sql) => {
if dialects.insert(sql.dialect.as_str()) {
FoldWhile::Continue((dialects, None))
} else {
Expand Down Expand Up @@ -404,7 +404,7 @@ impl ViewMetadataBuilder {
}

/// Sets the current version id.
pub fn set_current_version_id(mut self, current_version_id: i64) -> Result<Self> {
pub fn set_current_version_id(mut self, current_version_id: ViewVersionId) -> Result<Self> {
if current_version_id == -1 {
if let Some(last_added_version) = self.last_added_version {
return self.set_current_version_id(last_added_version);
Expand Down Expand Up @@ -572,7 +572,7 @@ fn sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
.representations()
.iter()
.map(|repr| match repr {
ViewRepresentation::SqlViewRepresentation(sql) => sql.dialect.to_lowercase(),
ViewRepresentation::Sql(sql) => sql.dialect.to_lowercase(),
})
.collect()
}
Expand All @@ -595,7 +595,7 @@ pub mod view_properties {
pub const REPLACE_DROP_DIALECT_ALLOWED_DEFAULT: bool = false;
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
#[serde(rename_all = "kebab-case")]
/// A log entry recording when a view version became the current version.
pub struct ViewVersionLog {
Expand All @@ -606,18 +606,39 @@ pub struct ViewVersionLog {
}

impl ViewVersionLog {
/// Returns the last updated timestamp as a DateTime<Utc> with millisecond precision
pub fn timestamp(&self) -> DateTime<Utc> {
Utc.timestamp_millis_opt(self.timestamp_ms).unwrap()
/// Creates a new view version log entry.
///
/// `timestamp` is stored unchanged as the entry's `timestamp_ms`, i.e. it is
/// expected to be in milliseconds since the epoch.
#[inline]
pub fn new(version_id: ViewVersionId, timestamp: i64) -> Self {
    let timestamp_ms = timestamp;
    Self {
        version_id,
        timestamp_ms,
    }
}

/// Returns a new ViewVersionLog with the current timestamp
pub fn now(version_id: i64) -> Self {
pub fn now(version_id: ViewVersionId) -> Self {
Self {
version_id,
timestamp_ms: Utc::now().timestamp_millis(),
}
}

/// Returns the version id.
#[inline]
pub fn version_id(&self) -> ViewVersionId {
self.version_id
}

/// Returns the timestamp in milliseconds from epoch.
#[inline]
pub fn timestamp_ms(&self) -> i64 {
self.timestamp_ms
}

/// Returns the last updated timestamp as a `DateTime<Utc>` with millisecond precision.
///
/// # Errors
///
/// Propagates any error returned by `timestamp_ms_to_utc` for the stored
/// millisecond value (presumably when it cannot be represented as a UTC
/// date-time; confirm against that helper).
pub fn timestamp(self) -> Result<DateTime<Utc>> {
    let millis = self.timestamp_ms;
    timestamp_ms_to_utc(millis)
}
}

pub(super) mod _serde {
Expand Down Expand Up @@ -806,9 +827,10 @@ mod tests {
use std::sync::Arc;

use anyhow::Result;
use pretty_assertions::assert_eq;
use uuid::Uuid;

use pretty_assertions::assert_eq;

use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
use crate::spec::{
NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type, ViewMetadata,
Expand Down
42 changes: 41 additions & 1 deletion crates/iceberg/src/spec/view_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,15 @@ impl ViewVersion {
}

/// A list of view representations.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct ViewRepresentations(pub(crate) Vec<ViewRepresentation>);

impl ViewRepresentations {
/// Create a new list of view representations.
pub fn new(representations: Vec<ViewRepresentation>) -> Self {
Self(representations)
}

#[inline]
/// Get the number of representations
pub fn len(&self) -> usize {
Expand Down Expand Up @@ -162,6 +167,41 @@ impl IntoIterator for ViewRepresentations {
}
}


/// A builder for [`ViewRepresentations`].
///
/// Representations are collected in insertion order and handed over to
/// [`ViewRepresentations`] unchanged by [`ViewRepresentationsBuilder::build`].
#[derive(Debug)]
pub struct ViewRepresentationsBuilder(Vec<ViewRepresentation>);

impl ViewRepresentationsBuilder {
    /// Create a new, empty builder.
    pub fn new() -> Self {
        Self(Vec::new())
    }

    /// Add a representation to the end of the list.
    pub fn add_representation(mut self, representation: ViewRepresentation) -> Self {
        self.0.push(representation);
        self
    }

    /// Add a SQL representation, built from `sql` and `dialect`, to the end of the list.
    pub fn add_sql_representation(self, sql: String, dialect: String) -> Self {
        self.add_representation(ViewRepresentation::Sql(SqlViewRepresentation {
            sql,
            dialect,
        }))
    }

    /// Consume the builder and return the collected representations.
    pub fn build(self) -> ViewRepresentations {
        ViewRepresentations(self.0)
    }
}

impl Default for ViewRepresentationsBuilder {
fn default() -> Self {
Self::new()
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type")]
/// View definitions can be represented in multiple ways.
Expand Down

0 comments on commit f55470b

Please sign in to comment.