feat: multipart copy #319
Conversation
This looks great. The biggest thing missing I think is an example in the published docs showing how to use it. Other than that, the other suggestions are mostly cosmetic.
src/s3/multipartcopy.rs
Outdated
```rust
/// The size of the source object in bytes.
#[instrument(skip(client))]
pub async fn get_source_size(
```
suggestion: We don't expose this in the public API, so we can remove the `pub` here 👍
```diff
-pub async fn get_source_size(
+async fn get_source_size(
```
src/s3/multipartcopy.rs
Outdated
```rust
let parts: Vec<_> = (1..=part_count)
```
suggestion 1: We can avoid the `.collect()` into a `Vec` here, since `stream::iter` will accept the iterator itself.
suggestion 2: Seeing a 1-based iterator is a bit unexpected, so a quick comment would help the reader know that it's intentional, and the reason behind it.
```diff
-let parts: Vec<_> = (1..=part_count)
+// Use 1-based indexing to match expectations of `CopyUploadPart`.
+let parts = (1..=part_count)
```
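To make the 1-based numbering concrete, here is a self-contained sketch of the byte-range math such an iterator typically performs (the `part_ranges` helper and its exact signature are hypothetical, not the PR's code):

```rust
// Hypothetical sketch (not the PR's code): yield 1-based part numbers and
// their inclusive byte ranges for a multipart copy.
fn part_ranges(source_size: i64, part_size: i64) -> impl Iterator<Item = (i64, (i64, i64))> {
    // Ceiling division: the last part may be shorter than `part_size`.
    let part_count = (source_size + part_size - 1) / part_size;
    // S3's UploadPartCopy numbers parts starting at 1, hence the 1-based range.
    (1..=part_count).map(move |part_number| {
        let start = (part_number - 1) * part_size;
        let end = (start + part_size - 1).min(source_size - 1);
        (part_number, (start, end))
    })
}

fn main() {
    // 10 bytes copied in 4-byte parts -> parts 1, 2, 3 covering 0-3, 4-7, 8-9.
    for (n, range) in part_ranges(10, 4) {
        println!("part {n}: {range:?}");
    }
}
```

Because the function returns `impl Iterator`, it can be fed straight to `stream::iter` without an intermediate `Vec`, which is exactly the shape the suggestion asks for.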
src/s3/multipartcopy.rs
Outdated
```rust
    })
    .collect();
```
suggestion: See above.
```diff
-    })
-    .collect();
+    });
```
src/s3/multipartcopy.rs
Outdated
```rust
let completed_parts: Vec<CompletedPart> = stream::iter(parts)
    .buffer_unordered(self.max_concurrent_uploads)
    .try_collect::<Vec<CompletedPart>>()
```
suggestion: (personal preference). We can avoid the explicit types here, for less busy code.
```diff
-let completed_parts: Vec<CompletedPart> = stream::iter(parts)
-    .buffer_unordered(self.max_concurrent_uploads)
-    .try_collect::<Vec<CompletedPart>>()
+let completed_parts = stream::iter(parts)
+    .buffer_unordered(self.max_concurrent_uploads)
+    .try_collect()
```
I prefer the explicit types to help reduce the cognitive load on the reader of the code. However, we can remove one of the types and still achieve clarity:
```rust
let completed_parts: Vec<CompletedPart> = stream::iter(parts)
    .buffer_unordered(self.max_concurrent_uploads)
    .try_collect()
```
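The same trade-off between a turbofish on the adapter and a type annotation on the binding shows up with plain iterator `collect`, which can illustrate the two styles without pulling in the `futures` types (a standalone sketch, not the PR's code):

```rust
fn main() {
    let nums = ["1", "2", "3"];
    // Turbofish style: the type is spelled mid-chain, twice over.
    let a = nums.iter().map(|s| s.parse::<i32>().unwrap()).collect::<Vec<i32>>();
    // Annotation on the binding: same result, and the chain stays uncluttered
    // while the reader still sees the resulting type up front.
    let b: Vec<i32> = nums.iter().map(|s| s.parse().unwrap()).collect();
    assert_eq!(a, b);
    assert_eq!(b, vec![1, 2, 3]);
}
```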
src/s3/multipartcopy.rs
Outdated
```rust
let source_size =
    get_source_size(&self.client, self.source.bucket(), self.source.key()).await?;
let part_count =
    ((f64::value_from(source_size)? / f64::value_from(self.part_size)?).ceil()).approx()?;
```
suggestion: This piece of code is begging for an edge-case bug. It would be good to have explicit tests on either side of the boundaries to make 100% sure it does the right thing, either as additional test cases of the full function, or factored out into a helper method with individual tests.
Refactored this code and exercised it with property tests and type-checked values.
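For illustration, a factored-out helper of the kind the suggestion describes could use integer ceiling division, avoiding the `f64` round-trip entirely; the helper name and the exact tests below are hypothetical, not the PR's code:

```rust
// Hypothetical helper (not the PR's code): ceiling division in integers,
// so there is no float rounding near the boundaries.
fn part_count(source_size: u64, part_size: u64) -> u64 {
    assert!(part_size > 0, "part size must be non-zero");
    (source_size + part_size - 1) / part_size
}

fn main() {
    // Edge cases on either side of the part-size boundary.
    assert_eq!(part_count(99, 50), 2);  // just under two full parts
    assert_eq!(part_count(100, 50), 2); // exactly two full parts
    assert_eq!(part_count(101, 50), 3); // one byte spills into a third part
    assert_eq!(part_count(0, 50), 0);   // empty object
}
```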
```rust
    byte_range: (i64, i64),
}

/// A struct to handle S3 multipart copy operations.
```
suggestion: Could we add some more docs on how to use this feature? It looks like we get some docs for free from `TypedBuilder`, but they're not great.

In particular, there's no documentation of the individual fields, their types, or how they should be used in the builder. I guess we could work around this by making the fields `pub` and adding docstrings, or else perhaps an example, like below (but formatted correctly 😅). It would also be helpful to have some guidance on the `part_size` and `max_concurrent_uploads` fields, and when/why the user might want to set them.
```diff
 /// A struct to handle S3 multipart copy operations.
+///
+/// let dest = S3Object::new(test_bucket, dst_key);
+/// let copyier = S3MultipartCopier::builder()
+///     .client(client.clone())
+///     .source(src)
+///     .destination(dest.clone())
+///     .part_size((5 * MIB).try_into()?)
+///     .max_concurrent_uploads(2)
+///     .build();
+/// copyier.send().await?;
```
You can run the following command to generate the docs locally:

```shell
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --open --all-features
```
However, it's important to note the potential increase in costs due to the higher number of API requests made by multipart copy operations. It would be useful to leave a note for the user about this in the documentation block 👍
src/s3/multipartcopy.rs
Outdated
```rust
    Ok(head_object.content_length().unwrap_or(0))
}

const DEFAULT_COPY_PART_SIZE: i64 = 50 * MIB as i64;
```
question: Is the choice of 50MB here significant? Could we leave a small comment justifying this choice? If it's mostly arbitrary then we can just mention that 👍
Could you also update the
src/s3/s3_object.rs
Outdated
```rust
pub fn bucket(&self) -> &str {
    &self.bucket
}

pub fn key(&self) -> &str {
    &self.key
}
```
question: What's the motivation for the additional methods here? My preference would be to keep the API as small as possible and have the user access the fields directly for this relatively simple struct.
```diff
-pub fn bucket(&self) -> &str {
-    &self.bucket
-}
-
-pub fn key(&self) -> &str {
-    &self.key
-}
```
Exposing values through getters rather than directly accessing public fields ensures better encapsulation. However, removing the `pub` fields would break backward compatibility, so we must retain them.
Force-pushed from fd04b81 to dd4cd42.
Review comments addressed. This includes adding property tests to explore edge cases.
src/s3/multipartcopy.rs
Outdated
```rust
    part_size = self.part_size.as_ref(),
);

if source_size.as_ref() <= self.part_size.as_ref() {
```
observation: There's a slight contradiction here with line 488. We use `source <= part` here, but `source < part` (spelt backwards) at 488 🤔 I don't know if it makes much difference which one we change. If the object size is equal to the part size, then I guess an atomic copy will be more efficient? In that case we should change 488 to be `source <= part`. I'd also flip line 488 so that the inequality is spelt in the same order as 466, just to make it easier to see how these two lines relate to each other.
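One way to remove the contradiction is to funnel both call sites through a single predicate, so the inequality is spelled exactly once; a minimal sketch (the helper name is hypothetical, not the PR's code):

```rust
// Hypothetical helper (not the PR's code): one place that decides between an
// atomic CopyObject and a multipart copy, spelling the inequality one way.
fn use_atomic_copy(source_size: u64, part_size: u64) -> bool {
    // An object of exactly one part is cheaper to copy atomically.
    source_size <= part_size
}

fn main() {
    assert!(use_atomic_copy(5, 5));  // equal sizes: atomic copy
    assert!(!use_atomic_copy(6, 5)); // larger: multipart copy
}
```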
Looks good, one non-blocking observation that we might want to clean up, but other than that this is good to go 👍 The second pass clean up with additional tests really took this code up a notch, great work 🚀
📢 What
What changes have been made within this PR?
This PR introduces a new `S3MultipartCopier` struct for handling multipart copy operations in S3. The changes include:
- A new `S3MultipartCopier` struct to efficiently copy large files (greater than 5GB) using multipart copy.
❓ Why
Why are we submitting this PR?
Currently, S3 copy operations only work for files up to 5GB. Multipart copy is not only necessary for larger files but is also much faster for files under 5GB due to parallel copies. However, multipart copy will make more API requests, potentially increasing costs. This PR addresses the need for efficient, scalable, and reliable copying of large objects in S3.
🚦 Depends on
Are there any other PRs that need to be merged first?
😟 Concerns
📝 Notes