Skip to content

Commit

Permalink
Refactor ObjectClient error types (part 1/2)
Browse files Browse the repository at this point in the history
Our current ObjectClient allows each implementer to provide its own
error types for each request. This is nice and flexible, but prevents
callers of an ObjectClient from being generic if they want to detect
common service errors like NoSuchKey -- they must know the concrete
error type of the particular client they're using to match on these
errors. We've been getting away with this until #69, where we need to be
able to distinguish (expected) 404 errors from other errors on
HeadObject.

This change refactors ObjectClient to provide a common service error
type for each operation. ObjectClients now return an error that is
*either* a modeled service error like NoSuchKey *or* a client-specific
error. This allows callers to be generic over the ObjectClient and still
discriminate on the interesting error types, where by "interesting" I
mean things I think it's likely a caller might want to know about.

The diff was getting pretty big so I'm splitting this into two commits.
This is Part 1, which just does the refactoring, but doesn't change our
S3CrtClient to return the new modeled service errors. That means this
change shouldn't cause any functional change -- every error will be a
client error, like it was before this commit.  I'll follow up with Part
2 that adds the service errors to S3CrtClient (so does XML parsing etc).

Signed-off-by: James Bornholt <[email protected]>
  • Loading branch information
jamesbornholt committed Feb 14, 2023
1 parent 3436138 commit 7c6e34d
Show file tree
Hide file tree
Showing 14 changed files with 386 additions and 273 deletions.
23 changes: 12 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 60 additions & 26 deletions s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ use async_trait::async_trait;
use futures::Stream;
use pin_project::pin_project;

use crate::object_client::{GetBodyPart, HeadObjectResult, PutObjectParams, PutObjectResult};
use crate::object_client::{
GetBodyPart, GetObjectError, HeadObjectError, HeadObjectResult, ListObjectsError, ObjectClientError,
ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult,
};
use crate::{ListObjectsResult, ObjectClient};

// Wrapper for injecting failures into a get stream
pub struct FailureGetWrapper<Client: ObjectClient, GetWrapperState> {
state: GetWrapperState,
result_fn: fn(&mut GetWrapperState) -> Result<(), Client::GetObjectError>,
result_fn: fn(&mut GetWrapperState) -> Result<(), Client::ClientError>,
}

#[allow(clippy::type_complexity)]
Expand All @@ -27,9 +30,20 @@ pub struct FailureClient<Client: ObjectClient, State, GetWrapperState> {
&str,
&str,
Option<Range<u64>>,
) -> Result<FailureGetWrapper<Client, GetWrapperState>, Client::GetObjectError>,
pub head_object_cb: fn(&mut State, &str, &str) -> Result<(), Client::HeadObjectError>,
pub list_objects_cb: fn(&mut State, &str, Option<&str>, &str, usize, &str) -> Result<(), Client::ListObjectsError>,
) -> Result<
FailureGetWrapper<Client, GetWrapperState>,
ObjectClientError<GetObjectError, Client::ClientError>,
>,
pub head_object_cb:
fn(&mut State, &str, &str) -> Result<(), ObjectClientError<HeadObjectError, Client::ClientError>>,
pub list_objects_cb: fn(
&mut State,
&str,
Option<&str>,
&str,
usize,
&str,
) -> Result<(), ObjectClientError<ListObjectsError, Client::ClientError>>,
}

