diff --git a/Cargo.lock b/Cargo.lock index cf59a0fbe..f246bd4d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,9 +1000,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" dependencies = [ "instant", ] @@ -1618,14 +1618,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" +checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" dependencies = [ "libc", "log", "wasi", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] @@ -1707,9 +1707,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.17.0" +version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" +checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "oorandom" @@ -2591,10 +2591,11 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.4" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" dependencies = [ + "cfg-if", "once_cell", ] @@ -2701,9 +2702,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6a3b08b64e6dfad376fa2432c7b1f01522e37a623c3050bc95db2d3ff21583" +checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" dependencies = [ "bytes", "futures-core", diff --git a/s3-client/src/failure_client.rs b/s3-client/src/failure_client.rs index 53a3ac4b2..45ee8d9b7 100644 --- a/s3-client/src/failure_client.rs +++ b/s3-client/src/failure_client.rs @@ -9,13 +9,16 @@ use async_trait::async_trait; use futures::Stream; use pin_project::pin_project; -use crate::object_client::{GetBodyPart, HeadObjectResult, PutObjectParams, PutObjectResult}; +use crate::object_client::{ + GetBodyPart, GetObjectError, HeadObjectError, HeadObjectResult, ListObjectsError, ObjectClientError, + ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult, +}; use crate::{ListObjectsResult, ObjectClient}; // Wrapper for injecting failures into a get stream pub struct FailureGetWrapper { state: GetWrapperState, - result_fn: fn(&mut GetWrapperState) -> Result<(), Client::GetObjectError>, + result_fn: fn(&mut GetWrapperState) -> Result<(), Client::ClientError>, } #[allow(clippy::type_complexity)] @@ -27,9 +30,20 @@ pub struct FailureClient { &str, &str, Option>, - ) -> Result, Client::GetObjectError>, - pub head_object_cb: fn(&mut State, &str, &str) -> Result<(), Client::HeadObjectError>, - pub list_objects_cb: fn(&mut State, &str, Option<&str>, &str, usize, &str) -> Result<(), Client::ListObjectsError>, + ) -> Result< + FailureGetWrapper, + ObjectClientError, + >, + pub head_object_cb: + fn(&mut State, &str, &str) -> Result<(), ObjectClientError>, + pub list_objects_cb: fn( + &mut State, + &str, + Option<&str>, + &str, + usize, + &str, + ) -> Result<(), ObjectClientError>, } #[async_trait] @@ -40,17 +54,14 @@ where GetWrapperState: Send + Sync + 'static, { type GetObjectResult = FailureGetResult; - type GetObjectError = Client::GetObjectError; - type HeadObjectError = Client::HeadObjectError; - type ListObjectsError = Client::ListObjectsError; - type PutObjectError = Client::PutObjectError; + type ClientError = Client::ClientError; async fn get_object( &self, bucket: &str, key: &str, range: Option>, - ) -> Result { + ) -> ObjectClientResult { let wrapper = (self.get_object_cb)(&mut *self.state.lock().unwrap(), bucket, key, range.clone())?; let get_result = self.client.get_object(bucket, key, range).await?; Ok(FailureGetResult { @@ -67,7 +78,7 @@ where delimiter: &str, max_keys: usize, prefix: &str, - ) -> Result { + ) -> ObjectClientResult { (self.list_objects_cb)( &mut *self.state.lock().unwrap(), bucket, @@ -82,7 +93,11 @@ where .await } - async fn head_object(&self, bucket: &str, key: &str) -> Result { + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult { (self.head_object_cb)(&mut *self.state.lock().unwrap(), bucket, key)?; self.client.head_object(bucket, key).await } @@ -93,7 +108,7 @@ where key: &str, params: &PutObjectParams, contents: impl Stream + Send> + Send, - ) -> Result { + ) -> ObjectClientResult { // TODO Add put fault injection hooks self.client.put_object(bucket, key, params, contents).await } @@ -102,13 +117,13 @@ where #[pin_project] pub struct FailureGetResult { state: GetWrapperState, - result_fn: fn(&mut GetWrapperState) -> Result<(), Client::GetObjectError>, + result_fn: fn(&mut GetWrapperState) -> Result<(), Client::ClientError>, #[pin] get_result: Client::GetObjectResult, } impl Stream for FailureGetResult { - type Item = Result; + type Item = ObjectClientResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); @@ -121,22 +136,30 @@ impl Stream for FailureGetResult = FailureClient, CountdownFailureGetState>; +pub type GetFailureMap = HashMap< + usize, + Result< + (usize, ::ClientError), + ObjectClientError::ClientError>, + >, +>; + #[allow(clippy::type_complexity)] #[derive(Default)] pub struct CountdownFailureClientState { get_count: usize, - get_results: HashMap>, + get_results: GetFailureMap, head_count: usize, - head_failures: HashMap, + head_failures: HashMap>, list_count: usize, - list_failures: HashMap, + list_failures: HashMap>, } #[derive(Debug, Default)] pub struct CountdownFailureGetState { count: usize, fail_count: usize, - error: Option, + error: Option, } #[allow(clippy::type_complexity)] @@ -148,11 +171,11 @@ pub fn countdown_failure_client( // returns error E on the n'th read request from that stream, otherwise reads from the underlying stream // (Note: we could also define a failure client that tracks offsets, and returns an error when the offset // reaches a specified threshold.) - get_results: HashMap>, + get_results: GetFailureMap, // For HEAD and LIST, map entries are interpreted as follows: // (k -> E) means inject error E on the k'th call to that operation - head_failures: HashMap, - list_failures: HashMap, + head_failures: HashMap>, + list_failures: HashMap>, // TODO add put failures ) -> CountdownFailureClient { let state = Mutex::new(CountdownFailureClientState { @@ -212,7 +235,7 @@ pub fn countdown_failure_client( #[cfg(test)] mod tests { use super::*; - use crate::mock_client::{GetObjectError, MockClient, MockClientConfig, MockObject}; + use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject}; use std::collections::HashSet; #[tokio::test] @@ -229,9 +252,20 @@ mod tests { client.add_object(key, MockObject::from_bytes(&body)); let mut get_failures = HashMap::new(); - get_failures.insert(2, Err(GetObjectError::InvalidRange(3))); - get_failures.insert(4, Err(GetObjectError::NoSuchObject)); - get_failures.insert(5, Err(GetObjectError::NoSuchBucket)); + get_failures.insert( + 2, + Err(ObjectClientError::ClientError(MockClientError( + "invalid range, length=3".into(), + ))), + ); + get_failures.insert( + 4, + Err(ObjectClientError::ClientError(MockClientError("no such object".into()))), + ); + get_failures.insert( + 5, + Err(ObjectClientError::ClientError(MockClientError("no such bucket".into()))), + ); let fail_client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new()); diff --git a/s3-client/src/lib.rs b/s3-client/src/lib.rs index 96b865dc4..35d35d7e5 100644 --- a/s3-client/src/lib.rs +++ b/s3-client/src/lib.rs @@ -6,10 +6,8 @@ mod s3_crt_client; mod util; pub use endpoint::{AddressingStyle, Endpoint}; -pub use object_client::{ListObjectsResult, ObjectClient}; -pub use s3_crt_client::get_object::GetObjectError; +pub use object_client::*; pub use s3_crt_client::head_bucket::HeadBucketError; -pub use s3_crt_client::list_objects::ListObjectsError; pub use s3_crt_client::{S3ClientConfig, S3CrtClient, S3RequestError}; #[cfg(test)] diff --git a/s3-client/src/mock_client.rs b/s3-client/src/mock_client.rs index 9063975d5..514203d24 100644 --- a/s3-client/src/mock_client.rs +++ b/s3-client/src/mock_client.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet}; use std::ops::Range; use std::pin::Pin; @@ -12,7 +13,8 @@ use time::OffsetDateTime; use tracing::trace; use crate::object_client::{ - GetBodyPart, HeadObjectResult, ListObjectsResult, ObjectInfo, PutObjectParams, PutObjectResult, + GetBodyPart, GetObjectError, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult, + ObjectClientError, ObjectClientResult, ObjectInfo, PutObjectError, PutObjectParams, PutObjectResult, }; use crate::ObjectClient; @@ -149,7 +151,7 @@ pub struct GetObjectResult { } impl Stream for GetObjectResult { - type Item = Result; + type Item = ObjectClientResult; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { if self.length == 0 { @@ -166,61 +168,41 @@ impl Stream for GetObjectResult { } } -#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)] -pub enum GetObjectError { - #[error("bucket does not exist")] - NoSuchBucket, - #[error("object does not exist")] - NoSuchObject, - #[error("invalid range for object size {0}")] - InvalidRange(u64), -} - -#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)] -pub enum ListObjectsError { - #[error("bucket does not exist")] - NoSuchBucket, -} +#[derive(Debug, Error, PartialEq, Eq)] +pub struct MockClientError(pub Cow<'static, str>); -#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)] -pub enum HeadObjectError { - #[error("bucket does not exist")] - NoSuchBucket, - #[error("object does not exist")] - NoSuchObject, +impl std::fmt::Display for MockClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } } -#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)] -pub enum PutObjectError { - #[error("bucket does not exist")] - NoSuchBucket, +fn mock_client_error(s: impl Into>) -> ObjectClientResult { + Err(ObjectClientError::ClientError(MockClientError(s.into()))) } #[async_trait] impl ObjectClient for MockClient { type GetObjectResult = GetObjectResult; - type GetObjectError = GetObjectError; - type HeadObjectError = HeadObjectError; - type ListObjectsError = ListObjectsError; - type PutObjectError = PutObjectError; + type ClientError = MockClientError; async fn get_object( &self, bucket: &str, key: &str, range: Option>, - ) -> Result { + ) -> ObjectClientResult { trace!(bucket, key, ?range, "GetObject"); if bucket != self.config.bucket { - return Err(GetObjectError::NoSuchBucket); + return Err(ObjectClientError::ServiceError(GetObjectError::NoSuchBucket)); } let objects = self.objects.read().unwrap(); if let Some(object) = objects.get(key) { let (next_offset, length) = if let Some(range) = range { if range.start >= object.len() as u64 || range.end > object.len() as u64 { - return Err(GetObjectError::InvalidRange(object.len() as u64)); + return mock_client_error(format!("invalid range, length={}", object.len())); } (range.start, (range.end - range.start) as usize) } else { @@ -234,15 +216,19 @@ impl ObjectClient for MockClient { part_size: self.config.part_size, }) } else { - Err(GetObjectError::NoSuchObject) + Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) } } - async fn head_object(&self, bucket: &str, key: &str) -> Result { + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult { trace!(bucket, key, "HeadObject"); if bucket != self.config.bucket { - return Err(HeadObjectError::NoSuchBucket); + return Err(ObjectClientError::ServiceError(HeadObjectError::NotFound)); } let objects = self.objects.read().unwrap(); @@ -258,7 +244,7 @@ impl ObjectClient for MockClient { }, }) } else { - Err(HeadObjectError::NoSuchObject) + Err(ObjectClientError::ServiceError(HeadObjectError::NotFound)) } } @@ -269,11 +255,11 @@ impl ObjectClient for MockClient { delimiter: &str, max_keys: usize, prefix: &str, - ) -> Result { + ) -> ObjectClientResult { trace!(bucket, ?continuation_token, delimiter, max_keys, prefix, "ListObjects"); if bucket != self.config.bucket { - return Err(ListObjectsError::NoSuchBucket); + return Err(ObjectClientError::ServiceError(ListObjectsError::NoSuchBucket)); } // TODO delimiter and prefix should be optional in the API @@ -371,11 +357,11 @@ impl ObjectClient for MockClient { key: &str, _params: &PutObjectParams, contents: impl Stream + Send> + Send, - ) -> Result { + ) -> ObjectClientResult { trace!(bucket, key, "PutObject"); if bucket != self.config.bucket { - return Err(PutObjectError::NoSuchBucket); + return Err(ObjectClientError::ServiceError(PutObjectError::NoSuchBucket)); } let mut buffer = vec![]; @@ -453,56 +439,47 @@ mod tests { rng.fill_bytes(&mut body); client.add_object("key1", body[..].into()); - assert_eq!( - client - .get_object("wrong_bucket", "key1", None) - .await - .expect_err("should fail"), - GetObjectError::NoSuchBucket - ); + macro_rules! assert_client_error { + ($e:expr, $err:expr) => { + let err = $e.expect_err("should fail"); + match err { + ObjectClientError::ClientError(MockClientError(m)) => { + assert_eq!(&*m, $err); + } + _ => assert!(false, "wrong error type"), + } + }; + } - assert_eq!( - client - .get_object("test_bucket", "wrong_key", None) - .await - .expect_err("should fail"), - GetObjectError::NoSuchObject - ); + assert!(matches!( + client.get_object("wrong_bucket", "key1", None).await, + Err(ObjectClientError::ServiceError(GetObjectError::NoSuchBucket)) + )); + + assert!(matches!( + client.get_object("test_bucket", "wrong_key", None).await, + Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) + )); - assert_eq!( - client - .get_object("test_bucket", "key1", Some(0..2001)) - .await - .expect_err("should fail"), - GetObjectError::InvalidRange(2000) + assert_client_error!( + client.get_object("test_bucket", "key1", Some(0..2001)).await, + "invalid range, length=2000" ); - assert_eq!( - client - .get_object("test_bucket", "key1", Some(2000..2000)) - .await - .expect_err("should fail"), - GetObjectError::InvalidRange(2000) + assert_client_error!( + client.get_object("test_bucket", "key1", Some(2000..2000)).await, + "invalid range, length=2000" ); - assert_eq!( - client - .get_object("test_bucket", "key1", Some(500..2001)) - .await - .expect_err("should fail"), - GetObjectError::InvalidRange(2000) + assert_client_error!( + client.get_object("test_bucket", "key1", Some(500..2001)).await, + "invalid range, length=2000" ); - assert_eq!( - client - .get_object("test_bucket", "key1", Some(5000..2001)) - .await - .expect_err("should fail"), - GetObjectError::InvalidRange(2000) + assert_client_error!( + client.get_object("test_bucket", "key1", Some(5000..2001)).await, + "invalid range, length=2000" ); - assert_eq!( - client - .get_object("test_bucket", "key1", Some(5000..1)) - .await - .expect_err("should fail"), - GetObjectError::InvalidRange(2000) + assert_client_error!( + client.get_object("test_bucket", "key1", Some(5000..1)).await, + "invalid range, length=2000" ); } diff --git a/s3-client/src/object_client.rs b/s3-client/src/object_client.rs index 59a9d48dc..9c1ff6324 100644 --- a/s3-client/src/object_client.rs +++ b/s3-client/src/object_client.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use auto_impl::auto_impl; use futures::Stream; use std::ops::Range; +use thiserror::Error; use time::OffsetDateTime; /// A single element of the [ObjectClient::get_object] response is a pair of offset within the @@ -12,11 +13,8 @@ pub type GetBodyPart = (u64, Box<[u8]>); #[async_trait] #[auto_impl(Arc)] pub trait ObjectClient { - type GetObjectResult: Stream> + Send; - type GetObjectError: std::error::Error + Send + Sync + 'static; - type HeadObjectError: std::error::Error + Send + Sync + 'static; - type ListObjectsError: std::error::Error + Send + Sync + 'static; - type PutObjectError: std::error::Error + Send + Sync + 'static; + type GetObjectResult: Stream> + Send; + type ClientError: std::error::Error + Send + Sync + 'static; /// Get an object from the object store. Returns a stream of body parts of the object. Parts are /// guaranteed to be returned by the stream in order and contiguously. @@ -25,7 +23,7 @@ pub trait ObjectClient { bucket: &str, key: &str, range: Option>, - ) -> Result; + ) -> ObjectClientResult; /// List the objects in a bucket under a given prefix async fn list_objects( @@ -35,10 +33,14 @@ pub trait ObjectClient { delimiter: &str, max_keys: usize, prefix: &str, - ) -> Result; + ) -> ObjectClientResult; /// Retrieve object metadata without retrieving the object contents - async fn head_object(&self, bucket: &str, key: &str) -> Result; + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult; /// Put an object into the object store. /// The contents are provided by the client as an async stream of buffers. @@ -48,7 +50,41 @@ pub trait ObjectClient { key: &str, params: &PutObjectParams, contents: impl Stream + Send> + Send, - ) -> Result; + ) -> ObjectClientResult; +} + +/// Errors returned by calls to an [ObjectClient]. Errors that are explicitly modeled on a +/// per-request-type basis are [ServiceError]s. Other generic or unhandled errors are +/// [ClientError]s. +/// +/// The distinction between these two types of error can sometimes be blurry. As a rough heuristic, +/// [ServiceError]s are those that *any reasonable implementation* of an object client would be +/// capable of experiencing, and [ClientError]s are anything else. For example, any object client +/// could experience a "no such key" error, but only object clients that implement a permissions +/// system could experience "permission denied" errors. When in doubt, we err towards *not* adding +/// new [ServiceError]s, as they are public API for *every* object client. +#[derive(Debug, Error)] +pub enum ObjectClientError { + /// An error returned by the service itself + #[error("Service error")] + ServiceError(#[source] S), + + /// An error within the object client (for example, an unexpected response, or a failure to + /// construct the request). + #[error("Client error")] + ClientError(#[from] C), +} + +pub type ObjectClientResult = Result>; + +#[derive(Debug, Error, PartialEq, Eq)] +#[non_exhaustive] +pub enum GetObjectError { + #[error("The bucket does not exist")] + NoSuchBucket, + + #[error("The key does not exist")] + NoSuchKey, } /// Result of a [ObjectClient::list_objects] request @@ -68,6 +104,13 @@ pub struct ListObjectsResult { pub next_continuation_token: Option, } +#[derive(Debug, Error, PartialEq, Eq)] +#[non_exhaustive] +pub enum ListObjectsError { + #[error("The bucket does not exist")] + NoSuchBucket, +} + /// Result of a [ObjectClient::head_object] request #[derive(Debug)] pub struct HeadObjectResult { @@ -78,6 +121,14 @@ pub struct HeadObjectResult { pub object: ObjectInfo, } +#[derive(Debug, Error, PartialEq, Eq)] +#[non_exhaustive] +pub enum HeadObjectError { + /// Note that HeadObject cannot distinguish between NoSuchBucket and NoSuchKey errors + #[error("The object was not found")] + NotFound, +} + /// Parameters to a [ObjectClient::put_object] request /// TODO: Populate this struct with parameters from the S3 API, e.g., storage class, encryption. #[derive(Debug, Default)] @@ -88,6 +139,13 @@ pub struct PutObjectParams {} #[derive(Debug)] pub struct PutObjectResult {} +#[derive(Debug, Error, PartialEq, Eq)] +#[non_exhaustive] +pub enum PutObjectError { + #[error("The bucket does not exist")] + NoSuchBucket, +} + /// Metadata about a single S3 object. /// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html for more details. #[derive(Debug)] diff --git a/s3-client/src/s3_crt_client.rs b/s3-client/src/s3_crt_client.rs index 359f6903c..d7f92fe1a 100644 --- a/s3-client/src/s3_crt_client.rs +++ b/s3-client/src/s3_crt_client.rs @@ -27,11 +27,8 @@ use thiserror::Error; use tracing::{debug, error, trace, warn, Span}; use crate::endpoint::{AddressingStyle, Endpoint, EndpointError}; -use crate::object_client::{HeadObjectResult, ListObjectsResult, ObjectClient, PutObjectParams, PutObjectResult}; -use crate::s3_crt_client::get_object::{GetObjectError, GetObjectRequest}; -use crate::s3_crt_client::head_object::HeadObjectError; -use crate::s3_crt_client::list_objects::ListObjectsError; -use crate::s3_crt_client::put_object::PutObjectError; +use crate::object_client::*; +use crate::s3_crt_client::get_object::GetObjectRequest; macro_rules! request_span { ($self:expr, $method:expr) => {{ @@ -165,17 +162,19 @@ impl S3CrtClient { } /// Make an HTTP request using this S3 client that invokes the given callbacks as the request - /// makes progress. The `on_finish` callback is invoked only if the request succeeds. - fn make_meta_request( + /// makes progress. The `on_finish` callback is invoked on both successful and failed requests; + /// it should call `.is_err()` on the [MetaRequestResult] to decide whether the request + /// succeeded. + fn make_meta_request( &self, message: S3Message, meta_request_type: MetaRequestType, request_span: Span, mut on_headers: impl FnMut(&Headers, i32) + Send + 'static, mut on_body: impl FnMut(u64, &[u8]) + Send + 'static, - on_finish: impl FnOnce(MetaRequestResult) -> Result + Send + 'static, - ) -> Result, S3RequestError> { - let (tx, rx) = oneshot::channel::>>(); + on_finish: impl FnOnce(MetaRequestResult) -> ObjectClientResult + Send + 'static, + ) -> Result, S3RequestError> { + let (tx, rx) = oneshot::channel::>(); let span_body = request_span.clone(); let span_finish = request_span; @@ -219,20 +218,14 @@ impl S3CrtClient { metrics::counter!("s3.meta_requests", 1, "op" => op); - let result = if !request_result.is_err() { - debug!( - request_id = request_id.as_deref().unwrap_or("unknown"), - duration_us = start_time.elapsed().as_micros(), - "request finished" - ); - on_finish(request_result).map_err(|e| S3RequestError::ServiceError(e)) - } else { + if request_result.is_err() { warn!( request_id = request_id.as_deref().unwrap_or("unknown"), duration_us = start_time.elapsed().as_micros(), ?request_result, "request failed" ); + // If it's not a real HTTP status, encode the CRT error instead let error_status = if request_result.response_status >= 100 { request_result.response_status @@ -240,8 +233,15 @@ impl S3CrtClient { -request_result.crt_error.raw_error() }; metrics::counter!("s3.meta_request_failures", 1, "op" => op, "status" => format!("{error_status}")); - Err(S3RequestError::ResponseError(request_result)) - }; + } else { + debug!( + request_id = request_id.as_deref().unwrap_or("unknown"), + duration_us = start_time.elapsed().as_micros(), + "request finished" + ); + } + + let result = on_finish(request_result); let _ = tx.send(result); }) @@ -256,13 +256,18 @@ impl S3CrtClient { Ok(S3HttpRequest { receiver: rx }) } - /// Make an HTTP request using this S3 client that returns the body on success - fn make_simple_http_request( + /// Make an HTTP request using this S3 client that returns the body on success or invokes the + /// given callback on failure. + /// + /// The `on_error` callback can assume that `result.is_err()` is true for the result it + /// receives. + fn make_simple_http_request( &self, message: S3Message, request_type: MetaRequestType, request_span: Span, - ) -> Result, E>, S3RequestError> { + on_error: impl FnOnce(MetaRequestResult) -> ObjectClientError + Send + 'static, + ) -> Result, E>, S3RequestError> { // Accumulate the body of the response into this Vec let body: Arc>> = Default::default(); let body_clone = Arc::clone(&body); @@ -277,7 +282,13 @@ impl S3CrtClient { assert_eq!(offset as usize, body.len()); body.extend_from_slice(data); }, - move |_result| Ok(std::mem::take(&mut *body.lock().unwrap())), + move |result| { + if result.is_err() { + Err(on_error(result)) + } else { + Ok(std::mem::take(&mut *body.lock().unwrap())) + } + }, ) } @@ -364,19 +375,23 @@ impl<'a> S3Message<'a> { #[derive(Debug)] #[pin_project] -struct S3HttpRequest { +struct S3HttpRequest { #[pin] - receiver: oneshot::Receiver>>, + receiver: oneshot::Receiver>, } -impl Future for S3HttpRequest { - type Output = Result>; +impl Future for S3HttpRequest { + type Output = ObjectClientResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - this.receiver - .poll(cx) - .map(|result| result.unwrap_or_else(|err| Err(S3RequestError::InternalError(Box::new(err))))) + this.receiver.poll(cx).map(|result| { + result.unwrap_or_else(|err| { + Err(ObjectClientError::ClientError(S3RequestError::InternalError(Box::new( + err, + )))) + }) + }) } } @@ -391,26 +406,28 @@ pub enum NewClientError { /// Failed S3 request results #[derive(Error, Debug)] -pub enum S3RequestError { +pub enum S3RequestError { /// An internal error from within the S3 client. The request may have been sent. #[error("Internal S3 client error")] InternalError(#[source] Box), /// An internal error from within the AWS Common Runtime. The request may have been sent. - #[error("Unknown CRT error: {0}")] + #[error("Unknown CRT error")] CrtError(#[from] aws_crt_s3::common::error::Error), /// An error during construction of a request. The request was not sent. - #[error("Failed to construct request: {0}")] + #[error("Failed to construct request")] ConstructionFailure(#[from] ConstructionError), /// The request was sent but an unknown or unhandled failure occurred while processing it. #[error("Unknown response error: {0:?}")] ResponseError(MetaRequestResult), +} - /// The request was sent and the service returned an error. - #[error("Error received from S3: {0:?}")] - ServiceError(#[source] E), +impl S3RequestError { + fn construction_failure(inner: impl Into) -> Self { + S3RequestError::ConstructionFailure(inner.into()) + } } #[derive(Error, Debug)] @@ -427,17 +444,14 @@ pub enum ConstructionError { #[async_trait] impl ObjectClient for S3CrtClient { type GetObjectResult = GetObjectRequest; - type GetObjectError = S3RequestError; - type HeadObjectError = S3RequestError; - type ListObjectsError = S3RequestError; - type PutObjectError = S3RequestError; + type ClientError = S3RequestError; async fn get_object( &self, bucket: &str, key: &str, range: Option>, - ) -> Result { + ) -> ObjectClientResult { self.get_object(bucket, key, range) } @@ -448,12 +462,16 @@ impl ObjectClient for S3CrtClient { delimiter: &str, max_keys: usize, prefix: &str, - ) -> Result { + ) -> ObjectClientResult { self.list_objects(bucket, continuation_token, delimiter, max_keys, prefix) .await } - async fn head_object(&self, bucket: &str, key: &str) -> Result { + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult { self.head_object(bucket, key).await } @@ -463,7 +481,7 @@ impl ObjectClient for S3CrtClient { key: &str, params: &PutObjectParams, contents: impl futures::Stream + Send> + Send, - ) -> Result { + ) -> ObjectClientResult { self.put_object(bucket, key, params, contents).await } } diff --git a/s3-client/src/s3_crt_client/get_object.rs b/s3-client/src/s3_crt_client/get_object.rs index 7a6622920..7d45d181d 100644 --- a/s3-client/src/s3_crt_client/get_object.rs +++ b/s3-client/src/s3_crt_client/get_object.rs @@ -9,12 +9,11 @@ use aws_crt_s3::s3::client::MetaRequestType; use futures::channel::mpsc::UnboundedReceiver; use futures::Stream; use pin_project::pin_project; -use thiserror::Error; -use tracing::{debug, error}; +use tracing::debug; -use crate::object_client::GetBodyPart; -use crate::s3_crt_client::{ConstructionError, S3HttpRequest}; -use crate::{S3CrtClient, S3RequestError}; +use crate::object_client::{GetBodyPart, GetObjectError, ObjectClientError}; +use crate::s3_crt_client::S3HttpRequest; +use crate::{ObjectClientResult, S3CrtClient, S3RequestError}; impl S3CrtClient { /// Create and begin a new GetObject request. The returned [GetObjectRequest] is a [Stream] (for @@ -25,27 +24,33 @@ impl S3CrtClient { bucket: &str, key: &str, range: Option>, - ) -> Result> { + ) -> Result> { let span = request_span!(self, "get_object"); span.in_scope( || debug!(?bucket, ?key, ?range, size=?range.as_ref().map(|range| range.end - range.start), "new request"), ); - let mut message = self.new_request_template("GET", bucket)?; + let mut message = self + .new_request_template("GET", bucket) + .map_err(S3RequestError::construction_failure)?; // Overwrite "accept" header since this returns raw object data. - message.add_header(&Header::new("accept", "*/*"))?; + message + .add_header(&Header::new("accept", "*/*")) + .map_err(S3RequestError::construction_failure)?; if let Some(range) = range { // Range HTTP header is bounded below *inclusive* let range_value = format!("bytes={}-{}", range.start, range.end.saturating_sub(1)); message .add_header(&Header::new("Range", range_value)) - .map_err(ConstructionError::CrtError)?; + .map_err(S3RequestError::construction_failure)?; } let key = format!("/{key}"); - message.set_request_path(key).map_err(ConstructionError::CrtError)?; + message + .set_request_path(key) + .map_err(S3RequestError::construction_failure)?; let (sender, receiver) = futures::channel::mpsc::unbounded(); @@ -57,7 +62,13 @@ impl S3CrtClient { move |offset, data| { let _ = sender.unbounded_send(Ok((offset, data.into()))); }, - move |_result| Ok(()), + move |result| { + if result.is_err() { + Err(ObjectClientError::ClientError(S3RequestError::ResponseError(result))) + } else { + Ok(()) + } + }, )?; Ok(GetObjectRequest { @@ -68,13 +79,6 @@ impl S3CrtClient { } } -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum GetObjectError { - #[error("CRT error")] - CRTError(#[from] Error), -} - #[derive(Debug)] #[pin_project] pub struct GetObjectRequest { @@ -86,7 +90,7 @@ pub struct GetObjectRequest { } impl Stream for GetObjectRequest { - type Item = Result>; + type Item = ObjectClientResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if self.finished { @@ -96,7 +100,7 @@ impl Stream for GetObjectRequest { let this = self.project(); if let Poll::Ready(Some(val)) = this.finish_receiver.poll_next(cx) { - return Poll::Ready(Some(val.map_err(|e| e.into()))); + return Poll::Ready(Some(val.map_err(|e| ObjectClientError::ClientError(e.into())))); } match this.request.poll(cx) { diff --git a/s3-client/src/s3_crt_client/head_bucket.rs b/s3-client/src/s3_crt_client/head_bucket.rs index b3e0b3698..7f22f21f3 100644 --- a/s3-client/src/s3_crt_client/head_bucket.rs +++ b/s3-client/src/s3_crt_client/head_bucket.rs @@ -1,4 +1,4 @@ -use crate::s3_crt_client::ConstructionError; +use crate::object_client::{ObjectClientError, ObjectClientResult}; use crate::{S3CrtClient, S3RequestError}; use aws_crt_s3::s3::client::{MetaRequestResult, MetaRequestType}; use thiserror::Error; @@ -15,29 +15,34 @@ pub enum HeadBucketError { } impl S3CrtClient { - pub async fn head_bucket(&self, bucket: &str) -> Result<(), S3RequestError> { + pub async fn head_bucket(&self, bucket: &str) -> ObjectClientResult<(), HeadBucketError, S3RequestError> { let body = { - let mut message = self.new_request_template("HEAD", bucket)?; + let mut message = self + .new_request_template("HEAD", bucket) + .map_err(S3RequestError::construction_failure)?; - message.set_request_path("/").map_err(ConstructionError::CrtError)?; + message + .set_request_path("/") + .map_err(S3RequestError::construction_failure)?; let span = request_span!(self, "head_bucket"); span.in_scope(|| debug!(?bucket, endpoint = ?self.endpoint, "new request")); - self.make_simple_http_request(message, MetaRequestType::Default, span)? + self.make_simple_http_request(message, MetaRequestType::Default, span, |request_result| { + match request_result.response_status { + 301 => try_parse_redirect(&request_result) + .map(ObjectClientError::ServiceError) + .unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError( + request_result, + ))), + // S3 returns 400 for invalid or expired STS tokens + 400 | 403 => ObjectClientError::ServiceError(HeadBucketError::PermissionDenied(request_result)), + _ => ObjectClientError::ClientError(S3RequestError::ResponseError(request_result)), + } + })? }; - body.await.map(|_| ()).map_err(|err| match err { - S3RequestError::ResponseError(request_result) => match request_result.response_status { - 301 => try_parse_redirect(&request_result) - .map(S3RequestError::ServiceError) - .unwrap_or(S3RequestError::ResponseError(request_result)), - // S3 returns 400 for invalid or expired STS tokens - 400 | 403 => S3RequestError::ServiceError(HeadBucketError::PermissionDenied(request_result)), - _ => S3RequestError::ResponseError(request_result), - }, - err => err, - }) + body.await.map(|_body| ()) } } diff --git a/s3-client/src/s3_crt_client/head_object.rs b/s3-client/src/s3_crt_client/head_object.rs index 84e6db73f..f26b7b680 100644 --- a/s3-client/src/s3_crt_client/head_object.rs +++ b/s3-client/src/s3_crt_client/head_object.rs @@ -1,5 +1,5 @@ -use crate::object_client::{HeadObjectResult, ObjectInfo}; -use crate::s3_crt_client::{ConstructionError, S3RequestError}; +use crate::object_client::{HeadObjectError, HeadObjectResult, ObjectClientError, ObjectClientResult, ObjectInfo}; +use crate::s3_crt_client::S3RequestError; use crate::S3CrtClient; use aws_crt_s3::http::request_response::{Headers, HeadersError}; use aws_crt_s3::s3::client::MetaRequestType; @@ -11,13 +11,6 @@ use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; use tracing::{debug, error}; -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum HeadObjectError { - #[error("Error parsing response: {0}")] - ParseError(#[from] ParseError), -} - #[derive(Error, Debug)] #[non_exhaustive] pub enum ParseError { @@ -67,15 +60,17 @@ impl S3CrtClient { &self, bucket: &str, key: &str, - ) -> Result> { + ) -> ObjectClientResult { let request = { - let mut message = self.new_request_template("HEAD", bucket)?; + let mut message = self + .new_request_template("HEAD", bucket) + .map_err(S3RequestError::construction_failure)?; // Don't URI encode the key, since "/" needs to be preserved let key = key.to_string(); message .set_request_path(format!("/{key}")) - .map_err(ConstructionError::CrtError)?; + .map_err(S3RequestError::construction_failure)?; let bucket = bucket.to_owned(); @@ -100,7 +95,18 @@ impl S3CrtClient { )); }, |_, _| (), - move |_result| header.lock().unwrap().take().unwrap().map_err(|e| e.into()), + move |result| { + if result.is_err() { + Err(ObjectClientError::ClientError(S3RequestError::ResponseError(result))) + } else { + header + .lock() + .unwrap() + .take() + .unwrap() + .map_err(|e| ObjectClientError::ClientError(S3RequestError::InternalError(Box::new(e)))) + } + }, )? }; diff --git a/s3-client/src/s3_crt_client/list_objects.rs b/s3-client/src/s3_crt_client/list_objects.rs index c0f14fe5e..e31abc262 100644 --- a/s3-client/src/s3_crt_client/list_objects.rs +++ b/s3-client/src/s3_crt_client/list_objects.rs @@ -1,5 +1,5 @@ -use crate::object_client::{ListObjectsResult, ObjectInfo}; -use crate::s3_crt_client::{ConstructionError, S3RequestError}; +use crate::object_client::{ListObjectsError, ListObjectsResult, ObjectClientError, ObjectClientResult, ObjectInfo}; +use crate::s3_crt_client::S3RequestError; use crate::S3CrtClient; use aws_crt_s3::s3::client::MetaRequestType; use std::str::FromStr; @@ -8,13 +8,6 @@ use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use tracing::{debug, error}; -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum ListObjectsError { - #[error("Error parsing response: {0}")] - ParseError(#[from] ParseError), -} - #[derive(Error, Debug)] #[non_exhaustive] pub enum ParseError { @@ -139,10 +132,12 @@ impl S3CrtClient { delimiter: &str, max_keys: usize, prefix: &str, - ) -> Result> { + ) -> ObjectClientResult { // Scope the endpoint, message, etc. since otherwise rustc thinks we use Message across the await. let body = { - let mut message = self.new_request_template("GET", bucket)?; + let mut message = self + .new_request_template("GET", bucket) + .map_err(S3RequestError::construction_failure)?; // Don't URI encode delimiter or prefix, since "/" in those needs to be a real "/". let mut request = format!("/?list-type=2&delimiter={delimiter}&max-keys={max_keys}&prefix={prefix}"); @@ -153,7 +148,9 @@ impl S3CrtClient { request = format!("{request}&continuation-token={continuation_token}"); } - message.set_request_path(request).map_err(ConstructionError::CrtError)?; + message + .set_request_path(request) + .map_err(S3RequestError::construction_failure)?; let span = request_span!(self, "list_objects"); span.in_scope(|| { @@ -167,11 +164,14 @@ impl S3CrtClient { ) }); - self.make_simple_http_request(message, MetaRequestType::Default, span)? + self.make_simple_http_request(message, MetaRequestType::Default, span, |result| { + ObjectClientError::ClientError(S3RequestError::ResponseError(result)) + })? }; let body = body.await?; - ListObjectsResult::parse_from_bytes(&body).map_err(|e| S3RequestError::ServiceError(e.into())) + ListObjectsResult::parse_from_bytes(&body) + .map_err(|e| ObjectClientError::ClientError(S3RequestError::InternalError(e.into()))) } } diff --git a/s3-client/src/s3_crt_client/put_object.rs b/s3-client/src/s3_crt_client/put_object.rs index 12f1f68fa..aca77085f 100644 --- a/s3-client/src/s3_crt_client/put_object.rs +++ b/s3-client/src/s3_crt_client/put_object.rs @@ -1,17 +1,11 @@ -use crate::object_client::{PutObjectParams, PutObjectResult}; -use crate::s3_crt_client::ConstructionError; -use crate::{S3CrtClient, S3RequestError}; +use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult}; +use crate::{ObjectClientError, S3CrtClient, S3RequestError}; use aws_crt_s3::http::request_response::Header; use aws_crt_s3::io::stream::InputStream; use aws_crt_s3::s3::client::MetaRequestType; use futures::{Stream, StreamExt}; -use thiserror::Error; use tracing::debug; -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum PutObjectError {} - impl S3CrtClient { pub(super) async fn put_object( &self, @@ -19,7 +13,7 @@ impl S3CrtClient { key: &str, params: &PutObjectParams, contents: impl Stream + Send> + Send, - ) -> Result> { + ) -> ObjectClientResult { let mut buffer = vec![]; // Accumulate the stream contents into a buffer. @@ -32,22 +26,29 @@ impl S3CrtClient { .await; let body = { - let mut message = self.new_request_template("PUT", bucket)?; + let mut message = self + .new_request_template("PUT", bucket) + .map_err(S3RequestError::construction_failure)?; message .add_header(&Header::new("Content-Length", buffer.len().to_string())) - .map_err(ConstructionError::CrtError)?; + .map_err(S3RequestError::construction_failure)?; let key = format!("/{key}"); - message.set_request_path(&key).map_err(ConstructionError::CrtError)?; + message + .set_request_path(&key) + .map_err(S3RequestError::construction_failure)?; - let body_input_stream = InputStream::new_from_slice(&self.allocator, &buffer)?; + let body_input_stream = + InputStream::new_from_slice(&self.allocator, &buffer).map_err(S3RequestError::CrtError)?; message.set_body_stream(Some(body_input_stream)); let span = request_span!(self, "put_object"); span.in_scope(|| debug!(?bucket, ?key, ?params, "new request")); - self.make_simple_http_request(message, MetaRequestType::PutObject, span)? + self.make_simple_http_request(message, MetaRequestType::PutObject, span, |result| { + ObjectClientError::ClientError(S3RequestError::ResponseError(result)) + })? }; body.await?; diff --git a/s3-client/tests/head_bucket.rs b/s3-client/tests/head_bucket.rs index ed7e94b94..87e738ab3 100644 --- a/s3-client/tests/head_bucket.rs +++ b/s3-client/tests/head_bucket.rs @@ -3,7 +3,7 @@ pub mod common; use common::*; -use s3_client::{HeadBucketError, S3CrtClient, S3RequestError}; +use s3_client::{HeadBucketError, ObjectClientError, S3CrtClient}; #[tokio::test] async fn test_head_bucket_correct_region() { @@ -22,7 +22,7 @@ async fn test_head_bucket_wrong_region() { let result = client.head_bucket(&bucket).await; match result { - Err(S3RequestError::ServiceError(HeadBucketError::IncorrectRegion(actual_region))) => { + Err(ObjectClientError::ServiceError(HeadBucketError::IncorrectRegion(actual_region))) => { assert_eq!(actual_region, expected_region, "wrong region returned") } _ => panic!("incorrect result {result:?}"), @@ -38,6 +38,6 @@ async fn test_head_bucket_forbidden() { assert!(matches!( result, - Err(S3RequestError::ServiceError(HeadBucketError::PermissionDenied(_))) + Err(ObjectClientError::ServiceError(HeadBucketError::PermissionDenied(_))) )); } diff --git a/s3-file-connector/src/main.rs b/s3-file-connector/src/main.rs index a0f79c98d..06ce5e2e7 100644 --- a/s3-file-connector/src/main.rs +++ b/s3-file-connector/src/main.rs @@ -4,8 +4,7 @@ use anyhow::{anyhow, Context as _}; use aws_crt_s3::common::rust_log_adapter::RustLogAdapter; use clap::{ArgGroup, Parser}; use fuser::{BackgroundSession, MountOption, Session}; -use s3_client::{AddressingStyle, Endpoint, HeadBucketError, S3ClientConfig, S3CrtClient, S3RequestError}; - +use s3_client::{AddressingStyle, Endpoint, HeadBucketError, ObjectClientError, S3ClientConfig, S3CrtClient}; use s3_file_connector::fs::S3FilesystemConfig; use s3_file_connector::fuse::S3FuseFilesystem; use s3_file_connector::metrics::{metrics_tracing_span_layer, MetricsSink}; @@ -250,7 +249,7 @@ fn create_client_for_bucket( match futures::executor::block_on(head_request) { Ok(_) => Ok(client), // Don't try to automatically correct the region if it was manually specified incorrectly - Err(S3RequestError::ServiceError(HeadBucketError::IncorrectRegion(region))) if supposed_region.is_none() => { + Err(ObjectClientError::ServiceError(HeadBucketError::IncorrectRegion(region))) if supposed_region.is_none() => { tracing::warn!("bucket {bucket} is in region {region}, not {region_to_try}. redirecting..."); let endpoint = Endpoint::from_region(®ion, addressing_style)?; let new_client = S3CrtClient::new( diff --git a/s3-file-connector/src/prefetch.rs b/s3-file-connector/src/prefetch.rs index d0d93d3be..e4cc26159 100644 --- a/s3-file-connector/src/prefetch.rs +++ b/s3-file-connector/src/prefetch.rs @@ -21,7 +21,7 @@ use futures::pin_mut; use futures::stream::StreamExt; use futures::task::{Spawn, SpawnExt}; use metrics::counter; -use s3_client::ObjectClient; +use s3_client::{GetObjectError, ObjectClient, ObjectClientError}; use thiserror::Error; use tracing::{debug_span, error, trace, Instrument}; @@ -29,6 +29,8 @@ use crate::prefetch::part::Part; use crate::prefetch::part_queue::PartQueue; use crate::sync::{Arc, RwLock}; +type TaskError = ObjectClientError::ClientError>; + #[derive(Debug, Clone, Copy)] pub struct PrefetcherConfig { /// Size of the first request in a prefetch run @@ -92,9 +94,9 @@ where #[derive(Debug)] pub struct PrefetchGetObject { inner: Arc>, - current_task: Option>, + current_task: Option>>, // Currently we only every spawn at most one future task (see [spawn_next_request]) - future_tasks: Arc>>>, + future_tasks: Arc>>>>, bucket: String, key: String, next_sequential_read_offset: u64, @@ -126,7 +128,7 @@ where /// Read some bytes from the object. Blocks until the desired bytes are available or EOF. This /// function will always return exactly `size` bytes, except at the end of the object where it /// will return however many bytes are left (including possibly 0 bytes). - pub fn read(&mut self, offset: u64, length: usize) -> Result> { + pub fn read(&mut self, offset: u64, length: usize) -> Result>> { trace!( offset, length, @@ -226,7 +228,7 @@ where } /// Spawn the next required request - fn spawn_next_request(&mut self) -> Option> { + fn spawn_next_request(&mut self) -> Option>> { let start = self.next_request_offset; let end = (start + self.next_request_size as u64).min(self.size); @@ -325,8 +327,8 @@ mod tests { use proptest::sample::SizeRange; use proptest::strategy::{Just, Strategy}; use proptest_derive::Arbitrary; - use s3_client::failure_client::countdown_failure_client; - use s3_client::mock_client::{ramp_bytes, GetObjectError, MockClient, MockClientConfig, MockObject}; + use s3_client::failure_client::{countdown_failure_client, GetFailureMap}; + use s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject}; use std::collections::HashMap; #[derive(Debug, Arbitrary)] @@ -411,7 +413,7 @@ mod tests { size: u64, read_size: usize, test_config: TestConfig, - get_failures: HashMap>, + get_failures: GetFailureMap, ) { let config = MockClientConfig { bucket: "test-bucket".to_string(), @@ -460,7 +462,12 @@ mod tests { }; let mut get_failures = HashMap::new(); - get_failures.insert(2, Err(s3_client::mock_client::GetObjectError::InvalidRange(42))); + get_failures.insert( + 2, + Err(ObjectClientError::ClientError(MockClientError( + "invalid range; length=42".into(), + ))), + ); fail_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config, get_failures); } @@ -475,7 +482,12 @@ mod tests { }; let mut get_failures = HashMap::new(); - get_failures.insert(2, Ok((1, s3_client::mock_client::GetObjectError::InvalidRange(42)))); + get_failures.insert( + 2, + Err(ObjectClientError::ClientError(MockClientError( + "invalid range; length=42".into(), + ))), + ); fail_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config, get_failures); }