From a76b366c16da1426cbc743d4d78bfb7cf093295e Mon Sep 17 00:00:00 2001
From: hoslo
Date: Wed, 8 Jan 2025 21:28:12 +0800
Subject: [PATCH] feat(core): Implement list with deleted and versions for gcs

---
 core/src/services/gcs/backend.rs |  20 ++++-
 core/src/services/gcs/config.rs  |   2 +
 core/src/services/gcs/core.rs    |  81 ++++++++++++++++++--
 core/src/services/gcs/delete.rs  |   8 +-
 core/src/services/gcs/lister.rs  | 125 +++++++++++++++++++++++++++++++
 5 files changed, 225 insertions(+), 11 deletions(-)

diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c61ff75ca290..e7b081eb0352 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -192,6 +192,13 @@ impl GcsBuilder {
         self
     }
 
+    /// Declare whether the bucket is versioned; version-aware capabilities follow this flag.
+    pub fn enable_versioning(mut self, enabled: bool) -> Self {
+        self.config.enable_versioning = enabled;
+
+        self
+    }
+
     /// Set the predefined acl for GCS.
     ///
     /// Available values are:
@@ -326,6 +333,7 @@ impl Builder for GcsBuilder {
                 predefined_acl: self.config.predefined_acl.clone(),
                 default_storage_class: self.config.default_storage_class.clone(),
                 allow_anonymous: self.config.allow_anonymous,
+                enable_versioning: self.config.enable_versioning,
             }),
         };
 