#[async_trait]
Expand All @@ -40,17 +54,14 @@ where
GetWrapperState: Send + Sync + 'static,
{
type GetObjectResult = FailureGetResult<Client, GetWrapperState>;
type GetObjectError = Client::GetObjectError;
type HeadObjectError = Client::HeadObjectError;
type ListObjectsError = Client::ListObjectsError;
type PutObjectError = Client::PutObjectError;
type ClientError = Client::ClientError;

async fn get_object(
&self,
bucket: &str,
key: &str,
range: Option<Range<u64>>,
) -> Result<Self::GetObjectResult, Self::GetObjectError> {
) -> ObjectClientResult<Self::GetObjectResult, GetObjectError, Self::ClientError> {
let wrapper = (self.get_object_cb)(&mut *self.state.lock().unwrap(), bucket, key, range.clone())?;
let get_result = self.client.get_object(bucket, key, range).await?;
Ok(FailureGetResult {
Expand All @@ -67,7 +78,7 @@ where
delimiter: &str,
max_keys: usize,
prefix: &str,
) -> Result<ListObjectsResult, Self::ListObjectsError> {
) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> {
(self.list_objects_cb)(
&mut *self.state.lock().unwrap(),
bucket,
Expand All @@ -82,7 +93,11 @@ where
.await
}

async fn head_object(&self, bucket: &str, key: &str) -> Result<HeadObjectResult, Self::HeadObjectError> {
async fn head_object(
&self,
bucket: &str,
key: &str,
) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> {
(self.head_object_cb)(&mut *self.state.lock().unwrap(), bucket, key)?;
self.client.head_object(bucket, key).await
}
Expand All @@ -93,7 +108,7 @@ where
key: &str,
params: &PutObjectParams,
contents: impl Stream<Item = impl AsRef<[u8]> + Send> + Send,
) -> Result<PutObjectResult, Self::PutObjectError> {
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
// TODO Add put fault injection hooks
self.client.put_object(bucket, key, params, contents).await
}
Expand All @@ -102,13 +117,13 @@ where
#[pin_project]
pub struct FailureGetResult<Client: ObjectClient, GetWrapperState> {
state: GetWrapperState,
result_fn: fn(&mut GetWrapperState) -> Result<(), Client::GetObjectError>,
result_fn: fn(&mut GetWrapperState) -> Result<(), Client::ClientError>,
#[pin]
get_result: Client::GetObjectResult,
}

impl<Client: ObjectClient, FailState> Stream for FailureGetResult<Client, FailState> {
type Item = Result<GetBodyPart, Client::GetObjectError>;
type Item = ObjectClientResult<GetBodyPart, GetObjectError, Client::ClientError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand All @@ -121,22 +136,30 @@ impl<Client: ObjectClient, FailState> Stream for FailureGetResult<Client, FailSt
pub type CountdownFailureClient<Client> =
FailureClient<Client, CountdownFailureClientState<Client>, CountdownFailureGetState<Client>>;

pub type GetFailureMap<Client> = HashMap<
usize,
Result<
(usize, <Client as ObjectClient>::ClientError),
ObjectClientError<GetObjectError, <Client as ObjectClient>::ClientError>,
>,
>;

#[allow(clippy::type_complexity)]
#[derive(Default)]
pub struct CountdownFailureClientState<Client: ObjectClient> {
get_count: usize,
get_results: HashMap<usize, Result<(usize, Client::GetObjectError), Client::GetObjectError>>,
get_results: GetFailureMap<Client>,
head_count: usize,
head_failures: HashMap<usize, Client::HeadObjectError>,
head_failures: HashMap<usize, ObjectClientError<HeadObjectError, Client::ClientError>>,
list_count: usize,
list_failures: HashMap<usize, Client::ListObjectsError>,
list_failures: HashMap<usize, ObjectClientError<ListObjectsError, Client::ClientError>>,
}

#[derive(Debug, Default)]
pub struct CountdownFailureGetState<Client: ObjectClient> {
count: usize,
fail_count: usize,
error: Option<Client::GetObjectError>,
error: Option<Client::ClientError>,
}

#[allow(clippy::type_complexity)]
Expand All @@ -148,11 +171,11 @@ pub fn countdown_failure_client<Client: ObjectClient>(
// returns error E on the n'th read request from that stream, otherwise reads from the underlying stream
// (Note: we could also define a failure client that tracks offsets, and returns an error when the offset
// reaches a specified threshold.)
get_results: HashMap<usize, Result<(usize, Client::GetObjectError), Client::GetObjectError>>,
get_results: GetFailureMap<Client>,
// For HEAD and LIST, map entries are interpreted as follows:
// (k -> E) means inject error E on the k'th call to that operation
head_failures: HashMap<usize, Client::HeadObjectError>,
list_failures: HashMap<usize, Client::ListObjectsError>,
head_failures: HashMap<usize, ObjectClientError<HeadObjectError, Client::ClientError>>,
list_failures: HashMap<usize, ObjectClientError<ListObjectsError, Client::ClientError>>,
// TODO add put failures
) -> CountdownFailureClient<Client> {
let state = Mutex::new(CountdownFailureClientState {
Expand Down Expand Up @@ -212,7 +235,7 @@ pub fn countdown_failure_client<Client: ObjectClient>(
#[cfg(test)]
mod tests {
use super::*;
use crate::mock_client::{GetObjectError, MockClient, MockClientConfig, MockObject};
use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject};
use std::collections::HashSet;

#[tokio::test]
Expand All @@ -229,9 +252,20 @@ mod tests {
client.add_object(key, MockObject::from_bytes(&body));

let mut get_failures = HashMap::new();
get_failures.insert(2, Err(GetObjectError::InvalidRange(3)));
get_failures.insert(4, Err(GetObjectError::NoSuchObject));
get_failures.insert(5, Err(GetObjectError::NoSuchBucket));
get_failures.insert(
2,
Err(ObjectClientError::ClientError(MockClientError(
"invalid range, length=3".into(),
))),
);
get_failures.insert(
4,
Err(ObjectClientError::ClientError(MockClientError("no such object".into()))),
);
get_failures.insert(
5,
Err(ObjectClientError::ClientError(MockClientError("no such bucket".into()))),
);

let fail_client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new());

Expand Down
4 changes: 1 addition & 3 deletions s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ mod s3_crt_client;
mod util;

pub use endpoint::{AddressingStyle, Endpoint};
pub use object_client::{ListObjectsResult, ObjectClient};
pub use s3_crt_client::get_object::GetObjectError;
pub use object_client::*;
pub use s3_crt_client::head_bucket::HeadBucketError;
pub use s3_crt_client::list_objects::ListObjectsError;
pub use s3_crt_client::{S3ClientConfig, S3CrtClient, S3RequestError};

#[cfg(test)]
Expand Down
Loading

0 comments on commit 7c6e34d

Please sign in to comment.