From 4c4dd60534ae3521fa34d6ba45b4d0ff61f2e54e Mon Sep 17 00:00:00 2001 From: Armando Zhu Date: Sat, 22 Jul 2023 23:11:28 -0700 Subject: [PATCH] feat(service/cos): add multipart upload function support --- core/src/services/cos/backend.rs | 35 ++++-- core/src/services/cos/core.rs | 178 +++++++++++++++++++++++++++ core/src/services/cos/writer.rs | 198 +++++++++++++++++++++++++++++-- 3 files changed, 395 insertions(+), 16 deletions(-) diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 2cae02b2554c..d83a8a3761e2 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -35,6 +35,8 @@ use crate::raw::*; use crate::services::cos::appender::CosAppender; use crate::*; +const DEFAULT_WRITE_MIN_SIZE: usize = 1024 * 1024; + /// Tencent-Cloud COS services support. #[doc = include_str!("docs.md")] #[derive(Default, Clone)] @@ -46,6 +48,10 @@ pub struct CosBuilder { bucket: Option, http_client: Option, + /// the part size of cos multipart upload, which should be 1 MB to 5 GB. + /// There is no minimum size limit on the last part of your multipart upload + write_min_size: Option, + disable_config_load: bool, } @@ -120,6 +126,14 @@ impl CosBuilder { self } + /// set the minimum size of unsized write, it should be greater than 1 MB. + /// Reference: [Upload Part | Tencent Cloud](https://www.tencentcloud.com/document/product/436/7750) + pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self { + self.write_min_size = Some(write_min_size); + + self + } + /// Disable config load so that opendal will not load config from /// environment. 
/// @@ -155,6 +169,8 @@ impl Builder for CosBuilder { map.get("endpoint").map(|v| builder.endpoint(v)); map.get("secret_id").map(|v| builder.secret_id(v)); map.get("secret_key").map(|v| builder.secret_key(v)); + map.get("write_min_size") + .map(|v| builder.write_min_size(v.parse().expect("input must be a number"))); builder } @@ -218,6 +234,14 @@ impl Builder for CosBuilder { let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg); let signer = TencentCosSigner::new(); + let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE); + if write_min_size < 1024 * 1024 { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "The write minimum buffer size is misconfigured", + ) + .with_context("service", Scheme::Cos)); + } debug!("backend build finished"); Ok(CosBackend { @@ -228,6 +252,7 @@ impl Builder for CosBuilder { signer, loader: cred_loader, client, + write_min_size, }), }) } @@ -269,6 +294,7 @@ impl Accessor for CosBackend { write_can_sink: true, write_with_content_type: true, write_with_cache_control: true, + write_without_content_length: true, append: true, append_with_cache_control: true, @@ -332,16 +358,9 @@ impl Accessor for CosBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - Ok(( RpWrite::default(), - CosWriter::new(self.core.clone(), args, path.to_string()), + CosWriter::new(self.core.clone(), path, args), )) } diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs index 0d54b70b3492..56081cb18abf 100644 --- a/core/src/services/cos/core.rs +++ b/core/src/services/cos/core.rs @@ -19,6 +19,7 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::time::Duration; +use bytes::Bytes; use http::header::CACHE_CONTROL; use http::header::CONTENT_DISPOSITION; use http::header::CONTENT_LENGTH; @@ -30,6 +31,8 @@ use 
http::Response; use reqsign::TencentCosCredential; use reqsign::TencentCosCredentialLoader; use reqsign::TencentCosSigner; +use serde::Deserialize; +use serde::Serialize; use crate::raw::*; use crate::*; @@ -42,6 +45,7 @@ pub struct CosCore { pub signer: TencentCosSigner, pub loader: TencentCosCredentialLoader, pub client: HttpClient, + pub write_min_size: usize, } impl Debug for CosCore { @@ -324,4 +328,178 @@ impl CosCore { self.send(req).await } + + pub async fn cos_initiate_multipart_upload( + &self, + path: &str, + content_type: Option<&str>, + content_disposition: Option<&str>, + cache_control: Option<&str>, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p)); + + let mut req = Request::post(&url); + + if let Some(mime) = content_type { + req = req.header(CONTENT_TYPE, mime) + } + + if let Some(content_disposition) = content_disposition { + req = req.header(CONTENT_DISPOSITION, content_disposition) + } + + if let Some(cache_control) = cache_control { + req = req.header(CACHE_CONTROL, cache_control) + } + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn cos_upload_part_request( + &self, + path: &str, + upload_id: &str, + part_number: usize, + size: Option, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}?partNumber={}&uploadId={}", + self.endpoint, + percent_encode_path(&p), + part_number, + percent_encode_path(upload_id) + ); + + let mut req = Request::put(&url); + + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size); + } + + // Set body + let mut req = req.body(body).map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn cos_complete_multipart_upload( + &self, + path: &str, + upload_id: &str, + parts: 
&[CompleteMultipartUploadRequestPart], + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}?uploadId={}", + self.endpoint, + percent_encode_path(&p), + percent_encode_path(upload_id) + ); + + let req = Request::post(&url); + + let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { + part: parts.to_vec(), + }) + .map_err(new_xml_deserialize_error)?; + // Make sure content length has been set to avoid post with chunked encoding. + let req = req.header(CONTENT_LENGTH, content.len()); + // Set content-type to `application/xml` to avoid mixed with form post. + let req = req.header(CONTENT_TYPE, "application/xml"); + + let mut req = req + .body(AsyncBody::Bytes(Bytes::from(content))) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + /// Abort an on-going multipart upload. + pub async fn cos_abort_multipart_upload( + &self, + path: &str, + upload_id: &str, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}?uploadId={}", + self.endpoint, + percent_encode_path(&p), + percent_encode_path(upload_id) + ); + + let mut req = Request::delete(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } +} + +/// Result of CreateMultipartUpload +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct InitiateMultipartUploadResult { + pub upload_id: String, +} + +/// Request of CompleteMultipartUploadRequest +#[derive(Default, Debug, Serialize)] +#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")] +pub struct CompleteMultipartUploadRequest { + pub part: Vec, +} + +#[derive(Clone, Default, Debug, Serialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct CompleteMultipartUploadRequestPart { + #[serde(rename = "PartNumber")] + pub part_number: usize, + /// # TODO + /// + /// quick-xml will 
do escape on `"` which leads to our serialized output is + /// not the same as aws s3's example. + /// + /// Ideally, we could use `serialize_with` to address this (but failed) + /// + /// ```ignore + /// #[derive(Default, Debug, Serialize)] + /// #[serde(default, rename_all = "PascalCase")] + /// struct CompleteMultipartUploadRequestPart { + /// #[serde(rename = "PartNumber")] + /// part_number: usize, + /// #[serde(rename = "ETag", serialize_with = "partial_escape")] + /// etag: String, + /// } + /// + /// fn partial_escape(s: &str, ser: S) -> std::result::Result + /// where + /// S: serde::Serializer, + /// { + /// ser.serialize_str(&String::from_utf8_lossy( + /// &quick_xml::escape::partial_escape(s.as_bytes()), + /// )) + /// } + /// ``` + /// + /// ref: + #[serde(rename = "ETag")] + pub etag: String, } diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index 3ad689598670..676e123cf1a6 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -18,10 +18,11 @@ use std::sync::Arc; use async_trait::async_trait; +use bytes::Buf; use bytes::Bytes; use http::StatusCode; -use super::core::CosCore; +use super::core::*; use super::error::parse_error; use crate::raw::*; use crate::*; @@ -31,11 +32,26 @@ pub struct CosWriter { op: OpWrite, path: String, + upload_id: Option, + + parts: Vec, + buffer: oio::VectorCursor, + buffer_size: usize, } impl CosWriter { - pub fn new(core: Arc, op: OpWrite, path: String) -> Self { - CosWriter { core, op, path } + pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { + let buffer_size = core.write_min_size; + CosWriter { + core, + path: path.to_string(), + op, + + upload_id: None, + parts: vec![], + buffer: oio::VectorCursor::new(), + buffer_size, + } + } async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { @@ -61,24 +77,190 @@ impl CosWriter { _ => Err(parse_error(resp).await?), } } + + async fn initiate_upload(&self) -> Result { + let resp = self + .core +
.cos_initiate_multipart_upload( + &self.path, + self.op.content_type(), + self.op.content_disposition(), + self.op.cache_control(), + ) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let result: InitiateMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + + Ok(result.upload_id) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write_part( + &self, + upload_id: &str, + bs: Bytes, + ) -> Result { + // COS requires part number must between [1..=10000] + let part_number = self.parts.len() + 1; + + let resp = self + .core + .cos_upload_part_request( + &self.path, + upload_id, + part_number, + Some(bs.len() as u64), + AsyncBody::Bytes(bs), + ) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let etag = parse_etag(resp.headers())? + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "ETag not present in returning response", + ) + })? 
+ .to_string(); + + resp.into_body().consume().await?; + + Ok(CompleteMultipartUploadRequestPart { part_number, etag }) + } + _ => Err(parse_error(resp).await?), + } + } } #[async_trait] impl oio::Write for CosWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + let upload_id = match &self.upload_id { + Some(upload_id) => upload_id, + None => { + if self.op.content_length().unwrap_or_default() == bs.len() as u64 { + return self + .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) + .await; + } else { + let upload_id = self.initiate_upload().await?; + self.upload_id = Some(upload_id); + self.upload_id.as_deref().unwrap() + } + } + }; + + // Ignore empty bytes + if bs.is_empty() { + return Ok(()); + } + + self.buffer.push(bs); + // Return directly if the buffer is not full + if self.buffer.len() <= self.buffer_size { + return Ok(()); + } + + let bs = self.buffer.peak_at_least(self.buffer_size); + let size = bs.len(); + + match self.write_part(upload_id, bs).await { + Ok(part) => { + self.buffer.take(size); + self.parts.push(part); + Ok(()) + } + Err(e) => { + // If the upload fails, we should pop the given bs to make sure + // write is re-enter safe. + self.buffer.pop(); + Err(e) + } + } } async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + if self.op.content_length().unwrap_or_default() == size { + return self.write_oneshot(size, AsyncBody::Stream(s)).await; + } else { + return Err(Error::new( + ErrorKind::Unsupported, + "COS does not support streaming multipart upload", + )); + } } async fn abort(&mut self) -> Result<()> { - Ok(()) + let upload_id = if let Some(upload_id) = &self.upload_id { + upload_id + } else { + return Ok(()); + }; + + let resp = self + .core + .cos_abort_multipart_upload(&self.path, upload_id) + .await?; + match resp.status() { + // cos returns code 204 if abort succeeds. 
+ // Reference: https://www.tencentcloud.com/document/product/436/7740 + StatusCode::NO_CONTENT => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } } async fn close(&mut self) -> Result<()> { - Ok(()) + let upload_id = if let Some(upload_id) = &self.upload_id { + upload_id + } else { + return Ok(()); + }; + + // Make sure internal buffer has been flushed. + if !self.buffer.is_empty() { + let bs = self.buffer.peak_exact(self.buffer.len()); + + match self.write_part(upload_id, bs).await { + Ok(part) => { + self.buffer.clear(); + self.parts.push(part); + } + Err(e) => { + return Err(e); + } + } + } + + let resp = self + .core + .cos_complete_multipart_upload(&self.path, upload_id, &self.parts) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(()) + } + _ => Err(parse_error(resp).await?), + } } }