Skip to content

Commit

Permalink
feat(writer): configurable buffer size of unsized write (apache#2143)
Browse files Browse the repository at this point in the history
* config buffer size for gcs

* config buffer size for oss s3

* refactor

* refactor buffer size limitation

* resolve comments

* minor

* minor

* resolve comments

* rebase main and resolve conflict

* minor

* typo
  • Loading branch information
wcy-fdu authored and suyanhanx committed May 8, 2023
1 parent 5853df5 commit 4c05f51
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 31 deletions.
26 changes: 25 additions & 1 deletion core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use crate::*;

const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";

/// It's recommended that you use at least 8 MiB for the chunk size.
const DEFAULT_WRITE_FIXED_SIZE: usize = 8 * 1024 * 1024;
/// Google Cloud Storage service.
///
/// # Capabilities
Expand Down Expand Up @@ -120,6 +121,9 @@ pub struct GcsBuilder {
customed_token_loader: Option<Box<dyn GoogleTokenLoad>>,
predefined_acl: Option<String>,
default_storage_class: Option<String>,

/// the fixed size writer uses to flush into underlying storage.
write_fixed_size: Option<usize>,
}

impl GcsBuilder {
Expand Down Expand Up @@ -237,6 +241,16 @@ impl GcsBuilder {
};
self
}

/// The buffer size should be a multiple of 256 KiB (256 x 1024 bytes), unless it's the last chunk that completes the upload.
/// Larger chunk sizes typically make uploads faster, but note that there's a tradeoff between speed and memory usage.
/// It's recommended that you use at least 8 MiB for the chunk size.
/// Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
pub fn write_fixed_size(&mut self, fixed_buffer_size: usize) -> &mut Self {
self.write_fixed_size = Some(fixed_buffer_size);

self
}
}

impl Debug for GcsBuilder {
Expand Down Expand Up @@ -336,6 +350,15 @@ impl Builder for GcsBuilder {

let signer = GoogleSigner::new("storage");

let write_fixed_size = self.write_fixed_size.unwrap_or(DEFAULT_WRITE_FIXED_SIZE);
if write_fixed_size.checked_rem_euclid(256 * 1024).is_some() {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"The write fixed buffer size is misconfigured",
)
.with_context("service", Scheme::Gcs));
}