@@ -362,6 +370,7 @@ impl Access for GcsBackend {
                 stat_has_content_md5: true,
                 stat_has_content_length: true,
                 stat_has_content_type: true,
+                stat_with_version: self.core.enable_versioning,
                 stat_has_last_modified: true,
                 stat_has_user_metadata: true,
 
@@ -369,13 +378,14 @@ impl Access for GcsBackend {
 
                 read_with_if_match: true,
                 read_with_if_none_match: true,
+                read_with_version: self.core.enable_versioning,
 
                 write: true,
                 write_can_empty: true,
                 write_can_multi: true,
                 write_with_content_type: true,
                 write_with_user_metadata: true,
-                write_with_if_not_exists: true,
+                write_with_if_not_exists: !self.core.enable_versioning,
 
                 // The min multipart size of Gcs is 5 MiB.
                 //
@@ -392,6 +402,7 @@ impl Access for GcsBackend {
 
                 delete: true,
                 delete_max_size: Some(100),
+                delete_with_version: self.core.enable_versioning,
                 copy: true,
 
                 list: true,
@@ -403,6 +414,8 @@ impl Access for GcsBackend {
                 list_has_content_length: true,
                 list_has_content_type: true,
                 list_has_last_modified: true,
+                list_with_versions: self.core.enable_versioning,
+                list_with_deleted: self.core.enable_versioning,
 
                 presign: true,
                 presign_stat: true,
@@ -432,6 +445,7 @@ impl Access for GcsBackend {
 
         m.set_etag(&meta.etag);
         m.set_content_md5(&meta.md5_hash);
+        m.set_version(&meta.generation);
 
         let size = meta
             .size
@@ -554,6 +568,10 @@ struct GetObjectJsonResponse {
     ///
     /// For example: `"contentType": "image/png",`
     content_type: String,
+    /// Generation of this object.
+    ///
+    /// For example: `"generation": "1660563214863653"`
+    generation: String,
     /// Custom metadata of this object.
     ///
     /// For example: `"metadata" : { "my-key": "my-value" }`
diff --git a/core/src/services/gcs/config.rs b/core/src/services/gcs/config.rs
index 43ff15175942..fb12871dc0ee 100644
--- a/core/src/services/gcs/config.rs
+++ b/core/src/services/gcs/config.rs
@@ -53,6 +53,8 @@ pub struct GcsConfig {
     pub disable_vm_metadata: bool,
     /// Disable loading configuration from the environment.
     pub disable_config_load: bool,
+    /// Marks the bucket as versioned; the backend reads this to advertise version-aware operations.
+    pub enable_versioning: bool,
     /// A Google Cloud OAuth2 token.
     ///
     /// Takes precedence over `credential` and `credential_path`.
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 3e06bf03b575..021d39753c08 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -51,6 +51,9 @@ pub mod constants {
     pub const X_GOOG_ACL: &str = "x-goog-acl";
     pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class";
     pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-";
+    pub const IF_GENERATION_MATCH: &str = "ifGenerationMatch";
+    pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match";
+    pub const GENERATION: &str = "generation";
 }
 
 pub struct GcsCore {
@@ -69,6 +72,7 @@ pub struct GcsCore {
     pub default_storage_class: Option<String>,
 
     pub allow_anonymous: bool,
+    pub enable_versioning: bool,
 }
 
 impl Debug for GcsCore {
@@ -193,6 +197,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_match) = args.if_match() {
             req = req.header(IF_MATCH, if_match);
         }
@@ -216,6 +224,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_match) = args.if_match() {
             req = req.header(IF_MATCH, if_match);
         }
@@ -372,6 +384,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_none_match) = args.if_none_match() {
             req = req.header(IF_NONE_MATCH, if_none_match);
         }
@@ -397,6 +413,10 @@ impl GcsCore {
 
         let mut req = Request::head(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_none_match) = args.if_none_match() {
             req = req.header(IF_NONE_MATCH, if_none_match);
         }
@@ -422,14 +442,14 @@ impl GcsCore {
         self.send(req).await
     }
 
-    pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
-        let mut req = self.gcs_delete_object_request(path)?;
+    pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> {
+        let mut req = self.gcs_delete_object_request(path, args)?;
 
         self.sign(&mut req).await?;
         self.send(req).await
     }
 
-    pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
+    pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
         let url = format!(
@@ -444,13 +464,13 @@ impl GcsCore {
             .map_err(new_request_build_error)
     }
 
-    pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> {
+    pub async fn gcs_delete_objects(&self, batch: Vec<(String, OpDelete)>) -> Result<Response<Buffer>> {
         let uri = format!("{}/batch/storage/v1", self.endpoint);
 
         let mut multipart = Multipart::new();
 
-        for (idx, path) in paths.iter().enumerate() {
-            let req = self.gcs_delete_object_request(path)?;
+        for (idx, (path, args)) in batch.iter().enumerate() {
+            let req = self.gcs_delete_object_request(path, args.clone())?;
 
             multipart = multipart.part(
                 MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
@@ -534,6 +554,54 @@ impl GcsCore {
         self.send(req).await
     }
 
+    pub async fn gcs_list_object_versions(
+        &self,
+        path: &str,
+        page_token: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+        start_after: Option<String>,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let mut url = format!(
+            "{}/storage/v1/b/{}/o?versions=true&prefix={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+        if !delimiter.is_empty() {
+            write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
+        }
+        if let Some(limit) = limit {
+            write!(url, "&maxResults={limit}").expect("write into string must succeed");
+        }
+        if let Some(start_after) = start_after {
+            let start_after = build_abs_path(&self.root, &start_after);
+            write!(url, "&startOffset={}", percent_encode_path(&start_after))
+                .expect("write into string must succeed");
+        }
+
+        if !page_token.is_empty() {
+            // NOTE:
+            //
+            // GCS uses pageToken in request and nextPageToken in response
+            //
+            // The token format is opaque to us, so it is percent-encoded the same
+            // way the AWS S3 service handles its continuation tokens.
+            write!(url, "&pageToken={}", percent_encode_path(page_token))
+                .expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
     pub async fn gcs_initiate_multipart_upload(&self, path: &str) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
@@ -681,6 +749,7 @@ pub struct ListResponseItem {
     pub md5_hash: String,
     pub updated: String,
     pub content_type: String,
+    pub time_deleted: Option<String>,
 }
 
 /// Result of CreateMultipartUpload
diff --git a/core/src/services/gcs/delete.rs b/core/src/services/gcs/delete.rs
index 241b6152edc3..03968fd36228 100644
--- a/core/src/services/gcs/delete.rs
+++ b/core/src/services/gcs/delete.rs
@@ -34,8 +34,8 @@ impl GcsDeleter {
 }
 
 impl oio::BatchDelete for GcsDeleter {
-    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
-        let resp = self.core.gcs_delete_object(&path).await?;
+    async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
+        let resp = self.core.gcs_delete_object(&path, args).await?;
 
         // deleting not existing objects is ok
         if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
@@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter {
     }
 
     async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
-        let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
-        let resp = self.core.gcs_delete_objects(paths.clone()).await?;
+        let paths: Vec<String> = batch.iter().map(|(p, _)| p.clone()).collect();
+        let resp = self.core.gcs_delete_objects(batch).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs
index cd66e964f77b..682974f91e70 100644
--- a/core/src/services/gcs/lister.rs
+++ 
b/core/src/services/gcs/lister.rs
@@ -134,3 +134,128 @@ impl oio::PageList for GcsLister {
         Ok(())
     }
 }
+
+/// Lister over `versions=true` results: prefixes, current and (optionally) deleted objects.
+pub struct GcsObjectVersionsLister {
+    core: Arc<GcsCore>,
+
+    path: String,
+    args: OpList,
+    delimiter: &'static str,
+    limit: Option<usize>,
+
+    /// Filter results to objects whose names are lexicographically
+    /// **equal to or after** startOffset
+    start_after: Option<String>,
+}
+
+impl GcsObjectVersionsLister {
+    /// Create a lister rooted at `path`; `recursive` drops the `/` delimiter.
+    pub fn new(
+        core: Arc<GcsCore>,
+        path: &str,
+        args: OpList,
+        recursive: bool,
+        limit: Option<usize>,
+        start_after: Option<&str>,
+    ) -> Self {
+        let delimiter = if recursive { "" } else { "/" };
+        Self {
+            core,
+
+            path: path.to_string(),
+            args,
+            delimiter,
+            limit,
+            start_after: start_after.map(String::from),
+        }
+    }
+}
+
+impl oio::PageList for GcsObjectVersionsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self
+            .core
+            .gcs_list_object_versions(
+                &self.path,
+                &ctx.token,
+                self.delimiter,
+                self.limit,
+                if ctx.token.is_empty() {
+                    self.start_after.clone()
+                } else {
+                    None
+                },
+            )
+            .await?;
+
+        if !resp.status().is_success() {
+            return Err(parse_error(resp));
+        }
+        let bytes = resp.into_body();
+
+        let output: ListResponse =
+            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
+
+        if let Some(token) = &output.next_page_token {
+            ctx.token.clone_from(token);
+        } else {
+            ctx.done = true;
+        }
+
+        for prefix in output.prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+
+            ctx.entries.push_back(de);
+        }
+
+        // One entry per path, later items win; BTreeMap keeps emission order deterministic.
+        let mut item_map = std::collections::BTreeMap::new();
+        for object in output.items {
+            // exclude the inclusive start_after itself
+            let mut path = build_rel_path(&self.core.root, &object.name);
+            if path.is_empty() {
+                path = "/".to_string();
+            }
+            if self.start_after.as_ref() == Some(&path) {
+                continue;
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+
+            // set metadata fields
+            meta.set_content_md5(object.md5_hash.as_str());
+            meta.set_etag(object.etag.as_str());
+
+            let size = object.size.parse().map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "parse u64 from list response").set_source(e)
+            })?;
+            meta.set_content_length(size);
+            if !object.content_type.is_empty() {
+                meta.set_content_type(&object.content_type);
+            }
+
+            meta.set_last_modified(parse_datetime_from_rfc3339(object.updated.as_str())?);
+            if object.time_deleted.is_some() {
+                meta.set_is_deleted(true);
+            } else {
+                meta.set_is_current(true);
+            }
+
+            item_map.insert(path, meta);
+        }
+        for (path, meta) in item_map {
+            // `list` must be additive: current objects are always emitted, deleted ones only
+            // when `deleted` is set. NOTE(review): `versions` is not consulted here - confirm.
+            if (self.args.deleted() && meta.is_deleted()) || meta.is_current() == Some(true) {
+                let de = oio::Entry::with(path, meta);
+                ctx.entries.push_back(de);
+            }
+        }
+
+        Ok(())
+    }
+}