Skip to content

Commit

Permalink
Unify errors
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed Nov 27, 2024
1 parent 39204e9 commit 38b9d40
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 115 deletions.
47 changes: 12 additions & 35 deletions mountpoint-s3/src/fs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::Level;
use crate::fs::error_metadata::ErrorMetadata;
use crate::prefetch::PrefetchReadError;
use crate::superblock::InodeError;
use crate::upload::{AppendUploadError, UploadWriteError};
use crate::upload::UploadError;

/// Generate an error that includes a conversion to a libc errno for use in replies to FUSE.
///
Expand Down Expand Up @@ -108,30 +108,16 @@ impl From<InodeError> for Error {
}
}

impl<E: std::error::Error + Send + Sync + 'static> From<UploadWriteError<E>> for Error {
fn from(err: UploadWriteError<E>) -> Self {
impl<E: std::error::Error + Send + Sync + 'static> From<UploadError<E>> for Error {
fn from(err: UploadError<E>) -> Self {
let errno = err.to_errno();
Error {
errno,
message: String::from("upload error"),
source: Some(anyhow::anyhow!(err)),
// We are having WARN as the default level of logging for fuse errors
level: Level::WARN,
metadata: Default::default(), // TODO (vlaad): must be cloned from UploadWriteError
}
}
}

impl<E: std::error::Error + Send + Sync + 'static> From<AppendUploadError<E>> for Error {
fn from(err: AppendUploadError<E>) -> Self {
let errno = err.to_errno();
Error {
errno,
message: String::from("upload error"),
source: Some(anyhow::anyhow!(err)),
// We are having WARN as the default level of logging for fuse errors
level: Level::WARN,
metadata: Default::default(), // TODO (vlaad): must be cloned from AppendUploadError
metadata: Default::default(), // TODO (vlaad): must be cloned from UploadError
}
}
}
Expand Down Expand Up @@ -193,25 +179,16 @@ impl ToErrno for InodeError {
}
}

impl<E: std::error::Error> ToErrno for UploadWriteError<E> {
fn to_errno(&self) -> libc::c_int {
match self {
UploadWriteError::PutRequestFailed(_) => libc::EIO,
UploadWriteError::OutOfOrderWrite { .. } => libc::EINVAL,
UploadWriteError::ObjectTooBig { .. } => libc::EFBIG,
}
}
}

