From 38b9d406c88170f349190caf1b01ad7eed4bb194 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Tue, 26 Nov 2024 18:21:10 +0000
Subject: [PATCH] Unify errors

Signed-off-by: Alessandro Passaro
---
 mountpoint-s3/src/fs/error.rs           | 47 +++++------------
 mountpoint-s3/src/upload.rs             | 47 ++++++-----------
 mountpoint-s3/src/upload/atomic.rs      | 21 +++-----
 mountpoint-s3/src/upload/incremental.rs | 68 ++++++++++++-------------
 4 files changed, 68 insertions(+), 115 deletions(-)

diff --git a/mountpoint-s3/src/fs/error.rs b/mountpoint-s3/src/fs/error.rs
index b3b0fdd01..cdbb703a6 100644
--- a/mountpoint-s3/src/fs/error.rs
+++ b/mountpoint-s3/src/fs/error.rs
@@ -6,7 +6,7 @@ use tracing::Level;
 use crate::fs::error_metadata::ErrorMetadata;
 use crate::prefetch::PrefetchReadError;
 use crate::superblock::InodeError;
-use crate::upload::{AppendUploadError, UploadWriteError};
+use crate::upload::UploadError;
 
 /// Generate an error that includes a conversion to a libc errno for use in replies to FUSE.
 ///
@@ -108,8 +108,8 @@ impl From<InodeError> for Error {
     }
 }
 
-impl<E: std::error::Error + Send + Sync + 'static> From<UploadWriteError<E>> for Error {
-    fn from(err: UploadWriteError<E>) -> Self {
+impl<E: std::error::Error + Send + Sync + 'static> From<UploadError<E>> for Error {
+    fn from(err: UploadError<E>) -> Self {
         let errno = err.to_errno();
         Error {
             errno,
@@ -117,21 +117,7 @@ impl<E: std::error::Error + Send + Sync + 'static> From<UploadWriteError<E>> for
             source: Some(anyhow::anyhow!(err)),
             // We are having WARN as the default level of logging for fuse errors
             level: Level::WARN,
-            metadata: Default::default(), // TODO (vlaad): must be cloned from UploadWriteError
-        }
-    }
-}
-
-impl<E: std::error::Error + Send + Sync + 'static> From<AppendUploadError<E>> for Error {
-    fn from(err: AppendUploadError<E>) -> Self {
-        let errno = err.to_errno();
-        Error {
-            errno,
-            message: String::from("upload error"),
-            source: Some(anyhow::anyhow!(err)),
-            // We are having WARN as the default level of logging for fuse errors
-            level: Level::WARN,
-            metadata: Default::default(), // TODO (vlaad): must be cloned from AppendUploadError
+            metadata: Default::default(), // TODO (vlaad): must be cloned from UploadError
         }
     }
 }
@@ -193,25 +179,16 @@ impl ToErrno for InodeError {
     }
 }
 
-impl<E: std::error::Error> ToErrno for UploadWriteError<E> {
-    fn to_errno(&self) -> libc::c_int {
-        match self {
-            UploadWriteError::PutRequestFailed(_) => libc::EIO,
-            UploadWriteError::OutOfOrderWrite { .. } => libc::EINVAL,
-            UploadWriteError::ObjectTooBig { .. } => libc::EFBIG,
-        }
-    }
-}
-
-impl<E: std::error::Error> ToErrno for AppendUploadError<E> {
+impl<E: std::error::Error> ToErrno for UploadError<E> {
     fn to_errno(&self) -> libc::c_int {
         match self {
-            AppendUploadError::PutRequestFailed(_) => libc::EIO,
-            AppendUploadError::UploadAlreadyTerminated => libc::EIO,
-            AppendUploadError::SseCorruptedError(_) => libc::EIO,
-            AppendUploadError::ChecksumComputationFailed(_) => libc::EIO,
-            AppendUploadError::HeadObjectFailed(_) => libc::EIO,
-            AppendUploadError::OutOfOrderWrite { .. } => libc::EINVAL,
+            UploadError::PutRequestFailed(_) => libc::EIO,
+            UploadError::UploadAlreadyTerminated => libc::EIO,
+            UploadError::SseCorruptedError(_) => libc::EIO,
+            UploadError::ChecksumComputationFailed(_) => libc::EIO,
+            UploadError::HeadObjectFailed(_) => libc::EIO,
+            UploadError::OutOfOrderWrite { .. } => libc::EINVAL,
+            UploadError::ObjectTooBig { .. } => libc::EFBIG,
         }
     }
}
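The ToErrno impl above is now the single place where upload failures are translated into errnos for FUSE replies. A minimal sketch of what that mapping means for a caller, assuming the UploadError and ToErrno definitions from this patch (the error values below are invented for illustration):

    // Hypothetical illustration; `UploadError` and `ToErrno` are the types
    // from this patch, the concrete values are arbitrary samples.
    fn sample_errno_mapping() {
        // A non-sequential write surfaces as EINVAL to the FUSE client...
        let err: UploadError<std::io::Error> = UploadError::OutOfOrderWrite {
            write_offset: 100,
            expected_offset: 0,
        };
        assert_eq!(err.to_errno(), libc::EINVAL);

        // ...an oversized object as EFBIG...
        let err: UploadError<std::io::Error> =
            UploadError::ObjectTooBig { maximum_size: 5 * 1024 * 1024 * 1024 };
        assert_eq!(err.to_errno(), libc::EFBIG);

        // ...and every other failure collapses to a generic EIO.
        let err: UploadError<std::io::Error> = UploadError::UploadAlreadyTerminated;
        assert_eq!(err.to_errno(), libc::EIO);
    }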
diff --git a/mountpoint-s3/src/upload.rs b/mountpoint-s3/src/upload.rs
index a70d3a640..2dad5d594 100644
--- a/mountpoint-s3/src/upload.rs
+++ b/mountpoint-s3/src/upload.rs
@@ -42,20 +42,24 @@ pub struct Uploader<Client: ObjectClient> {
 }
 
 #[derive(Debug, Error)]
-pub enum UploadPutError<S, C> {
-    #[error("put request creation failed")]
-    ClientError(#[from] ObjectClientError<S, C>),
+pub enum UploadError<E> {
+    #[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
+    OutOfOrderWrite { write_offset: u64, expected_offset: u64 },
+
+    #[error("put request failed")]
+    PutRequestFailed(#[from] ObjectClientError<PutObjectError, E>),
+
+    #[error("upload was already terminated because of previous failures")]
+    UploadAlreadyTerminated,
+
     #[error("SSE settings corrupted")]
     SseCorruptedError(#[from] SseCorruptedError),
-}
 
-#[derive(Debug, Error, Clone)]
-pub enum UploadWriteError<E: std::error::Error> {
-    #[error("put request failed")]
-    PutRequestFailed(#[from] E),
+    #[error("error computing checksums")]
+    ChecksumComputationFailed(#[from] ChecksumHasherError),
 
-    #[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
-    OutOfOrderWrite { write_offset: u64, expected_offset: u64 },
+    #[error("head object request failed")]
+    HeadObjectFailed(#[from] ObjectClientError<HeadObjectError, E>),
 
     #[error("object exceeded maximum upload size of {maximum_size} bytes")]
     ObjectTooBig { maximum_size: usize },
@@ -91,7 +95,7 @@ where
         &self,
         bucket: &str,
         key: &str,
-    ) -> Result<UploadRequest<Client>, UploadPutError<PutObjectError, Client::ClientError>> {
+    ) -> Result<UploadRequest<Client>, UploadError<Client::ClientError>> {
         UploadRequest::new(self, bucket, key).await
     }
 
@@ -134,27 +138,6 @@ where
 /// is already capped by a single PutObject request.
 const MAX_BYTES_IN_QUEUE: usize = 2 * 1024 * 1024 * 1024;
 
-#[derive(Debug, Error)]
-pub enum AppendUploadError<E> {
-    #[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
-    OutOfOrderWrite { write_offset: u64, expected_offset: u64 },
-
-    #[error("put request failed")]
-    PutRequestFailed(#[source] ObjectClientError<PutObjectError, E>),
-
-    #[error("upload was already terminated because of previous failures")]
-    UploadAlreadyTerminated,
-
-    #[error("SSE settings corrupted")]
-    SseCorruptedError(#[from] SseCorruptedError),
-
-    #[error("error computing checksums")]
-    ChecksumComputationFailed(#[from] ChecksumHasherError),
-
-    #[error("head object request failed")]
-    HeadObjectFailed(#[from] ObjectClientError<HeadObjectError, E>),
-}
-
 struct BoxRuntime(Box<dyn Spawn + Send + Sync>);
 impl BoxRuntime {
     fn spawn_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
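With the three enums collapsed into one, the `#[from]` conversions above let heterogeneous failures propagate through a single return type with `?`. A small sketch of that pattern, assuming only the UploadError and ChecksumHasherError definitions in this file (`compute_part_checksum` and `stage_part` are made-up helpers):

    // Hypothetical sketch: `?` converts a ChecksumHasherError into
    // UploadError::ChecksumComputationFailed via the #[from] impl above.
    fn compute_part_checksum(part: &[u8]) -> Result<u32, ChecksumHasherError> {
        // Stand-in for a real CRC computation.
        Ok(part.len() as u32)
    }

    fn stage_part<E>(part: &[u8]) -> Result<usize, UploadError<E>> {
        // No map_err needed: the From impl generated by #[from] applies.
        let _crc = compute_part_checksum(part)?;
        Ok(part.len())
    }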
diff --git a/mountpoint-s3/src/upload/atomic.rs b/mountpoint-s3/src/upload/atomic.rs
index ff8bed40d..934d18805 100644
--- a/mountpoint-s3/src/upload/atomic.rs
+++ b/mountpoint-s3/src/upload/atomic.rs
@@ -2,7 +2,6 @@ use std::fmt::Debug;
 
 use mountpoint_s3_client::{
     checksums::{crc32c_from_base64, Crc32c},
-    error::{ObjectClientError, PutObjectError},
     types::{ChecksumAlgorithm, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview},
     ObjectClient, PutObjectRequest,
 };
@@ -11,9 +10,7 @@ use tracing::error;
 
 use crate::{checksums::combine_checksums, ServerSideEncryption};
 
-use super::{UploadPutError, UploadWriteError, Uploader};
-
-type PutRequestError<Client> = ObjectClientError<PutObjectError, <Client as ObjectClient>::ClientError>;
+use super::{UploadError, Uploader};
 
 const MAX_S3_MULTIPART_UPLOAD_PARTS: usize = 10000;
 
@@ -35,7 +32,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
         uploader: &Uploader<Client>,
         bucket: &str,
         key: &str,
-    ) -> Result<Self, UploadPutError<PutObjectError, Client::ClientError>> {
+    ) -> Result<Self, UploadError<Client::ClientError>> {
         let mut params = PutObjectParams::new();
 
         match &uploader.default_checksum_algorithm {
@@ -82,21 +79,17 @@ impl<Client: ObjectClient> UploadRequest<Client> {
         self.next_request_offset
     }
 
-    pub async fn write(
-        &mut self,
-        offset: i64,
-        data: &[u8],
-    ) -> Result<usize, UploadWriteError<PutRequestError<Client>>> {
+    pub async fn write(&mut self, offset: i64, data: &[u8]) -> Result<usize, UploadError<Client::ClientError>> {
         let next_offset = self.next_request_offset;
         if offset != next_offset as i64 {
-            return Err(UploadWriteError::OutOfOrderWrite {
+            return Err(UploadError::OutOfOrderWrite {
                 write_offset: offset as u64,
                 expected_offset: next_offset,
             });
         }
         if let Some(maximum_size) = self.maximum_upload_size {
             if next_offset + data.len() as u64 > maximum_size as u64 {
-                return Err(UploadWriteError::ObjectTooBig { maximum_size });
+                return Err(UploadError::ObjectTooBig { maximum_size });
             }
         }
 
@@ -106,7 +99,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
         Ok(data.len())
     }
 
-    pub async fn complete(self) -> Result<PutObjectResult, PutRequestError<Client>> {
+    pub async fn complete(self) -> Result<PutObjectResult, UploadError<Client::ClientError>> {
         let size = self.size();
         let checksum = self.hasher.finalize();
         let result = self
@@ -394,7 +387,7 @@ mod tests {
            .expect_err("sse checksum must be checked");
        assert!(matches!(
            err,
-            UploadPutError::SseCorruptedError(SseCorruptedError::ChecksumMismatch(_, _))
+            UploadError::SseCorruptedError(SseCorruptedError::ChecksumMismatch(_, _))
        ));
    }
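`UploadRequest::write` above validates every write before any bytes are buffered: offsets must be strictly sequential, and the running total may not exceed the configured cap. The same checks condensed into a standalone form (a hypothetical restatement; only UploadError comes from this patch):

    // Condensed, hypothetical restatement of the validation in
    // `UploadRequest::write`: reject non-sequential offsets, then reject
    // writes that would push the object past the optional size cap.
    struct WriteValidator {
        next_request_offset: u64,
        maximum_upload_size: Option<usize>,
    }

    impl WriteValidator {
        fn check<E>(&self, offset: i64, len: usize) -> Result<(), UploadError<E>> {
            if offset != self.next_request_offset as i64 {
                return Err(UploadError::OutOfOrderWrite {
                    write_offset: offset as u64,
                    expected_offset: self.next_request_offset,
                });
            }
            if let Some(maximum_size) = self.maximum_upload_size {
                if self.next_request_offset + len as u64 > maximum_size as u64 {
                    return Err(UploadError::ObjectTooBig { maximum_size });
                }
            }
            Ok(())
        }
    }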
diff --git a/mountpoint-s3/src/upload/incremental.rs b/mountpoint-s3/src/upload/incremental.rs
index 240cb91bf..0587db3cf 100644
--- a/mountpoint-s3/src/upload/incremental.rs
+++ b/mountpoint-s3/src/upload/incremental.rs
@@ -18,7 +18,7 @@ use crate::sync::Arc;
 use crate::ServerSideEncryption;
 
 use super::hasher::ChecksumHasher;
-use super::{AppendUploadError, BoxRuntime, ChecksumHasherError};
+use super::{BoxRuntime, ChecksumHasherError, UploadError};
 
 /// Handle for appending data to an S3 object.
 ///
@@ -61,12 +61,12 @@ where
     /// but will be queued to upload until the buffer is full and all previous buffers have
     /// been uploaded.
     /// On success, returns the number of bytes written.
-    pub async fn write(&mut self, offset: u64, data: &[u8]) -> Result<usize, AppendUploadError<Client::ClientError>> {
+    pub async fn write(&mut self, offset: u64, data: &[u8]) -> Result<usize, UploadError<Client::ClientError>> {
         // Bail out if a previous request failed
         self.upload_queue.verify().await?;
 
         if offset != self.offset {
-            return Err(AppendUploadError::OutOfOrderWrite {
+            return Err(UploadError::OutOfOrderWrite {
                 write_offset: offset,
                 expected_offset: self.offset,
             });
@@ -96,7 +96,7 @@ where
 
     /// Complete the upload and return the last `PutObjectResult` if any PUT requests are submitted.
     /// The pipeline cannot be used after this.
-    pub async fn complete(mut self) -> Result<Option<PutObjectResult>, AppendUploadError<Client::ClientError>> {
+    pub async fn complete(mut self) -> Result<Option<PutObjectResult>, UploadError<Client::ClientError>> {
         if let Some(buffer) = self.buffer.take() {
             self.upload_queue.push(buffer).await?;
         } else if self.offset == 0 {
@@ -135,7 +135,7 @@ struct AppendUploadQueue<Client: ObjectClient> {
     /// Channel handle for sending buffers to be appended to the object.
     request_sender: Sender<UploadBuffer<Client::ClientError>>,
     /// Channel handle for receiving the result of S3 requests via [Output] messages.
-    output_receiver: Receiver<Result<Output, AppendUploadError<Client::ClientError>>>,
+    output_receiver: Receiver<Result<Output, UploadError<Client::ClientError>>>,
     mem_limiter: Arc<MemoryLimiter<Client>>,
     _task_handle: RemoteHandle<()>,
     /// Algorithm used to compute checksums. Initialized asynchronously in [get_buffer].
@@ -187,9 +187,9 @@ where
     /// Returns `true` if output was sent successfully.
     /// When the output cannot be sent, buffer receiver will be shut down.
     async fn send_output(
-        sender: &Sender<Result<Output, AppendUploadError<Client::ClientError>>>,
+        sender: &Sender<Result<Output, UploadError<Client::ClientError>>>,
         receiver: &Receiver<UploadBuffer<Client::ClientError>>,
-        output: Result<Output, AppendUploadError<Client::ClientError>>,
+        output: Result<Output, UploadError<Client::ClientError>>,
     ) -> bool {
         let error = output.is_err();
         if error {
@@ -230,7 +230,7 @@ where
                     trace!(?head_object, "received head_object response");
                     if Some(head_object.etag) != etag {
                         // Fail early if the etag has changed.
-                        Err(AppendUploadError::PutRequestFailed(ObjectClientError::ServiceError(
+                        Err(UploadError::PutRequestFailed(ObjectClientError::ServiceError(
                             PutObjectError::PreconditionFailed,
                         )))
                     } else {
@@ -278,33 +278,33 @@ where
     }
 
     // Push given bytes with its checksum to the upload queue
-    pub async fn push(&mut self, buffer: UploadBuffer<Client::ClientError>) -> Result<(), AppendUploadError<Client::ClientError>> {
+    pub async fn push(&mut self, buffer: UploadBuffer<Client::ClientError>) -> Result<(), UploadError<Client::ClientError>> {
         if let Err(_send_error) = self.request_sender.send(buffer).await {
             // The upload queue could be closed if there was a client error from previous requests
             trace!("upload queue is already closed");
             while self.consume_next_output().await? {}
-            return Err(AppendUploadError::UploadAlreadyTerminated);
+            return Err(UploadError::UploadAlreadyTerminated);
         }
         self.requests_in_queue += 1;
         Ok(())
     }
 
-    pub async fn verify(&mut self) -> Result<(), AppendUploadError<Client::ClientError>> {
+    pub async fn verify(&mut self) -> Result<(), UploadError<Client::ClientError>> {
         if self.request_sender.is_closed() {
             // The upload queue could be closed if there was a client error from previous requests
             trace!("upload queue is already closed");
             while self.consume_next_output().await? {}
-            return Err(AppendUploadError::UploadAlreadyTerminated);
+            return Err(UploadError::UploadAlreadyTerminated);
         }
         Ok(())
     }
 
     // Close the upload queue, wait for all uploads in the queue to complete, and get the last `PutObjectResult`
-    pub async fn join(mut self) -> Result<Option<PutObjectResult>, AppendUploadError<Client::ClientError>> {
+    pub async fn join(mut self) -> Result<Option<PutObjectResult>, UploadError<Client::ClientError>> {
         let terminated = !self.request_sender.close();
         while self.consume_next_output().await? {}
         if terminated {
-            return Err(AppendUploadError::UploadAlreadyTerminated);
+            return Err(UploadError::UploadAlreadyTerminated);
         }
         Ok(self.last_known_result.take())
     }
@@ -312,14 +312,14 @@ where
     pub async fn get_buffer(
         &mut self,
         capacity: usize,
-    ) -> Result<UploadBuffer<Client::ClientError>, AppendUploadError<Client::ClientError>> {
+    ) -> Result<UploadBuffer<Client::ClientError>, UploadError<Client::ClientError>> {
         let Some(checksum_algorithm) = self.checksum_algorithm.clone() else {
             trace!("wait for initial output");
             match self
                 .output_receiver
                 .recv()
                 .await
-                .unwrap_or(Err(AppendUploadError::UploadAlreadyTerminated))?
+                .unwrap_or(Err(UploadError::UploadAlreadyTerminated))?
             {
                 Output::ChecksumAlgorithm(algorithm) => {
                     trace!(?algorithm, "selected checksum algorithm");
@@ -337,7 +337,7 @@ where
                     // wait for requests in the queue to complete before trying to reserve memory again
                     trace!("wait for the next request to be processed");
                     if !self.consume_next_output().await? {
-                        return Err(AppendUploadError::UploadAlreadyTerminated);
+                        return Err(UploadError::UploadAlreadyTerminated);
                     }
                 }
             }
@@ -353,7 +353,7 @@ where
     /// Wait on output, updating the state of the [AppendUploadQueue] when next output arrives.
     ///
     /// Returns `true` when next output is successfully consumed, or `false` when no more output is available.
-    async fn consume_next_output(&mut self) -> Result<bool, AppendUploadError<Client::ClientError>> {
+    async fn consume_next_output(&mut self) -> Result<bool, UploadError<Client::ClientError>> {
         let Ok(output) = self.output_receiver.recv().await else {
             return Ok(false);
         };
@@ -450,7 +450,7 @@ async fn append<Client: ObjectClient>(
     offset: u64,
     etag: Option<ETag>,
     server_side_encryption: ServerSideEncryption,
-) -> Result<PutObjectResult, AppendUploadError<Client::ClientError>> {
+) -> Result<PutObjectResult, UploadError<Client::ClientError>> {
     let (data, checksum) = buffer.freeze()?;
     let mut request_params = if offset == 0 {
         PutObjectSingleParams::new()
@@ -459,14 +459,14 @@ async fn append<Client: ObjectClient>(
     };
     let (sse_type, key_id) = server_side_encryption
         .into_inner()
-        .map_err(AppendUploadError::SseCorruptedError)?;
+        .map_err(UploadError::SseCorruptedError)?;
     request_params.checksum = checksum;
     request_params.server_side_encryption = sse_type;
     request_params.ssekms_key_id = key_id;
     client
         .put_object_single(bucket, key, &request_params, data)
         .await
-        .map_err(AppendUploadError::PutRequestFailed)
+        .map_err(UploadError::PutRequestFailed)
 }
 
 #[cfg(test)]
@@ -797,7 +797,7 @@ mod tests {
         // Verify that the request fails at completion
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::PutRequestFailed(_))
+            Err(UploadError::PutRequestFailed(_))
         ));
     }
 
@@ -845,7 +845,7 @@ mod tests {
 
         // Verify that the request fails and the error is surfaced
         let result = upload_request.complete().await;
-        assert!(matches!(result, Err(AppendUploadError::PutRequestFailed(_))));
+        assert!(matches!(result, Err(UploadError::PutRequestFailed(_))));
 
         // Verify that object is partially appended from the first request
         let get_request = client
@@ -889,7 +889,7 @@ mod tests {
                     write_success_count += 1;
                 }
                 Err(e) => {
-                    assert!(matches!(e, AppendUploadError::PutRequestFailed(_)));
+                    assert!(matches!(e, UploadError::PutRequestFailed(_)));
                     break;
                 }
             }
@@ -902,11 +902,11 @@ mod tests {
         // Verify that the pipeline cannot be used after failure
         assert!(matches!(
             upload_request.write(offset, b"some data").await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
     }
 
@@ -957,7 +957,7 @@ mod tests {
                     write_success_count += 1;
                 }
                 Err(e) => {
-                    assert!(matches!(e, AppendUploadError::PutRequestFailed(_)));
+                    assert!(matches!(e, UploadError::PutRequestFailed(_)));
                     break;
                 }
             }
@@ -970,11 +970,11 @@ mod tests {
         // Verify that the pipeline cannot be used after failure
         assert!(matches!(
             upload_request.write(offset, b"some data").await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
        ));
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
 
         // Verify that object is partially appended from the first request
@@ -1025,7 +1025,7 @@ mod tests {
                 Err(e) => {
                     assert!(matches!(
                         e,
-                        AppendUploadError::PutRequestFailed(ObjectClientError::ServiceError(
+                        UploadError::PutRequestFailed(ObjectClientError::ServiceError(
                             PutObjectError::PreconditionFailed
                         ))
                     ));
@@ -1041,11 +1041,11 @@ mod tests {
         // Verify that the pipeline cannot be used after failure
         assert!(matches!(
             upload_request.write(offset, b"some data").await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
         assert!(matches!(
             upload_request.complete().await,
-            Err(AppendUploadError::UploadAlreadyTerminated)
+            Err(UploadError::UploadAlreadyTerminated)
         ));
     }
 
@@ -1078,7 +1078,7 @@ mod tests {
             .await
             .expect_err("out-of-order write should fail");
 
should fail"); - assert!(matches!(error, AppendUploadError::OutOfOrderWrite { .. })); + assert!(matches!(error, UploadError::OutOfOrderWrite { .. })); } #[test_case(Some("aws:kmr"), Some("some_key_alias"))] @@ -1123,7 +1123,7 @@ mod tests { // Verify that the request fails at completion assert!(matches!( upload_request.complete().await, - Err(AppendUploadError::SseCorruptedError(_)) + Err(UploadError::SseCorruptedError(_)) )); }