Skip to content

Commit

Permalink
feat(core): Implement list with deleted and versions for gcs
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Jan 15, 2025
1 parent a8b793b commit 4e9999d
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 24 deletions.
49 changes: 38 additions & 11 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
use oio::PageLister;
use reqsign::GoogleCredentialLoader;
use reqsign::GoogleSigner;
use reqsign::GoogleTokenLoad;
Expand All @@ -34,7 +35,7 @@ use serde_json;
use super::core::*;
use super::delete::GcsDeleter;
use super::error::parse_error;
use super::lister::GcsLister;
use super::lister::{GcsLister, GcsListers, GcsObjectVersionsLister};
use super::writer::GcsWriter;
use super::writer::GcsWriters;
use crate::raw::oio::BatchDeleter;
Expand Down Expand Up @@ -192,6 +193,13 @@ impl GcsBuilder {
self
}

/// Set bucket versioning status for this backend
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;

self
}

/// Set the predefined acl for GCS.
///
/// Available values are:
Expand Down Expand Up @@ -326,6 +334,7 @@ impl Builder for GcsBuilder {
predefined_acl: self.config.predefined_acl.clone(),
default_storage_class: self.config.default_storage_class.clone(),
allow_anonymous: self.config.allow_anonymous,
enable_versioning: self.config.enable_versioning,
}),
};