let backend = GcsBackend {
core: Arc::new(GcsCore {
endpoint,
Expand All @@ -347,6 +370,7 @@ impl Builder for GcsBuilder {
credential_loader: cred_loader,
predefined_acl: self.predefined_acl.clone(),
default_storage_class: self.default_storage_class.clone(),
write_fixed_size,
}),
};

Expand Down
2 changes: 2 additions & 0 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct GcsCore {

pub predefined_acl: Option<String>,
pub default_storage_class: Option<String>,

pub write_fixed_size: usize,
}

impl Debug for GcsCore {
Expand Down
23 changes: 7 additions & 16 deletions core/src/services/gcs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ pub struct GcsWriter {
location: Option<String>,
written: u64,
buffer: oio::VectorCursor,
buffer_size: usize,
write_fixed_size: usize,
}

impl GcsWriter {
pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
let write_fixed_size = core.write_fixed_size;
GcsWriter {
core,
path: path.to_string(),
Expand All @@ -48,17 +49,7 @@ impl GcsWriter {
location: None,
written: 0,
buffer: oio::VectorCursor::new(),
// The chunk size should be a multiple of 256 KiB
// (256 x 1024 bytes), unless it's the last chunk
// that completes the upload.
//
// Larger chunk sizes typically make uploads faster,
// but note that there's a tradeoff between speed and
// memory usage. It's recommended that you use at least
// 8 MiB for the chunk size.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
write_fixed_size,
}
}

Expand Down Expand Up @@ -151,16 +142,16 @@ impl oio::Write for GcsWriter {

self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.buffer_size {
if self.buffer.len() <= self.write_fixed_size {
return Ok(());
}

let bs = self.buffer.peak_exact(self.buffer_size);
let bs = self.buffer.peak_exact(self.write_fixed_size);

match self.write_part(location, bs).await {
Ok(_) => {
self.buffer.take(self.buffer_size);
self.written += self.buffer_size as u64;
self.buffer.take(self.write_fixed_size);
self.written += self.write_fixed_size as u64;
Ok(())
}
Err(e) => {
Expand Down
22 changes: 22 additions & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::ops::*;
use crate::raw::*;
use crate::*;

const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
/// Aliyun Object Storage Service (OSS) support
///
/// # Capabilities
Expand Down Expand Up @@ -123,6 +124,8 @@ pub struct OssBuilder {
access_key_secret: Option<String>,

http_client: Option<HttpClient>,
/// the size of each part, and the range is 5MB ~ 5 GB.
write_min_size: Option<usize>,
}

impl Debug for OssBuilder {
Expand Down Expand Up @@ -292,6 +295,14 @@ impl OssBuilder {
}
self
}

/// set the minimum size of unsized write, it should be greater than 5 MB.
/// Reference: [OSS Multipart upload](https://www.alibabacloud.com/help/en/object-storage-service/latest/multipart-upload-6)
pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
self.write_min_size = Some(write_min_size);

self
}
}

impl Builder for OssBuilder {
Expand All @@ -313,6 +324,8 @@ impl Builder for OssBuilder {
.map(|v| builder.server_side_encryption(v));
map.get("server_side_encryption_key_id")
.map(|v| builder.server_side_encryption_key_id(v));
map.get("write_min_size")
.map(|v| builder.write_min_size(v.parse::<usize>().unwrap()));
builder
}

Expand Down Expand Up @@ -384,6 +397,14 @@ impl Builder for OssBuilder {

let signer = AliyunOssSigner::new(bucket);

let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
if write_min_size < 5 * 1024 * 1024 {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"The write minimum buffer size is misconfigured",
)
.with_context("service", Scheme::Oss));
}
debug!("Backend build finished");

Ok(OssBackend {
Expand All @@ -398,6 +419,7 @@ impl Builder for OssBuilder {
client,
server_side_encryption,
server_side_encryption_key_id,
write_min_size,
}),
})
}
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct OssCore {
pub client: HttpClient,
pub loader: AliyunLoader,
pub signer: AliyunOssSigner,
pub write_min_size: usize,
}

impl Debug for OssCore {
Expand Down
9 changes: 2 additions & 7 deletions core/src/services/oss/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct OssWriter {

impl OssWriter {
pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_min_size;
OssWriter {
core,
path: path.to_string(),
Expand All @@ -50,13 +51,7 @@ impl OssWriter {
upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
// The part size must be 5 MiB to 5 GiB. There is no minimum
// size limit on the last part of your multipart upload.
//
// We pick the default value as 8 MiB for better thoughput.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
buffer_size,
}
}

Expand Down
22 changes: 22 additions & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new
m
});

const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
/// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
///
Expand Down Expand Up @@ -305,6 +306,10 @@ pub struct S3Builder {

http_client: Option<HttpClient>,
customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,

/// the part size of s3 multipart upload, which should be 5 MiB to 5 GiB.
/// There is no minimum size limit on the last part of your multipart upload
write_min_size: Option<usize>,
}

impl Debug for S3Builder {
Expand Down Expand Up @@ -705,6 +710,14 @@ impl S3Builder {

endpoint
}

/// set the minimum size of unsized write, it should be greater than 5 MB.
/// Reference: [Amazon S3 multipart upload limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
self.write_min_size = Some(write_min_size);

self
}
}

impl Builder for S3Builder {
Expand Down Expand Up @@ -872,6 +885,14 @@ impl Builder for S3Builder {
}

let signer = AwsV4Signer::new("s3", &region);
let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
if write_min_size < 5 * 1024 * 1024 {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"The write minimum buffer size is misconfigured",
)
.with_context("service", Scheme::S3));
}

debug!("backend build finished");
Ok(S3Backend {
Expand All @@ -888,6 +909,7 @@ impl Builder for S3Builder {
signer,
loader,
client,
write_min_size,
}),
})
}
Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct S3Core {
pub signer: AwsV4Signer,
pub loader: AwsLoader,
pub client: HttpClient,
pub write_min_size: usize,
}

impl Debug for S3Core {
Expand Down
9 changes: 2 additions & 7 deletions core/src/services/s3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct S3Writer {

impl S3Writer {
pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_min_size;
S3Writer {
core,
path: path.to_string(),
Expand All @@ -50,13 +51,7 @@ impl S3Writer {
upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
// The part size must be 5 MiB to 5 GiB. There is no minimum
// size limit on the last part of your multipart upload.
//
// We pick the default value as 8 MiB for better throughput.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
buffer_size,
}
}

Expand Down

0 comments on commit 4c05f51

Please sign in to comment.