From 0c8ea224e18f3145b47d20715966516ec07ecc9e Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Wed, 11 Sep 2024 15:45:25 +0200 Subject: [PATCH] add list io --- Cargo.toml | 3 +- crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/catalog/mod.rs | 2 +- crates/iceberg/src/io/file_io.rs | 39 +++++++++++++++++++++++- crates/iceberg/src/io/mod.rs | 5 ++- crates/iceberg/src/io/storage.rs | 28 ++++++++--------- crates/iceberg/src/io/storage_azdls.rs | 12 +++++--- crates/iceberg/src/lib.rs | 2 +- crates/iceberg/src/spec/view_metadata.rs | 16 +++++----- crates/iceberg/src/spec/view_version.rs | 8 ++--- 10 files changed, 79 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5a9b759e..8e2f5c9d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,8 @@ log = "0.4" mockito = "1" murmur3 = "0.5.2" once_cell = "1" -opendal = { git="https://github.com/twuebi/opendal.git", rev = "a9e3d88e97" } +#opendal = { git="https://github.com/twuebi/opendal.git", rev = "a9e3d88e97" } +opendal = { path = "../opendal/core"} ordered-float = "4" parquet = "52" pilota = "0.11.2" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 47990dddf..ef2a98a9c 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,6 +52,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } +async-stream = { workspace = true } async-trait = { workspace = true } bimap = { workspace = true } bitvec = { workspace = true } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 375bcf212..d92e3ace6 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -29,7 +29,7 @@ use uuid::Uuid; use crate::spec::{ FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder, - UnboundPartitionSpec, ViewRepresentations, ViewVersion + UnboundPartitionSpec, ViewRepresentations, ViewVersion, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 8f63a9acd..b8fe84777 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -20,7 +20,9 @@ use std::ops::Range; use std::sync::Arc; use bytes::Bytes; -use opendal::Operator; +use futures::Stream; +use futures::stream::BoxStream; +use opendal::{Entry, Operator}; use url::Url; use super::storage::Storage; @@ -73,6 +75,41 @@ impl FileIO { Ok(op.remove_all(relative_path).await?) } + /// Lists all files in the directory. + pub async fn list(&self, path: impl AsRef, recursive: bool) -> Result> { + let (op, relative_path) = self.inner.create_operator(&path)?; + Ok(op.list_with(relative_path).recursive(recursive).await?) + } + + /// Lists all files in the directory with pagination. + pub async fn list_recursive_paginated( + &self, + path: impl AsRef, + recursive: bool, + page_size: usize, + ) -> BoxStream>> { + let path = path.as_ref().to_string(); + Box::pin(async_stream::try_stream! { + + let (op, relative_path) = self.inner.create_operator(&path)?; + + let mut next_future = op.list_with(relative_path).recursive(recursive).limit(page_size); + + loop { + let entries = next_future.await?; + + let last_path = entries.last().map(|e| e.path().to_string()); + yield entries; + + if let Some(last) = last_path { + next_future = op.list_with(relative_path).recursive(recursive).start_after(&last).limit(page_size); + } else { + break + } + } + }) + } + /// Check file exists. pub async fn is_exist(&self, path: impl AsRef) -> Result { let (op, relative_path) = self.inner.create_operator(&path)?; diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 50a3a1f8d..d94968fef 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -79,13 +79,12 @@ mod storage_s3; #[cfg(feature = "storage-s3")] pub use storage_s3::*; pub(crate) mod object_cache; -#[cfg(feature = "storage-fs")] -mod storage_fs; #[cfg(feature = "storage-azdls")] mod storage_azdls; +#[cfg(feature = "storage-fs")] +mod storage_fs; #[cfg(feature = "storage-azdls")] pub use storage_azdls::ConfigKeys as AzdlsConfigKeys; - #[cfg(feature = "storage-fs")] use storage_fs::*; #[cfg(feature = "storage-gcs")] diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 991dff6ac..95da1128c 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -17,13 +17,14 @@ use std::sync::Arc; +#[cfg(feature = "storage-azdls")] +use opendal::services::AzdlsConfig; #[cfg(feature = "storage-gcs")] use opendal::services::GcsConfig; #[cfg(feature = "storage-s3")] use opendal::services::S3Config; use opendal::{Operator, Scheme}; -#[cfg(feature = "storage-azdls")] -use opendal::services::AzdlsConfig; + #[cfg(feature = "storage-azdls")] use super::storage_azdls; use super::FileIOBuilder; @@ -48,9 +49,7 @@ pub(crate) enum Storage { config: Arc, }, #[cfg(feature = "storage-azdls")] - Azdls { - config: Arc - }, + Azdls { config: Arc }, #[cfg(feature = "storage-gcs")] Gcs { config: Arc }, } @@ -77,12 +76,9 @@ impl Storage { config: super::gcs_config_parse(props)?.into(), }), #[cfg(feature = "storage-azdls")] - Scheme::Azdls => { - - Ok(Self::Azdls { - config: storage_azdls::azdls_config_parse(props)?.into(), - }) - } + Scheme::Azdls => Ok(Self::Azdls { + config: storage_azdls::azdls_config_parse(props)?.into(), + }), _ => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Constructing file io from scheme: {scheme} not supported now",), @@ -162,14 +158,16 @@ impl Storage { } } #[cfg(feature = "storage-azdls")] - Storage::Azdls { config } => { - Ok((Operator::from_config(config.as_ref().clone())?.finish(), &path["azdls://".len()..])) - } + Storage::Azdls { config } => Ok(( + Operator::from_config(config.as_ref().clone())?.finish(), + &path["azdls://".len()..], + )), #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), not(feature = "storage-gcs"), - not(feature = "storage-azdls")))] + not(feature = "storage-azdls") + ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, "No storage service has been enabled", diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs index 3878e2410..3eba4e008 100644 --- a/crates/iceberg/src/io/storage_azdls.rs +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -1,10 +1,14 @@ use std::collections::HashMap; use std::str::FromStr; + use opendal::services::AzdlsConfig; + use crate::{Error, ErrorKind, Result}; /// Azdls configuration keys with conversions to [`opendal::Operator`] configuration keys. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display)] +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display, +)] #[strum(serialize_all = "snake_case")] pub enum ConfigKeys { /// Az endpoint to use @@ -22,7 +26,7 @@ pub enum ConfigKeys { /// Az filesystem to use, also known as container Filesystem, /// Az authority host, used for client credential flow - AuthorityHost + AuthorityHost, } pub(crate) fn azdls_config_parse(m: HashMap) -> Result { @@ -42,9 +46,9 @@ pub(crate) fn azdls_config_parse(m: HashMap) -> Result cfg.account_key = Some(v), ConfigKeys::AccountName => cfg.account_name = Some(v), ConfigKeys::Filesystem => cfg.filesystem = v, - ConfigKeys::AuthorityHost => cfg.authority_host = Some(v) + ConfigKeys::AuthorityHost => cfg.authority_host = Some(v), } } Ok(cfg) -} \ No newline at end of file +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 6afc9ee6d..9b23ed446 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -63,7 +63,7 @@ mod catalog; pub use catalog::{ Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, - TableUpdate, ViewCreation, ViewUpdate + TableUpdate, ViewCreation, ViewUpdate, }; pub mod table; diff --git a/crates/iceberg/src/spec/view_metadata.rs b/crates/iceberg/src/spec/view_metadata.rs index 01b992cb3..3a618acb9 100644 --- a/crates/iceberg/src/spec/view_metadata.rs +++ b/crates/iceberg/src/spec/view_metadata.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use _serde::ViewMetadataEnum; use chrono::{DateTime, Utc}; +use itertools::{FoldWhile, Itertools}; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; @@ -33,11 +34,11 @@ use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef}; use super::{Schema, SchemaId, SchemaRef, ViewRepresentation}; use crate::catalog::ViewCreation; use crate::error::{timestamp_ms_to_utc, Result}; - +use crate::spec::view_properties::{ + REPLACE_DROP_DIALECT_ALLOWED, REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VERSION_HISTORY_SIZE, + VERSION_HISTORY_SIZE_DEFAULT, +}; use crate::Error; -use itertools::{FoldWhile, Itertools}; -use crate::spec::view_properties::{REPLACE_DROP_DIALECT_ALLOWED, REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VERSION_HISTORY_SIZE, VERSION_HISTORY_SIZE_DEFAULT}; - /// Reference to [`ViewMetadata`]. pub type ViewMetadataRef = Arc; @@ -305,7 +306,9 @@ fn is_same_version(a: &ViewVersion, b: &ViewVersion) -> bool { } fn is_same_schema(a: &Schema, b: &Schema) -> bool { - a.as_struct() == b.as_struct() && a.identifier_field_ids().collect::>() == b.identifier_field_ids().collect::>() + a.as_struct() == b.as_struct() + && a.identifier_field_ids().collect::>() + == b.identifier_field_ids().collect::>() } /// Manipulating view metadata. @@ -827,9 +830,8 @@ mod tests { use std::sync::Arc; use anyhow::Result; - use uuid::Uuid; - use pretty_assertions::assert_eq; + use uuid::Uuid; use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog}; use crate::spec::{ diff --git a/crates/iceberg/src/spec/view_version.rs b/crates/iceberg/src/spec/view_version.rs index d894222d7..c62db9d77 100644 --- a/crates/iceberg/src/spec/view_version.rs +++ b/crates/iceberg/src/spec/view_version.rs @@ -167,7 +167,6 @@ impl IntoIterator for ViewRepresentations { } } - /// A builder for [`ViewRepresentations`]. pub struct ViewRepresentationsBuilder(Vec); @@ -185,9 +184,10 @@ impl ViewRepresentationsBuilder { /// Add a SQL representation to the list. pub fn add_sql_representation(self, sql: String, dialect: String) -> Self { - self.add_representation(ViewRepresentation::Sql( - SqlViewRepresentation { sql, dialect }, - )) + self.add_representation(ViewRepresentation::Sql(SqlViewRepresentation { + sql, + dialect, + })) } /// Build the list of representations.