From 2fbed040665f0de0457f7fddedc96f7bc7936c0d Mon Sep 17 00:00:00 2001 From: aterentic-ethernal <97872690+aterentic-ethernal@users.noreply.github.com> Date: Wed, 31 Jul 2024 16:10:05 +0200 Subject: [PATCH] Make metrics value composable (#648) * Make metrics value composable * Allow any metric value that can be casted to record * Move crawler metrics to crawler client --- core/src/crawl_client.rs | 50 ++++++++++++++++++++++++++++++++---- core/src/fat_client.rs | 8 ++---- core/src/light_client.rs | 7 ++--- core/src/telemetry/metric.rs | 30 ++++++++++++++++++++++ core/src/telemetry/mod.rs | 24 +++++------------ core/src/telemetry/otlp.rs | 47 ++++++++++++++++----------------- 6 files changed, 109 insertions(+), 57 deletions(-) create mode 100644 core/src/telemetry/metric.rs diff --git a/core/src/crawl_client.rs b/core/src/crawl_client.rs index f6f177010..193ef71f6 100644 --- a/core/src/crawl_client.rs +++ b/core/src/crawl_client.rs @@ -3,8 +3,8 @@ use crate::{ p2p::Client, rpc::{self, Event}, }, - telemetry::{MetricValue, Metrics}, - types::{self, block_matrix_partition_format, BlockVerified, Delay}, + telemetry::{metric, otlp::Record, MetricName, Metrics}, + types::{self, block_matrix_partition_format, BlockVerified, Delay, Origin}, }; use kate_recovery::matrix::Partition; use serde::{Deserialize, Serialize}; @@ -53,6 +53,46 @@ impl Default for CrawlConfig { } } +#[derive(Clone)] +enum CrawlMetricValue { + CellsSuccessRate(f64), + RowsSuccessRate(f64), + BlockDelay(f64), +} + +impl MetricName for CrawlMetricValue { + fn name(&self) -> &'static str { + use CrawlMetricValue::*; + match self { + CellsSuccessRate(_) => "avail.light.crawl.cells_success_rate", + RowsSuccessRate(_) => "avail.light.crawl.rows_success_rate", + BlockDelay(_) => "avail.light.crawl.block_delay", + } + } +} +impl From for Record { + fn from(value: CrawlMetricValue) -> Self { + use CrawlMetricValue::*; + use Record::*; + + let name = value.name(); + + match value { + CellsSuccessRate(number) => AvgF64(name, number), + RowsSuccessRate(number) => AvgF64(name, number), + BlockDelay(number) => AvgF64(name, number), + } + } +} + +impl metric::Value for CrawlMetricValue { + // Metric filter for external peers + // Only the metrics we wish to send to OTel should be in this list + fn is_allowed(&self, origin: &Origin) -> bool { + matches!(origin, Origin::Internal) + } +} + pub async fn run( mut message_rx: broadcast::Receiver, network_client: Client, @@ -88,7 +128,7 @@ pub async fn run( info!("Sleeping for {seconds:?} seconds"); tokio::time::sleep(seconds).await; let _ = metrics - .record(MetricValue::CrawlBlockDelay(seconds.as_secs() as f64)) + .record(CrawlMetricValue::BlockDelay(seconds.as_secs() as f64)) .await; } let block_number = block.block_num; @@ -116,7 +156,7 @@ pub async fn run( partition, success_rate, total, fetched, "Fetched block cells", ); let _ = metrics - .record(MetricValue::CrawlCellsSuccessRate(success_rate)) + .record(CrawlMetricValue::CellsSuccessRate(success_rate)) .await; } @@ -138,7 +178,7 @@ pub async fn run( success_rate, total, fetched, "Fetched block rows" ); let _ = metrics - .record(MetricValue::CrawlRowsSuccessRate(success_rate)) + .record(CrawlMetricValue::RowsSuccessRate(success_rate)) .await; } diff --git a/core/src/fat_client.rs b/core/src/fat_client.rs index ed460ed46..32d9852e1 100644 --- a/core/src/fat_client.rs +++ b/core/src/fat_client.rs @@ -266,7 +266,7 @@ pub async fn run( #[cfg(test)] mod tests { use super::*; - use crate::{data, telemetry, types::RuntimeConfig}; + use crate::{data, telemetry::metric::tests, types::RuntimeConfig}; use avail_subxt::{ api::runtime_types::avail_core::{ data_lookup::compact::CompactDataLookup, @@ -379,14 +379,10 @@ mod tests { .expect_insert_cells_into_dht() .returning(|_, _| Box::pin(async move { Ok(()) })); - let mut mock_metrics = telemetry::MockMetrics::new(); - mock_metrics.expect_count().returning(|_| ()); - mock_metrics.expect_record().returning(|_| ()); - process_block( &mock_client, db, - &Arc::new(mock_metrics), + &Arc::new(tests::MockMetrics {}), &FatClientConfig::from(&RuntimeConfig::default()), &default_header(), Instant::now(), diff --git a/core/src/light_client.rs b/core/src/light_client.rs index 6b0150c3b..1beabb832 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -258,7 +258,7 @@ mod tests { use crate::{ data, network::rpc::{cell_count_for_confidence, CELL_COUNT_99_99}, - telemetry, + telemetry::metric::tests, types::RuntimeConfig, }; use avail_subxt::{ @@ -348,13 +348,10 @@ mod tests { Box::pin(async move { Ok((fetched, unfetched, stats)) }) }); - let mut mock_metrics = telemetry::MockMetrics::new(); - mock_metrics.expect_count().returning(|_| ()); - mock_metrics.expect_record().returning(|_| ()); process_block( db, &mock_network_client, - &Arc::new(mock_metrics), + &Arc::new(tests::MockMetrics {}), &cfg, header, recv, diff --git a/core/src/telemetry/metric.rs b/core/src/telemetry/metric.rs new file mode 100644 index 000000000..ea1709e6c --- /dev/null +++ b/core/src/telemetry/metric.rs @@ -0,0 +1,30 @@ +use crate::types::Origin; + +pub trait Value: Send + Clone { + fn is_allowed(&self, origin: &Origin) -> bool; +} + +#[cfg(test)] +pub mod tests { + use crate::telemetry::{metric, MetricCounter, Metrics, Record}; + use async_trait::async_trait; + use color_eyre::eyre; + use libp2p::{kad::Mode, Multiaddr}; + + pub struct MockMetrics {} + + #[async_trait] + impl Metrics for MockMetrics { + async fn count(&self, _: MetricCounter) {} + async fn record(&self, _: T) + where + T: metric::Value + Into + Send, + { + } + async fn flush(&self) -> eyre::Result<()> { + Ok(()) + } + async fn update_operating_mode(&self, _: Mode) {} + async fn update_multiaddress(&self, _: Multiaddr) {} + } +} diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 8c31f23af..fa2ab7645 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -2,8 +2,9 @@ use crate::types::Origin; use async_trait::async_trait; use color_eyre::Result; use libp2p::{kad::Mode, Multiaddr}; -use mockall::automock; +use otlp::Record; +pub mod metric; pub mod otlp; #[derive(Debug, PartialEq)] @@ -81,13 +82,6 @@ pub enum MetricValue { RPCFetched(f64), RPCFetchDuration(f64), RPCCallDuration(f64), - - #[cfg(feature = "crawl")] - CrawlCellsSuccessRate(f64), - #[cfg(feature = "crawl")] - CrawlRowsSuccessRate(f64), - #[cfg(feature = "crawl")] - CrawlBlockDelay(f64), } impl MetricName for MetricValue { @@ -114,18 +108,11 @@ impl MetricName for MetricValue { RPCFetched(_) => "avail.light.rpc.fetched", RPCFetchDuration(_) => "avail.light.rpc.fetch_duration", RPCCallDuration(_) => "avail.light.rpc.call_duration", - - #[cfg(feature = "crawl")] - CrawlCellsSuccessRate(_) => "avail.light.crawl.cells_success_rate", - #[cfg(feature = "crawl")] - CrawlRowsSuccessRate(_) => "avail.light.crawl.rows_success_rate", - #[cfg(feature = "crawl")] - CrawlBlockDelay(_) => "avail.light.crawl.block_delay", } } } -impl MetricValue { +impl metric::Value for MetricValue { // Metric filter for external peers // Only the metrics we wish to send to OTel should be in this list fn is_allowed(&self, origin: &Origin) -> bool { @@ -139,11 +126,12 @@ impl MetricValue { } } -#[automock] #[async_trait] pub trait Metrics { async fn count(&self, counter: MetricCounter); - async fn record(&self, value: MetricValue); + async fn record(&self, value: T) + where + T: metric::Value + Into + Send; async fn flush(&self) -> Result<()>; async fn update_operating_mode(&self, mode: Mode); async fn update_multiaddress(&self, mode: Multiaddr); diff --git a/core/src/telemetry/otlp.rs b/core/src/telemetry/otlp.rs index 652c2cbd5..a9db7209e 100644 --- a/core/src/telemetry/otlp.rs +++ b/core/src/telemetry/otlp.rs @@ -1,4 +1,4 @@ -use super::{MetricCounter, MetricValue}; +use super::{metric, MetricCounter, MetricValue}; use crate::{ telemetry::MetricName, types::{Origin, OtelConfig}, @@ -24,7 +24,7 @@ pub struct Metrics { meter: Meter, counters: HashMap<&'static str, Counter>, attributes: RwLock, - metric_buffer: Arc>>, + metric_buffer: Arc>>, counter_buffer: Arc>>, } @@ -84,7 +84,8 @@ impl Metrics { } } -enum Record { +#[derive(Debug)] +pub enum Record { MaxU64(&'static str, u64), AvgF64(&'static str, f64), } @@ -117,13 +118,6 @@ impl From for Record { RPCFetched(number) => AvgF64(name, number), RPCFetchDuration(number) => AvgF64(name, number), RPCCallDuration(number) => AvgF64(name, number), - - #[cfg(feature = "crawl")] - CrawlCellsSuccessRate(number) => AvgF64(name, number), - #[cfg(feature = "crawl")] - CrawlRowsSuccessRate(number) => AvgF64(name, number), - #[cfg(feature = "crawl")] - CrawlBlockDelay(number) => AvgF64(name, number), } } } @@ -149,16 +143,14 @@ fn flatten_counters(buffer: &[MetricCounter]) -> HashMap<&'static str, u64> { /// Aggregates buffered metrics into `u64` or `f64` values, depending on the metric. /// Returned values are a `HashMap`s where the keys are the metric name, /// and values are the aggregations (avg, max, etc.) of those metrics. -fn flatten_metrics( - buffer: &[impl Into + Clone], -) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) { +fn flatten_metrics(buffer: &[Record]) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) { let mut u64_maximums: HashMap<&'static str, Vec> = HashMap::new(); let mut f64_averages: HashMap<&'static str, Vec> = HashMap::new(); for value in buffer { - match value.clone().into() { - Record::MaxU64(name, number) => u64_maximums.entry(name).or_default().push(number), - Record::AvgF64(name, number) => f64_averages.entry(name).or_default().push(number), + match value { + Record::MaxU64(name, number) => u64_maximums.entry(name).or_default().push(*number), + Record::AvgF64(name, number) => f64_averages.entry(name).or_default().push(*number), } } @@ -193,14 +185,17 @@ impl super::Metrics for Metrics { } /// Puts metric to the metric buffer if it is allowed. - async fn record(&self, value: super::MetricValue) { + async fn record(&self, value: T) + where + T: metric::Value + Into + Send, + { let attributes = self.attributes.read().await; if !value.is_allowed(&attributes.origin) { return; } let mut metric_buffer = self.metric_buffer.lock().await; - metric_buffer.push(value); + metric_buffer.push(value.into()); } /// Calculates counters and average metrics, and flushes buffers to the collector. @@ -341,19 +336,25 @@ mod tests { assert_eq!(result, expected); } + fn flatten_metrics( + values: Vec, + ) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) { + super::flatten_metrics(&values.into_iter().map(Into::into).collect::>()) + } + #[test] fn test_flatten_metrics() { - let (m_u64, m_f64) = flatten_metrics(&[] as &[MetricValue]); + let (m_u64, m_f64) = flatten_metrics(vec![]); assert!(m_u64.is_empty()); assert!(m_f64.is_empty()); - let buffer = &[MetricValue::BlockConfidence(90.0)]; + let buffer = vec![MetricValue::BlockConfidence(90.0)]; let (m_u64, m_f64) = flatten_metrics(buffer); assert!(m_u64.is_empty()); assert_eq!(m_f64.len(), 1); assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&90.0)); - let buffer = &[ + let buffer = vec![ MetricValue::BlockConfidence(90.0), MetricValue::BlockHeight(1), MetricValue::BlockConfidence(93.0), @@ -364,7 +365,7 @@ mod tests { assert_eq!(m_f64.len(), 1); assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&91.5)); - let buffer = &[ + let buffer = vec![ MetricValue::BlockConfidence(90.0), MetricValue::BlockHeight(1), MetricValue::BlockConfidence(93.0), @@ -379,7 +380,7 @@ mod tests { assert_eq!(m_f64.len(), 1); assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&93.75)); - let buffer = &[ + let buffer = vec![ MetricValue::DHTConnectedPeers(90), MetricValue::DHTFetchDuration(1.0), MetricValue::DHTPutSuccess(10.0),