Revert "Allow for higher s3 concurrency (#4292)" #4356

Merged 1 commit on May 26, 2023
12 changes: 0 additions & 12 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 0 additions & 2 deletions libs/remote_storage/Cargo.toml
@@ -25,8 +25,6 @@ utils.workspace = true
 pin-project-lite.workspace = true
 workspace_hack.workspace = true
-
-leaky-bucket = "1.0"
 
 [dev-dependencies]
 tempfile.workspace = true
 test-context.workspace = true

2 changes: 0 additions & 2 deletions libs/remote_storage/src/lib.rs
@@ -37,8 +37,6 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
 /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
 /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
-///
-/// IAM ratelimit should never be observed with caching credentials provider.
 pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
 /// No limits on the client side, which currenltly means 1000 for AWS S3.
 /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax

85 changes: 59 additions & 26 deletions libs/remote_storage/src/s3_bucket.rs
@@ -21,7 +21,10 @@ use aws_sdk_s3::{
 };
 use aws_smithy_http::body::SdkBody;
 use hyper::Body;
-use tokio::io;
+use tokio::{
+    io::{self, AsyncRead},
+    sync::Semaphore,
+};
 use tokio_util::io::ReaderStream;
 use tracing::debug;

@@ -102,8 +105,9 @@ pub struct S3Bucket {
     prefix_in_bucket: Option<String>,
     max_keys_per_list_response: Option<i32>,
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
-    concurrency_limiter: Arc<leaky_bucket::RateLimiter>,
+    // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
+    // The helps to ensure we don't exceed the thresholds.
+    concurrency_limiter: Arc<Semaphore>,
 }
 
 #[derive(Default)]
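
Editor's note, not part of the diff: the restored `Arc<Semaphore>` bounds in-flight requests rather than request starts per second, and a permit returns to the pool when dropped. A minimal sketch of that pattern, assuming only tokio (`limited_request` is illustrative):

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn limited_request(limiter: Arc<Semaphore>) {
    // At most N callers get past this line concurrently, where N is the
    // value passed to Semaphore::new; acquire() fails only if the
    // semaphore has been closed.
    let _permit = limiter.acquire().await.expect("semaphore closed");
    // ... perform the S3 request; the permit is released when `_permit`
    // drops at the end of this scope ...
}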
@@ -154,24 +158,12 @@ impl S3Bucket {
             }
             prefix
         });
-
-        let rps = aws_config.concurrency_limit.get();
-        let concurrency_limiter = leaky_bucket::RateLimiter::builder()
-            .max(rps)
-            .initial(0)
-            // refill it by rps every second. this means the (rps+1)th request will have to wait for
-            // 1 second from earliest.
-            .refill(rps)
-            .interval(std::time::Duration::from_secs(1))
-            .fair(true)
-            .build();
-
         Ok(Self {
             client,
             bucket_name: aws_config.bucket_name.clone(),
             max_keys_per_list_response: aws_config.max_keys_per_list_response,
             prefix_in_bucket,
-            concurrency_limiter: Arc::new(concurrency_limiter),
+            concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
         })
     }
 
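For contrast, a standalone sketch of the rate-limiter semantics the revert removes, reusing the exact `leaky-bucket = "1.0"` builder calls from the deleted lines above (the `main` harness is illustrative): tokens refill at `rps` per second, so the (rps+1)th acquisition in a window must wait, and nothing is released when a request finishes.

use std::time::Duration;

#[tokio::main]
async fn main() {
    let rps = 2;
    let limiter = leaky_bucket::RateLimiter::builder()
        .max(rps)
        .initial(0) // bucket starts empty, so even the first acquire waits for a refill
        .refill(rps) // add `rps` tokens ...
        .interval(Duration::from_secs(1)) // ... every second
        .fair(true)
        .build();

    for i in 0..3 {
        // with rps = 2, the third acquisition waits a full extra interval
        limiter.acquire_one().await;
        println!("request {i} may start");
    }
}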
@@ -203,10 +195,13 @@ impl S3Bucket {
     }
 
     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
-        // while the download could take a long time with `leaky_bucket` we have nothing to release
-        // once the download is done. this is because with "requests per second" rate limiting on
-        // s3, there should be no meaning for the long requests.
-        self.concurrency_limiter.clone().acquire_owned(1).await;
+        let permit = self
+            .concurrency_limiter
+            .clone()
+            .acquire_owned()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 download")
+            .map_err(DownloadError::Other)?;
 
         metrics::inc_get_object();
 
@@ -224,9 +219,10 @@
                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
                 Ok(Download {
                     metadata,
-                    download_stream: Box::pin(io::BufReader::new(
+                    download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
+                        permit,
                         object_output.body.into_async_read(),
-                    )),
+                    ))),
                 })
             }
             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
@@ -242,6 +238,32 @@
     }
 }
 
+pin_project_lite::pin_project! {
+    /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
+    struct RatelimitedAsyncRead<S> {
+        permit: tokio::sync::OwnedSemaphorePermit,
+        #[pin]
+        inner: S,
+    }
+}
+
+impl<S: AsyncRead> RatelimitedAsyncRead<S> {
+    fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
+        RatelimitedAsyncRead { permit, inner }
+    }
+}
+
+impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        let this = self.project();
+        this.inner.poll_read(cx, buf)
+    }
+}
+
 #[async_trait::async_trait]
 impl RemoteStorage for S3Bucket {
     /// See the doc for `RemoteStorage::list_prefixes`
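
A hypothetical usage sketch of why the wrapper exists (assumes `RatelimitedAsyncRead` from the hunk above is in scope; the byte-slice body and `demo` are illustrative): the `OwnedSemaphorePermit` travels inside the reader, so a download holds its concurrency slot until the stream is dropped, not merely until the GET returns.

use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::Semaphore;

async fn demo() -> std::io::Result<()> {
    let limiter = Arc::new(Semaphore::new(100));
    let permit = limiter.clone().acquire_owned().await.expect("semaphore closed");

    // Any AsyncRead can be wrapped; a byte slice stands in for the S3 body.
    let mut stream = RatelimitedAsyncRead::new(permit, &b"object bytes"[..]);
    let mut buf = Vec::new();
    stream.read_to_end(&mut buf).await?;

    drop(stream); // the permit drops with the reader, freeing the slot
    Ok(())
}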
@@ -267,7 +289,12 @@ impl RemoteStorage for S3Bucket {
 
         let mut continuation_token = None;
         loop {
-            self.concurrency_limiter.acquire_one().await;
+            let _guard = self
+                .concurrency_limiter
+                .acquire()
+                .await
+                .context("Concurrency limiter semaphore got closed during S3 list")
+                .map_err(DownloadError::Other)?;
 
             metrics::inc_list_objects();
 
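For reference, `Semaphore::acquire` can only fail after `close()` has been called; the `.context(...)` chains in this PR surface that unlikely case as an error instead of panicking. A minimal sketch:

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sem = Semaphore::new(1);
    assert!(sem.acquire().await.is_ok()); // normal path: a permit is granted
    sem.close();
    assert!(sem.acquire().await.is_err()); // AcquireError once closed
}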
@@ -312,9 +339,11 @@
         to: &RemotePath,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
-        // similarly to downloads, the permit does not have live through the upload, but instead we
-        // are rate limiting requests per second.
-        self.concurrency_limiter.acquire_one().await;
+        let _guard = self
+            .concurrency_limiter
+            .acquire()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 upload")?;
 
         metrics::inc_put_object();
 
@@ -369,7 +398,11 @@
     }
 
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
-        self.concurrency_limiter.acquire_one().await;
+        let _guard = self
+            .concurrency_limiter
+            .acquire()
+            .await
+            .context("Concurrency limiter semaphore got closed during S3 delete")?;
 
         metrics::inc_delete_object();