diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index ced2ec86e34..c70aab97cae 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -13,7 +13,7 @@ use remote_provider::{ choose_byte_store_provider, ByteStoreProvider, LoadDestination, RemoteStoreOptions, }; use tokio::fs::File; -use workunit_store::{in_workunit, ObservationMetric}; +use workunit_store::{in_workunit, Metric, ObservationMetric}; #[derive(Clone)] pub struct ByteStore { @@ -75,14 +75,20 @@ impl ByteStore { Level::Trace, desc = Some(format!("Storing {digest:?}")), |workunit| async move { + workunit.increment_counter(Metric::RemoteStoreWriteAttempts, 1); let result = do_store().await; - if result.is_ok() { - workunit.record_observation( - ObservationMetric::RemoteStoreBlobBytesUploaded, - digest.size_bytes as u64, - ); - } + let result_metric = match result { + Ok(()) => { + workunit.record_observation( + ObservationMetric::RemoteStoreBlobBytesUploaded, + digest.size_bytes as u64, + ); + Metric::RemoteStoreWriteSuccesses + } + Err(_) => Metric::RemoteStoreWriteErrors, + }; + workunit.increment_counter(result_metric, 1); result } @@ -108,17 +114,25 @@ impl ByteStore { Level::Trace, desc = Some(workunit_desc), |workunit| async move { + workunit.increment_counter(Metric::RemoteStoreReadAttempts, 1); let result = self.provider.load(digest, destination).await; workunit.record_observation( ObservationMetric::RemoteStoreReadBlobTimeMicros, start.elapsed().as_micros() as u64, ); - if result.is_ok() { - workunit.record_observation( - ObservationMetric::RemoteStoreBlobBytesDownloaded, - digest.size_bytes as u64, - ); - } + let result_metric = match result { + Ok(true) => { + workunit.record_observation( + ObservationMetric::RemoteStoreBlobBytesDownloaded, + digest.size_bytes as u64, + ); + Metric::RemoteStoreReadCached + } + Ok(false) => Metric::RemoteStoreReadUncached, + Err(_) => Metric::RemoteStoreReadErrors, + }; + 
workunit.increment_counter(result_metric, 1); + result }, ) diff --git a/src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs b/src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs index 67d5bb3ba85..9398e6557db 100644 --- a/src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs +++ b/src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs @@ -40,7 +40,7 @@ use prost::Message; use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ActionResult; use tokio::fs::File; -use workunit_store::ObservationMetric; +use workunit_store::{Metric, ObservationMetric}; use remote_provider_traits::{ ActionCacheProvider, ByteStoreProvider, LoadDestination, RemoteStoreOptions, @@ -296,7 +296,18 @@ impl ByteStoreProvider for Provider { } let path = self.path(digest.hash); - match self.operator.is_exist(&path).await { + + workunit_store::increment_counter_if_in_workunit(Metric::RemoteStoreExistsAttempts, 1); + + let result = self.operator.is_exist(&path).await; + + let metric = match result { + Ok(_) => Metric::RemoteStoreExistsSuccesses, + Err(_) => Metric::RemoteStoreExistsErrors, + }; + workunit_store::increment_counter_if_in_workunit(metric, 1); + + match result { Ok(true) => Ok(None), Ok(false) => Ok(Some(digest)), Err(e) => Err(format!("failed to query {}: {}", path, e)), diff --git a/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store.rs b/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store.rs index e3017f35c8f..839ee07d266 100644 --- a/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store.rs +++ b/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store.rs @@ -399,7 +399,9 @@ impl ByteStoreProvider for Provider { }; let client = self.cas_client.as_ref().clone(); - let response = retry_call( + + workunit_store::increment_counter_if_in_workunit(Metric::RemoteStoreExistsAttempts, 1); + let result = retry_call( client, move |mut client, _| { let 
request = request.clone(); @@ -408,7 +410,15 @@ impl ByteStoreProvider for Provider { status_is_retryable, ) .await - .map_err(status_to_str)?; + .map_err(status_to_str); + + let metric = match result { + Ok(_) => Metric::RemoteStoreExistsSuccesses, + Err(_) => Metric::RemoteStoreExistsErrors, + }; + workunit_store::increment_counter_if_in_workunit(metric, 1); + + let response = result?; response .into_inner() diff --git a/src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs b/src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs index 7ffa86fdb95..837f4b34d23 100644 --- a/src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs +++ b/src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs @@ -55,14 +55,21 @@ pub struct RemoteStoreOptions { #[async_trait] pub trait ByteStoreProvider: Sync + Send + 'static { /// Store the bytes readable from `file` into the remote store + /// + /// NB. this does not need to update any observations or counters. async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>; /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the /// bytes are already in memory + /// + /// NB. this does not need to update any observations or counters. async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>; /// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns /// true when found, false when not. + /// + /// NB. this should update the + /// workunit_store::ObservationMetric::RemoteStoreTimeToFirstByteMicros observation. async fn load( &self, digest: Digest, @@ -70,6 +77,9 @@ pub trait ByteStoreProvider: Sync + Send + 'static { ) -> Result; /// Return any digests from `digests` that are not (currently) available in the remote store. + /// + /// NB. this should update the workunit_store::Metric::RemoteStoreExists... counters, based on + /// the requests it runs. 
async fn list_missing_digests( &self, digests: &mut (dyn Iterator + Send), diff --git a/src/rust/engine/workunit_store/src/metrics.rs b/src/rust/engine/workunit_store/src/metrics.rs index 4c8ce2184ca..748618c8713 100644 --- a/src/rust/engine/workunit_store/src/metrics.rs +++ b/src/rust/engine/workunit_store/src/metrics.rs @@ -49,6 +49,18 @@ pub enum Metric { RemoteExecutionRPCWaitExecution, RemoteExecutionSuccess, RemoteExecutionTimeouts, + RemoteStoreReadAttempts, + RemoteStoreReadCached, + RemoteStoreReadUncached, + RemoteStoreReadErrors, + RemoteStoreWriteAttempts, + RemoteStoreWriteSuccesses, + RemoteStoreWriteErrors, + /// Number of network requests to check existence of digests, not the total number of digests + /// that were checked (if bulk query) + RemoteStoreExistsAttempts, + RemoteStoreExistsSuccesses, + RemoteStoreExistsErrors, RemoteStoreMissingDigest, RemoteStoreRequestTimeouts, /// Number of times that we backtracked due to missing digests.