Expand All @@ -342,7 +351,7 @@ pub struct GcsBackend {
impl Access for GcsBackend {
type Reader = HttpBody;
type Writer = GcsWriters;
type Lister = oio::PageLister<GcsLister>;
type Lister = GcsListers;
type Deleter = oio::BatchDeleter<GcsDeleter>;
type BlockingReader = ();
type BlockingWriter = ();
Expand All @@ -362,20 +371,22 @@ impl Access for GcsBackend {
stat_has_content_md5: true,
stat_has_content_length: true,
stat_has_content_type: true,
stat_with_version: self.core.enable_versioning,
stat_has_last_modified: true,
stat_has_user_metadata: true,

read: true,

read_with_if_match: true,
read_with_if_none_match: true,
read_with_version: self.core.enable_versioning,

write: true,
write_can_empty: true,
write_can_multi: true,
write_with_content_type: true,
write_with_user_metadata: true,
write_with_if_not_exists: true,
write_with_if_not_exists: !self.core.enable_versioning,

// The min multipart size of Gcs is 5 MiB.
//
Expand All @@ -392,6 +403,7 @@ impl Access for GcsBackend {

delete: true,
delete_max_size: Some(100),
delete_with_version: self.core.enable_versioning,
copy: true,

list: true,
Expand All @@ -403,6 +415,8 @@ impl Access for GcsBackend {
list_has_content_length: true,
list_has_content_type: true,
list_has_last_modified: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,

presign: true,
presign_stat: true,
Expand Down Expand Up @@ -432,6 +446,7 @@ impl Access for GcsBackend {

m.set_etag(&meta.etag);
m.set_content_md5(&meta.md5_hash);
m.set_version(&meta.generation);

let size = meta
.size
Expand Down Expand Up @@ -485,15 +500,23 @@ impl Access for GcsBackend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = GcsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
);
let l = if args.versions() || args.deleted() {
TwoWays::Two(PageLister::new(GcsObjectVersionsLister::new(
self.core.clone(),
path,
args,
)))
} else {
TwoWays::One(PageLister::new(GcsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
};

Ok((RpList::default(), oio::PageLister::new(l)))
Ok((RpList::default(), l))
}

async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
Expand Down Expand Up @@ -554,6 +577,10 @@ struct GetObjectJsonResponse {
///
/// For example: `"contentType": "image/png",`
content_type: String,
/// Generation of this object.
///
/// For example: `"generation": "1660563214863653"`
generation: String,
/// Custom metadata of this object.
///
/// For example: `"metadata" : { "my-key": "my-value" }`
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/gcs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct GcsConfig {
pub disable_vm_metadata: bool,
/// Disable loading configuration from the environment.
pub disable_config_load: bool,
/// Enable versioning for the bucket.
pub enable_versioning: bool,
/// A Google Cloud OAuth2 token.
///
/// Takes precedence over `credential` and `credential_path`.
Expand Down
94 changes: 85 additions & 9 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub mod constants {
pub const X_GOOG_ACL: &str = "x-goog-acl";
pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class";
pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-";
pub const IF_GENERATION_MATCH: &str = "ifGenerationMatch";
pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match";
pub const GENERATION: &str = "generation";
}

pub struct GcsCore {
Expand All @@ -69,6 +72,7 @@ pub struct GcsCore {
pub default_storage_class: Option<String>,

pub allow_anonymous: bool,
pub enable_versioning: bool,
}

impl Debug for GcsCore {
Expand Down Expand Up @@ -193,6 +197,10 @@ impl GcsCore {

let mut req = Request::get(&url);

if let Some(version) = args.version() {
req = req.header(constants::GENERATION, version);
}

if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
Expand All @@ -216,6 +224,10 @@ impl GcsCore {

let mut req = Request::get(&url);

if let Some(version) = args.version() {
req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
}

if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
Expand Down Expand Up @@ -372,6 +384,10 @@ impl GcsCore {

let mut req = Request::get(&url);

if let Some(version) = args.version() {
req = req.header(constants::IF_GENERATION_MATCH, version);
}

if let Some(if_none_match) = args.if_none_match() {
req = req.header(IF_NONE_MATCH, if_none_match);
}
Expand All @@ -397,6 +413,10 @@ impl GcsCore {

let mut req = Request::head(&url);

if let Some(version) = args.version() {
req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
}

if let Some(if_none_match) = args.if_none_match() {
req = req.header(IF_NONE_MATCH, if_none_match);
}
Expand All @@ -422,14 +442,14 @@ impl GcsCore {
self.send(req).await
}

pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
let mut req = self.gcs_delete_object_request(path)?;
pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> {
let mut req = self.gcs_delete_object_request(path, args)?;

self.sign(&mut req).await?;
self.send(req).await
}

pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!(
Expand All @@ -439,18 +459,25 @@ impl GcsCore {
percent_encode_path(&p)
);

Request::delete(&url)
.body(Buffer::new())
.map_err(new_request_build_error)
let mut req = Request::delete(&url);

if let Some(version) = args.version() {
req = req.header(constants::GENERATION, version);
}

req.body(Buffer::new()).map_err(new_request_build_error)
}

pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> {
pub async fn gcs_delete_objects(
&self,
batch: Vec<(String, OpDelete)>,
) -> Result<Response<Buffer>> {
let uri = format!("{}/batch/storage/v1", self.endpoint);

let mut multipart = Multipart::new();

for (idx, path) in paths.iter().enumerate() {
let req = self.gcs_delete_object_request(path)?;
for (idx, (path, args)) in batch.iter().enumerate() {
let req = self.gcs_delete_object_request(path, args.clone())?;

multipart = multipart.part(
MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
Expand Down Expand Up @@ -534,6 +561,54 @@ impl GcsCore {
self.send(req).await
}

pub async fn gcs_list_object_versions(
&self,
path: &str,
page_token: &str,
delimiter: &str,
limit: Option<usize>,
start_after: Option<String>,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);

let mut url = format!(
"{}/storage/v1/b/{}/o?versions=true&prefix={}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);
if !delimiter.is_empty() {
write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
}
if let Some(limit) = limit {
write!(url, "&maxResults={limit}").expect("write into string must succeed");
}
if let Some(start_after) = start_after {
let start_after = build_abs_path(&self.root, &start_after);
write!(url, "&startOffset={}", percent_encode_path(&start_after))
.expect("write into string must succeed");
}

if !page_token.is_empty() {
// NOTE:
//
// GCS uses pageToken in request and nextPageToken in response
//
// Don't know how will those tokens be like so this part are copied
// directly from AWS S3 service.
write!(url, "&pageToken={}", percent_encode_path(page_token))
.expect("write into string must succeed");
}

let mut req = Request::get(&url)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;

self.send(req).await
}

pub async fn gcs_initiate_multipart_upload(&self, path: &str) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);

Expand Down Expand Up @@ -681,6 +756,7 @@ pub struct ListResponseItem {
pub md5_hash: String,
pub updated: String,
pub content_type: String,
pub time_deleted: Option<String>,
}

/// Result of CreateMultipartUpload
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/gcs/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl GcsDeleter {
}

impl oio::BatchDelete for GcsDeleter {
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
let resp = self.core.gcs_delete_object(&path).await?;
async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
let resp = self.core.gcs_delete_object(&path, args).await?;

// deleting not existing objects is ok
if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
Expand All @@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter {
}

async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
let resp = self.core.gcs_delete_objects(paths.clone()).await?;
let paths: Vec<String> = batch.clone().into_iter().map(|(p, _)| p).collect();
let resp = self.core.gcs_delete_objects(batch).await?;

let status = resp.status();

Expand Down
Loading

0 comments on commit 4e9999d

Please sign in to comment.