Skip to content

Commit

Permalink
Add additional metrics for remote byte stores (#20138)
Browse files Browse the repository at this point in the history
This adds a bunch of additional metrics to provide more insight into the
byte store behaviour: for each of the "read", "write" and "list missing
digests" (aka "exists") operations, track:

- number of attempts
- number of successes (for "read", this is further split into "cached"
and "uncached", i.e. whether the successful network request found data
or not)
- number of errors

This mimics the metrics for the remote action cache.

The minor shuffling of code in this PR also fixes a bug with the
`RemoteStoreBlobBytesDownloaded` metric: before this PR, any successful
network request would count as bytes downloaded, even if the digest
didn't exist/wasn't downloaded (i.e. it successfully determined that the
digest wasn't cached). This PR restricts the metric to only count digest
bytes when the network request is successful _and_ the digest actually
existed/was downloaded.

I'm hopeful this will at least give more insight into the scale of the
problem in #20133, in addition to being generally useful as "what's
pants up to" reporting.
  • Loading branch information
huonw authored Nov 7, 2023
1 parent 0c433d0 commit eeb4905
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 17 deletions.
40 changes: 27 additions & 13 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use remote_provider::{
choose_byte_store_provider, ByteStoreProvider, LoadDestination, RemoteStoreOptions,
};
use tokio::fs::File;
use workunit_store::{in_workunit, ObservationMetric};
use workunit_store::{in_workunit, Metric, ObservationMetric};

#[derive(Clone)]
pub struct ByteStore {
Expand Down Expand Up @@ -75,14 +75,20 @@ impl ByteStore {
Level::Trace,
desc = Some(format!("Storing {digest:?}")),
|workunit| async move {
workunit.increment_counter(Metric::RemoteStoreWriteAttempts, 1);
let result = do_store().await;

if result.is_ok() {
workunit.record_observation(
ObservationMetric::RemoteStoreBlobBytesUploaded,
digest.size_bytes as u64,
);
}
let result_metric = match result {
Ok(()) => {
workunit.record_observation(
ObservationMetric::RemoteStoreBlobBytesUploaded,
digest.size_bytes as u64,
);
Metric::RemoteStoreWriteSuccesses
}
Err(_) => Metric::RemoteStoreWriteErrors,
};
workunit.increment_counter(result_metric, 1);

result
}
Expand All @@ -108,17 +114,25 @@ impl ByteStore {
Level::Trace,
desc = Some(workunit_desc),
|workunit| async move {
workunit.increment_counter(Metric::RemoteStoreReadAttempts, 1);
let result = self.provider.load(digest, destination).await;
workunit.record_observation(
ObservationMetric::RemoteStoreReadBlobTimeMicros,
start.elapsed().as_micros() as u64,
);
if result.is_ok() {
workunit.record_observation(
ObservationMetric::RemoteStoreBlobBytesDownloaded,
digest.size_bytes as u64,
);
}
let result_metric = match result {
Ok(true) => {
workunit.record_observation(
ObservationMetric::RemoteStoreBlobBytesDownloaded,
digest.size_bytes as u64,
);
Metric::RemoteStoreReadCached
}
Ok(false) => Metric::RemoteStoreReadUncached,
Err(_) => Metric::RemoteStoreReadErrors,
};
workunit.increment_counter(result_metric, 1);

result
},
)
Expand Down
15 changes: 13 additions & 2 deletions src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use prost::Message;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ActionResult;
use tokio::fs::File;
use workunit_store::ObservationMetric;
use workunit_store::{Metric, ObservationMetric};

use remote_provider_traits::{
ActionCacheProvider, ByteStoreProvider, LoadDestination, RemoteStoreOptions,
Expand Down Expand Up @@ -296,7 +296,18 @@ impl ByteStoreProvider for Provider {
}

let path = self.path(digest.hash);
match self.operator.is_exist(&path).await {

workunit_store::increment_counter_if_in_workunit(Metric::RemoteStoreExistsAttempts, 1);

let result = self.operator.is_exist(&path).await;

let metric = match result {
Ok(_) => Metric::RemoteStoreExistsSuccesses,
Err(_) => Metric::RemoteStoreExistsErrors,
};
workunit_store::increment_counter_if_in_workunit(metric, 1);

match result {
Ok(true) => Ok(None),
Ok(false) => Ok(Some(digest)),
Err(e) => Err(format!("failed to query {}: {}", path, e)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ impl ByteStoreProvider for Provider {
};

let client = self.cas_client.as_ref().clone();
let response = retry_call(

workunit_store::increment_counter_if_in_workunit(Metric::RemoteStoreExistsAttempts, 1);
let result = retry_call(
client,
move |mut client, _| {
let request = request.clone();
Expand All @@ -408,7 +410,15 @@ impl ByteStoreProvider for Provider {
status_is_retryable,
)
.await
.map_err(status_to_str)?;
.map_err(status_to_str);

let metric = match result {
Ok(_) => Metric::RemoteStoreExistsSuccesses,
Err(_) => Metric::RemoteStoreExistsErrors,
};
workunit_store::increment_counter_if_in_workunit(metric, 1);

let response = result?;

response
.into_inner()
Expand Down
10 changes: 10 additions & 0 deletions src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,31 @@ pub struct RemoteStoreOptions {
#[async_trait]
pub trait ByteStoreProvider: Sync + Send + 'static {
/// Store the bytes readable from `file` into the remote store
///
/// NB. this does not need to update any observations or counters.
async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>;

/// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
/// bytes are already in memory
///
/// NB. this does not need to update any observations or counters.
async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>;

/// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns
/// true when found, false when not.
///
/// NB. this should update the
/// workunit_store::ObservationMetric::RemoteStoreTimeToFirstByteMicros observation.
async fn load(
&self,
digest: Digest,
destination: &mut dyn LoadDestination,
) -> Result<bool, String>;

/// Return any digests from `digests` that are not (currently) available in the remote store.
///
/// NB. this should update the workunit_store::Metric::RemoteStoreExists... counters, based on
/// the requests it runs.
async fn list_missing_digests(
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
Expand Down
12 changes: 12 additions & 0 deletions src/rust/engine/workunit_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ pub enum Metric {
RemoteExecutionRPCWaitExecution,
RemoteExecutionSuccess,
RemoteExecutionTimeouts,
RemoteStoreReadAttempts,
RemoteStoreReadCached,
RemoteStoreReadUncached,
RemoteStoreReadErrors,
RemoteStoreWriteAttempts,
RemoteStoreWriteSuccesses,
RemoteStoreWriteErrors,
/// Number of network requests to check existance of digests, not the total number of digests
/// that were checked (if bulk query)
RemoteStoreExistsAttempts,
RemoteStoreExistsSuccesses,
RemoteStoreExistsErrors,
RemoteStoreMissingDigest,
RemoteStoreRequestTimeouts,
/// Number of times that we backtracked due to missing digests.
Expand Down

0 comments on commit eeb4905

Please sign in to comment.