impl<E: std::error::Error> ToErrno for AppendUploadError<E> {
impl<E: std::error::Error> ToErrno for UploadError<E> {
fn to_errno(&self) -> libc::c_int {
match self {
AppendUploadError::PutRequestFailed(_) => libc::EIO,
AppendUploadError::UploadAlreadyTerminated => libc::EIO,
AppendUploadError::SseCorruptedError(_) => libc::EIO,
AppendUploadError::ChecksumComputationFailed(_) => libc::EIO,
AppendUploadError::HeadObjectFailed(_) => libc::EIO,
AppendUploadError::OutOfOrderWrite { .. } => libc::EINVAL,
UploadError::PutRequestFailed(_) => libc::EIO,
UploadError::UploadAlreadyTerminated => libc::EIO,
UploadError::SseCorruptedError(_) => libc::EIO,
UploadError::ChecksumComputationFailed(_) => libc::EIO,
UploadError::HeadObjectFailed(_) => libc::EIO,
UploadError::OutOfOrderWrite { .. } => libc::EINVAL,
UploadError::ObjectTooBig { .. } => libc::EFBIG,
}
}
}
Expand Down
47 changes: 15 additions & 32 deletions mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,24 @@ pub struct Uploader<Client: ObjectClient> {
}

#[derive(Debug, Error)]
pub enum UploadPutError<S, C> {
#[error("put request creation failed")]
ClientError(#[from] ObjectClientError<S, C>),
pub enum UploadError<E> {
#[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
OutOfOrderWrite { write_offset: u64, expected_offset: u64 },

#[error("put request failed")]
PutRequestFailed(#[from] ObjectClientError<PutObjectError, E>),

#[error("upload was already terminated because of previous failures")]
UploadAlreadyTerminated,

#[error("SSE settings corrupted")]
SseCorruptedError(#[from] SseCorruptedError),
}

#[derive(Debug, Error, Clone)]
pub enum UploadWriteError<E: std::error::Error> {
#[error("put request failed")]
PutRequestFailed(#[from] E),
#[error("error computing checksums")]
ChecksumComputationFailed(#[from] ChecksumHasherError),

#[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
OutOfOrderWrite { write_offset: u64, expected_offset: u64 },
#[error("head object request failed")]
HeadObjectFailed(#[from] ObjectClientError<HeadObjectError, E>),

#[error("object exceeded maximum upload size of {maximum_size} bytes")]
ObjectTooBig { maximum_size: usize },
Expand Down Expand Up @@ -91,7 +95,7 @@ where
&self,
bucket: &str,
key: &str,
) -> Result<UploadRequest<Client>, UploadPutError<PutObjectError, Client::ClientError>> {
) -> Result<UploadRequest<Client>, UploadError<Client::ClientError>> {
UploadRequest::new(self, bucket, key).await
}

Expand Down Expand Up @@ -134,27 +138,6 @@ where
/// is already capped by a single PutObject request.
const MAX_BYTES_IN_QUEUE: usize = 2 * 1024 * 1024 * 1024;

#[derive(Debug, Error)]
pub enum AppendUploadError<E> {
#[error("out-of-order write is NOT supported by Mountpoint, aborting the upload; expected offset {expected_offset:?} but got {write_offset:?}")]
OutOfOrderWrite { write_offset: u64, expected_offset: u64 },

#[error("put request failed")]
PutRequestFailed(#[source] ObjectClientError<PutObjectError, E>),

#[error("upload was already terminated because of previous failures")]
UploadAlreadyTerminated,

#[error("SSE settings corrupted")]
SseCorruptedError(#[from] SseCorruptedError),

#[error("error computing checksums")]
ChecksumComputationFailed(#[from] ChecksumHasherError),

#[error("head object request failed")]
HeadObjectFailed(#[from] ObjectClientError<HeadObjectError, E>),
}

struct BoxRuntime(Box<dyn Spawn + Send + Sync>);
impl BoxRuntime {
fn spawn_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
Expand Down
21 changes: 7 additions & 14 deletions mountpoint-s3/src/upload/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::fmt::Debug;

use mountpoint_s3_client::{
checksums::{crc32c_from_base64, Crc32c},
error::{ObjectClientError, PutObjectError},
types::{ChecksumAlgorithm, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview},
ObjectClient, PutObjectRequest,
};
Expand All @@ -11,9 +10,7 @@ use tracing::error;

use crate::{checksums::combine_checksums, ServerSideEncryption};

use super::{UploadPutError, UploadWriteError, Uploader};

type PutRequestError<Client> = ObjectClientError<PutObjectError, <Client as ObjectClient>::ClientError>;
use super::{UploadError, Uploader};

const MAX_S3_MULTIPART_UPLOAD_PARTS: usize = 10000;

Expand All @@ -35,7 +32,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
uploader: &Uploader<Client>,
bucket: &str,
key: &str,
) -> Result<Self, UploadPutError<PutObjectError, Client::ClientError>> {
) -> Result<Self, UploadError<Client::ClientError>> {
let mut params = PutObjectParams::new();

match &uploader.default_checksum_algorithm {
Expand Down Expand Up @@ -82,21 +79,17 @@ impl<Client: ObjectClient> UploadRequest<Client> {
self.next_request_offset
}

pub async fn write(
&mut self,
offset: i64,
data: &[u8],
) -> Result<usize, UploadWriteError<PutRequestError<Client>>> {
pub async fn write(&mut self, offset: i64, data: &[u8]) -> Result<usize, UploadError<Client::ClientError>> {
let next_offset = self.next_request_offset;
if offset != next_offset as i64 {
return Err(UploadWriteError::OutOfOrderWrite {
return Err(UploadError::OutOfOrderWrite {
write_offset: offset as u64,
expected_offset: next_offset,
});
}
if let Some(maximum_size) = self.maximum_upload_size {
if next_offset + data.len() as u64 > maximum_size as u64 {
return Err(UploadWriteError::ObjectTooBig { maximum_size });
return Err(UploadError::ObjectTooBig { maximum_size });
}
}

Expand All @@ -106,7 +99,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
Ok(data.len())
}

pub async fn complete(self) -> Result<PutObjectResult, PutRequestError<Client>> {
pub async fn complete(self) -> Result<PutObjectResult, UploadError<Client::ClientError>> {
let size = self.size();
let checksum = self.hasher.finalize();
let result = self
Expand Down Expand Up @@ -394,7 +387,7 @@ mod tests {
.expect_err("sse checksum must be checked");
assert!(matches!(
err,
UploadPutError::SseCorruptedError(SseCorruptedError::ChecksumMismatch(_, _))
UploadError::SseCorruptedError(SseCorruptedError::ChecksumMismatch(_, _))
));
}

Expand Down
Loading

0 comments on commit 38b9d40

Please sign in to comment.