Skip to content

Commit

Permalink
Remote storage metrics follow-up (#4957)
Browse files Browse the repository at this point in the history
#4942 left old metrics in place for migration purposes. It was noticed
that from new metrics the total number of deleted objects was forgotten,
add it.

While reviewing, it was noticed that the delete_object could just be
delete_objects of one.

---------

Co-authored-by: Arpad Müller <[email protected]>
  • Loading branch information
koivunej and arpad-m authored Aug 15, 2023
1 parent baf3959 commit 8198b86
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 115 deletions.
54 changes: 6 additions & 48 deletions libs/remote_storage/src/s3_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ impl S3Bucket {
let kind = RequestKind::Get;
let permit = self.owned_permit(kind).await;

metrics::inc_get_object();

let started_at = start_measuring_requests(kind);

let get_object = self
Expand All @@ -205,7 +203,6 @@ impl S3Bucket {
let started_at = ScopeGuard::into_inner(started_at);

if get_object.is_err() {
metrics::inc_get_object_fail();
metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
Expand Down Expand Up @@ -337,7 +334,6 @@ impl RemoteStorage for S3Bucket {

loop {
let _guard = self.permit(kind).await;
metrics::inc_list_objects();
let started_at = start_measuring_requests(kind);

let fetch_response = self
Expand All @@ -350,10 +346,6 @@ impl RemoteStorage for S3Bucket {
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.map_err(|e| {
metrics::inc_list_objects_fail();
e
})
.context("Failed to list S3 prefixes")
.map_err(DownloadError::Other);

Expand Down Expand Up @@ -395,7 +387,6 @@ impl RemoteStorage for S3Bucket {
let mut all_files = vec![];
loop {
let _guard = self.permit(kind).await;
metrics::inc_list_objects();
let started_at = start_measuring_requests(kind);

let response = self
Expand All @@ -407,10 +398,6 @@ impl RemoteStorage for S3Bucket {
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.map_err(|e| {
metrics::inc_list_objects_fail();
e
})
.context("Failed to list files in S3 bucket");

let started_at = ScopeGuard::into_inner(started_at);
Expand Down Expand Up @@ -443,7 +430,6 @@ impl RemoteStorage for S3Bucket {
let kind = RequestKind::Put;
let _guard = self.permit(kind).await;

metrics::inc_put_object();
let started_at = start_measuring_requests(kind);

let body = Body::wrap_stream(ReaderStream::new(from));
Expand All @@ -458,11 +444,7 @@ impl RemoteStorage for S3Bucket {
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream)
.send()
.await
.map_err(|e| {
metrics::inc_put_object_fail();
e
});
.await;

let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
Expand Down Expand Up @@ -519,7 +501,6 @@ impl RemoteStorage for S3Bucket {
}

for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
metrics::inc_delete_objects(chunk.len() as u64);
let started_at = start_measuring_requests(kind);

let resp = self
Expand All @@ -537,16 +518,17 @@ impl RemoteStorage for S3Bucket {

match resp {
Ok(resp) => {
metrics::BUCKET_METRICS
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
metrics::inc_delete_objects_fail(errors.len() as u64);
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
));
}
}
Err(e) => {
metrics::inc_delete_objects_fail(chunk.len() as u64);
return Err(e.into());
}
}
Expand All @@ -555,32 +537,8 @@ impl RemoteStorage for S3Bucket {
}

async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
let kind = RequestKind::Delete;
let _guard = self.permit(kind).await;

metrics::inc_delete_object();
let started_at = start_measuring_requests(kind);

let res = self
.client
.delete_object()
.bucket(self.bucket_name.clone())
.key(self.relative_path_to_s3_object(path))
.send()
.await
.map_err(|e| {
metrics::inc_delete_object_fail();
e
});

let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);

res?;

Ok(())
let paths = std::array::from_ref(path);
self.delete_objects(paths).await
}
}

Expand Down
82 changes: 15 additions & 67 deletions libs/remote_storage/src/s3_bucket/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use metrics::{register_histogram_vec, register_int_counter_vec, Histogram, IntCounter};
use metrics::{
register_histogram_vec, register_int_counter, register_int_counter_vec, Histogram, IntCounter,
};
use once_cell::sync::Lazy;

pub(super) static BUCKET_METRICS: Lazy<BucketMetrics> = Lazy::new(Default::default);
Expand Down Expand Up @@ -125,41 +127,22 @@ impl PassFailCancelledRequestTyped<Histogram> {
}

pub(super) struct BucketMetrics {
/// Total requests attempted
// TODO: remove after next release and migrate dashboards to `sum by (result) (remote_storage_s3_requests_count)`
requests: RequestTyped<IntCounter>,
/// Subset of attempted requests failed
// TODO: remove after next release and migrate dashboards to `remote_storage_s3_requests_count{result="err"}`
failed: RequestTyped<IntCounter>,

/// Full request duration until successful completion, error or cancellation.
pub(super) req_seconds: PassFailCancelledRequestTyped<Histogram>,
/// Total amount of seconds waited on queue.
pub(super) wait_seconds: RequestTyped<Histogram>,

/// Track how many semaphore awaits were cancelled per request type.
///
/// This is in case cancellations are happening more than expected.
pub(super) cancelled_waits: RequestTyped<IntCounter>,

/// Total amount of deleted objects in batches or single requests.
pub(super) deleted_objects_total: IntCounter,
}

impl Default for BucketMetrics {
fn default() -> Self {
let requests = register_int_counter_vec!(
"remote_storage_s3_requests_count",
"Number of s3 requests of particular type",
&["request_type"],
)
.expect("failed to define a metric");
let requests =
RequestTyped::build_with(|kind| requests.with_label_values(&[kind.as_str()]));

let failed = register_int_counter_vec!(
"remote_storage_s3_failures_count",
"Number of failed s3 requests of particular type",
&["request_type"],
)
.expect("failed to define a metric");
let failed = RequestTyped::build_with(|kind| failed.with_label_values(&[kind.as_str()]));

let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];

let req_seconds = register_histogram_vec!(
Expand Down Expand Up @@ -192,52 +175,17 @@ impl Default for BucketMetrics {
let cancelled_waits =
RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()]));

let deleted_objects_total = register_int_counter!(
"remote_storage_s3_deleted_objects_total",
"Amount of deleted objects in total",
)
.unwrap();

Self {
requests,
failed,
req_seconds,
wait_seconds,
cancelled_waits,
deleted_objects_total,
}
}
}

pub fn inc_get_object() {
BUCKET_METRICS.requests.get(Get).inc()
}

pub fn inc_get_object_fail() {
BUCKET_METRICS.failed.get(Get).inc()
}

pub fn inc_put_object() {
BUCKET_METRICS.requests.get(Put).inc()
}

pub fn inc_put_object_fail() {
BUCKET_METRICS.failed.get(Put).inc()
}

pub fn inc_delete_object() {
BUCKET_METRICS.requests.get(Delete).inc()
}

pub fn inc_delete_objects(count: u64) {
BUCKET_METRICS.requests.get(Delete).inc_by(count)
}

pub fn inc_delete_object_fail() {
BUCKET_METRICS.failed.get(Delete).inc()
}

pub fn inc_delete_objects_fail(count: u64) {
BUCKET_METRICS.failed.get(Delete).inc_by(count)
}

pub fn inc_list_objects() {
BUCKET_METRICS.requests.get(List).inc()
}

pub fn inc_list_objects_fail() {
BUCKET_METRICS.failed.get(List).inc()
}

1 comment on commit 8198b86

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1656 tests run: 1575 passed, 0 failed, 81 skipped (full report)


The comment gets automatically updated with the latest test results
8198b86 at 2023-08-15T10:07:22.383Z :recycle:

Please sign in to comment.