Revert "Allow for higher s3 concurrency (#4292)" (#4356)
This reverts commit 024109f because it failed to speed anything up and instead ran into more errors.

See: #3698.
koivunej authored May 26, 2023
1 parent 339a3e3 commit be177f8
Showing 4 changed files with 59 additions and 42 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions libs/remote_storage/Cargo.toml
@@ -25,8 +25,6 @@ utils.workspace = true
 pin-project-lite.workspace = true
 workspace_hack.workspace = true
 
-leaky-bucket = "1.0"
-
 [dev-dependencies]
 tempfile.workspace = true
 test-context.workspace = true
2 changes: 0 additions & 2 deletions libs/remote_storage/src/lib.rs
@@ -37,8 +37,6 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
 /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
 /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
-///
-/// IAM ratelimit should never be observed with caching credentials provider.
 pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
 /// No limits on the client side, which currently means 1000 for AWS S3.
 /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
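A note on what this revert restores, conceptually: the default of 100 once again bounds how many S3 requests are in flight at the same time (a semaphore), not how many may be started per second (the reverted leaky bucket). A minimal sketch of semaphore-style limiting, assuming only tokio; the constant value and the task body are illustrative, not code from this patch:

use std::sync::Arc;
use tokio::sync::Semaphore;

const LIMIT: usize = 100; // mirrors DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT

#[tokio::main]
async fn main() {
    // At most LIMIT requests run concurrently; a finished request
    // immediately frees a slot for the next waiter, with no
    // per-second refill schedule involved.
    let limiter = Arc::new(Semaphore::new(LIMIT));

    let mut tasks = Vec::new();
    for _ in 0..1000 {
        let limiter = limiter.clone();
        tasks.push(tokio::spawn(async move {
            // The owned permit moves into the task and is released
            // when it is dropped at the end of the task.
            let _permit = limiter.acquire_owned().await.expect("semaphore closed");
            // ... perform one S3 request here (illustrative) ...
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
}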
85 changes: 59 additions & 26 deletions libs/remote_storage/src/s3_bucket.rs
@@ -21,7 +21,10 @@ use aws_sdk_s3::{
 };
 use aws_smithy_http::body::SdkBody;
 use hyper::Body;
-use tokio::io;
+use tokio::{
+    io::{self, AsyncRead},
+    sync::Semaphore,
+};
 use tokio_util::io::ReaderStream;
 use tracing::debug;
 
@@ -102,8 +105,9 @@ pub struct S3Bucket {
     prefix_in_bucket: Option<String>,
     max_keys_per_list_response: Option<i32>,
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
-    concurrency_limiter: Arc<leaky_bucket::RateLimiter>,
+    // Same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
+    // This helps to ensure we don't exceed the thresholds.
+    concurrency_limiter: Arc<Semaphore>,
 }
 
 #[derive(Default)]
@@ -154,24 +158,12 @@ impl S3Bucket {
             }
             prefix
         });
-
-        let rps = aws_config.concurrency_limit.get();
-        let concurrency_limiter = leaky_bucket::RateLimiter::builder()
-            .max(rps)
-            .initial(0)
-            // refill it by rps every second. this means the (rps+1)th request will have to wait for
-            // 1 second from earliest.
-            .refill(rps)
-            .interval(std::time::Duration::from_secs(1))
-            .fair(true)
-            .build();
-
         Ok(Self {
             client,
             bucket_name: aws_config.bucket_name.clone(),
             max_keys_per_list_response: aws_config.max_keys_per_list_response,
             prefix_in_bucket,
-            concurrency_limiter: Arc::new(concurrency_limiter),
+            concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
         })
     }
 
@@ -203,10 +195,13 @@ impl S3Bucket {
     }
 
     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
-        // while the download could take a long time with `leaky_bucket` we have nothing to release
-        // once the download is done. this is because with "requests per second" rate limiting on
-        // s3, there should be no meaning for the long requests.
-        self.concurrency_limiter.clone().acquire_owned(1).await;
+        let permit = self
+            .concurrency_limiter
+            .clone()
+            .acquire_owned()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 download")
+            .map_err(DownloadError::Other)?;
 
         metrics::inc_get_object();
 
@@ -224,9 +219,10 @@ impl S3Bucket {
                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
                 Ok(Download {
                     metadata,
-                    download_stream: Box::pin(io::BufReader::new(
+                    download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
+                        permit,
                         object_output.body.into_async_read(),
-                    )),
+                    ))),
                 })
             }
             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
@@ -242,6 +238,32 @@ impl S3Bucket {
     }
 }
 
+pin_project_lite::pin_project! {
+    /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
+    struct RatelimitedAsyncRead<S> {
+        permit: tokio::sync::OwnedSemaphorePermit,
+        #[pin]
+        inner: S,
+    }
+}
+
+impl<S: AsyncRead> RatelimitedAsyncRead<S> {
+    fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
+        RatelimitedAsyncRead { permit, inner }
+    }
+}
+
+impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        let this = self.project();
+        this.inner.poll_read(cx, buf)
+    }
+}
+
 #[async_trait::async_trait]
 impl RemoteStorage for S3Bucket {
     /// See the doc for `RemoteStorage::list_prefixes`
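The RatelimitedAsyncRead adapter restored in the hunk above ties an OwnedSemaphorePermit to the download stream itself, so the concurrency slot stays occupied until the caller drops the stream, not merely until the GET call returns. A small standalone sketch of that drop-based release; the tuple below is a stand-in for RatelimitedAsyncRead, not code from this patch:

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let limiter = Arc::new(Semaphore::new(1)); // allow one download at a time

    // Starting a "download": the owned permit moves into the value
    // that represents the in-flight stream.
    let permit = limiter.clone().acquire_owned().await.unwrap();
    let stream = (permit, b"...body bytes...".as_slice());

    // While `stream` is alive, no further permit can be taken.
    assert!(limiter.clone().try_acquire_owned().is_err());

    // Dropping the stream releases the permit and frees the slot.
    drop(stream);
    assert!(limiter.clone().try_acquire_owned().is_ok());
}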
@@ -267,7 +289,12 @@ impl RemoteStorage for S3Bucket {
 
         let mut continuation_token = None;
         loop {
-            self.concurrency_limiter.acquire_one().await;
+            let _guard = self
+                .concurrency_limiter
+                .acquire()
+                .await
+                .context("Concurrency limiter semaphore got closed during S3 list")
+                .map_err(DownloadError::Other)?;
 
             metrics::inc_list_objects();
 
@@ -312,9 +339,11 @@ impl RemoteStorage for S3Bucket {
         to: &RemotePath,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
-        // similarly to downloads, the permit does not have live through the upload, but instead we
-        // are rate limiting requests per second.
-        self.concurrency_limiter.acquire_one().await;
+        let _guard = self
+            .concurrency_limiter
+            .acquire()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 upload")?;
 
         metrics::inc_put_object();
 
@@ -369,7 +398,11 @@ impl RemoteStorage for S3Bucket {
     }
 
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
-        self.concurrency_limiter.acquire_one().await;
+        let _guard = self
+            .concurrency_limiter
+            .acquire()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 delete")?;
 
         metrics::inc_delete_object();
 
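One Rust detail worth calling out in the restored list/upload/delete paths: the permit is bound to `_guard`, which keeps it alive until the end of the enclosing scope. Binding to a bare `_` would drop the permit immediately and turn the limiter into a no-op. A quick standalone illustration of that drop rule, not code from this patch:

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let limiter = Arc::new(Semaphore::new(1));

    {
        // `_guard` holds the permit for the whole block...
        let _guard = limiter.acquire().await.unwrap();
        assert_eq!(limiter.available_permits(), 0);
    } // ...and releases it here.
    assert_eq!(limiter.available_permits(), 1);

    // By contrast, `let _ = ...` drops the permit on the same line,
    // so the "limit" would never actually be held.
    let _ = limiter.acquire().await.unwrap();
    assert_eq!(limiter.available_permits(), 1);
}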

1 comment on commit be177f8

@github-actions

1071 tests run: 1024 passed, 0 failed, 47 skipped (full report)


Flaky tests (1)

Postgres 15

The comment gets automatically updated with the latest test results
be177f8 at 2023-05-26T16:39:54.604Z :recycle:
