Skip to content

Commit

Permalink
Make metrics value composable (#648)
Browse files Browse the repository at this point in the history
* Make metrics value composable

* Allow any metric value that can be cast to a record

* Move crawler metrics to crawler client
  • Loading branch information
aterentic-ethernal authored Jul 31, 2024
1 parent ace06c8 commit 2fbed04
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 57 deletions.
50 changes: 45 additions & 5 deletions core/src/crawl_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::{
p2p::Client,
rpc::{self, Event},
},
telemetry::{MetricValue, Metrics},
types::{self, block_matrix_partition_format, BlockVerified, Delay},
telemetry::{metric, otlp::Record, MetricName, Metrics},
types::{self, block_matrix_partition_format, BlockVerified, Delay, Origin},
};
use kate_recovery::matrix::Partition;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -53,6 +53,46 @@ impl Default for CrawlConfig {
}
}

/// Metric values emitted by the crawl client.
///
/// Every variant carries a single `f64` sample; the variant itself selects
/// the metric the sample belongs to.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CrawlMetricValue {
    /// Success rate computed after fetching block cells.
    CellsSuccessRate(f64),
    /// Success rate computed after fetching block rows.
    RowsSuccessRate(f64),
    /// Delay, in whole seconds, the crawler slept before handling a block.
    BlockDelay(f64),
}

impl MetricName for CrawlMetricValue {
fn name(&self) -> &'static str {
use CrawlMetricValue::*;
match self {
CellsSuccessRate(_) => "avail.light.crawl.cells_success_rate",
RowsSuccessRate(_) => "avail.light.crawl.rows_success_rate",
BlockDelay(_) => "avail.light.crawl.block_delay",
}
}
}
impl From<CrawlMetricValue> for Record {
fn from(value: CrawlMetricValue) -> Self {
use CrawlMetricValue::*;
use Record::*;

let name = value.name();

match value {
CellsSuccessRate(number) => AvgF64(name, number),
RowsSuccessRate(number) => AvgF64(name, number),
BlockDelay(number) => AvgF64(name, number),
}
}
}

impl metric::Value for CrawlMetricValue {
/// Origin filter for crawl metrics.
///
/// Returns `true` only when `origin` is `Origin::Internal`; for any other
/// origin every crawl metric is rejected, regardless of the variant.
fn is_allowed(&self, origin: &Origin) -> bool {
matches!(origin, Origin::Internal)
}
}

pub async fn run(
mut message_rx: broadcast::Receiver<Event>,
network_client: Client,
Expand Down Expand Up @@ -88,7 +128,7 @@ pub async fn run(
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
let _ = metrics
.record(MetricValue::CrawlBlockDelay(seconds.as_secs() as f64))
.record(CrawlMetricValue::BlockDelay(seconds.as_secs() as f64))
.await;
}
let block_number = block.block_num;
Expand Down Expand Up @@ -116,7 +156,7 @@ pub async fn run(
partition, success_rate, total, fetched, "Fetched block cells",
);
let _ = metrics
.record(MetricValue::CrawlCellsSuccessRate(success_rate))
.record(CrawlMetricValue::CellsSuccessRate(success_rate))
.await;
}

Expand All @@ -138,7 +178,7 @@ pub async fn run(
success_rate, total, fetched, "Fetched block rows"
);
let _ = metrics
.record(MetricValue::CrawlRowsSuccessRate(success_rate))
.record(CrawlMetricValue::RowsSuccessRate(success_rate))
.await;
}

Expand Down
8 changes: 2 additions & 6 deletions core/src/fat_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ pub async fn run(
#[cfg(test)]
mod tests {
use super::*;
use crate::{data, telemetry, types::RuntimeConfig};
use crate::{data, telemetry::metric::tests, types::RuntimeConfig};
use avail_subxt::{
api::runtime_types::avail_core::{
data_lookup::compact::CompactDataLookup,
Expand Down Expand Up @@ -379,14 +379,10 @@ mod tests {
.expect_insert_cells_into_dht()
.returning(|_, _| Box::pin(async move { Ok(()) }));

let mut mock_metrics = telemetry::MockMetrics::new();
mock_metrics.expect_count().returning(|_| ());
mock_metrics.expect_record().returning(|_| ());

process_block(
&mock_client,
db,
&Arc::new(mock_metrics),
&Arc::new(tests::MockMetrics {}),
&FatClientConfig::from(&RuntimeConfig::default()),
&default_header(),
Instant::now(),
Expand Down
7 changes: 2 additions & 5 deletions core/src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ mod tests {
use crate::{
data,
network::rpc::{cell_count_for_confidence, CELL_COUNT_99_99},
telemetry,
telemetry::metric::tests,
types::RuntimeConfig,
};
use avail_subxt::{
Expand Down Expand Up @@ -348,13 +348,10 @@ mod tests {
Box::pin(async move { Ok((fetched, unfetched, stats)) })
});

let mut mock_metrics = telemetry::MockMetrics::new();
mock_metrics.expect_count().returning(|_| ());
mock_metrics.expect_record().returning(|_| ());
process_block(
db,
&mock_network_client,
&Arc::new(mock_metrics),
&Arc::new(tests::MockMetrics {}),
&cfg,
header,
recv,
Expand Down
30 changes: 30 additions & 0 deletions core/src/telemetry/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::types::Origin;

/// A metric value that can be filtered by client origin.
///
/// Implementors must be `Send` and `Clone`.
pub trait Value: Send + Clone {
/// Returns `true` if this value is allowed for the given `origin`.
fn is_allowed(&self, origin: &Origin) -> bool;
}

// Test-only support code; compiled only under `cfg(test)`.
#[cfg(test)]
pub mod tests {
use crate::telemetry::{metric, MetricCounter, Metrics, Record};
use async_trait::async_trait;
use color_eyre::eyre;
use libp2p::{kad::Mode, Multiaddr};

/// No-op `Metrics` implementation for unit tests: every method ignores its
/// arguments, and `flush` always succeeds.
pub struct MockMetrics {}

#[async_trait]
impl Metrics for MockMetrics {
// Ignores the counter.
async fn count(&self, _: MetricCounter) {}
// Accepts any value satisfying the trait's bounds and discards it.
async fn record<T>(&self, _: T)
where
T: metric::Value + Into<Record> + Send,
{
}
// Nothing is buffered, so flushing always returns `Ok(())`.
async fn flush(&self) -> eyre::Result<()> {
Ok(())
}
// Ignores the operating mode.
async fn update_operating_mode(&self, _: Mode) {}
// Ignores the multiaddress.
async fn update_multiaddress(&self, _: Multiaddr) {}
}
}
24 changes: 6 additions & 18 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::types::Origin;
use async_trait::async_trait;
use color_eyre::Result;
use libp2p::{kad::Mode, Multiaddr};
use mockall::automock;
use otlp::Record;

pub mod metric;
pub mod otlp;

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -81,13 +82,6 @@ pub enum MetricValue {
RPCFetched(f64),
RPCFetchDuration(f64),
RPCCallDuration(f64),

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(f64),
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(f64),
#[cfg(feature = "crawl")]
CrawlBlockDelay(f64),
}

impl MetricName for MetricValue {
Expand All @@ -114,18 +108,11 @@ impl MetricName for MetricValue {
RPCFetched(_) => "avail.light.rpc.fetched",
RPCFetchDuration(_) => "avail.light.rpc.fetch_duration",
RPCCallDuration(_) => "avail.light.rpc.call_duration",

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(_) => "avail.light.crawl.cells_success_rate",
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(_) => "avail.light.crawl.rows_success_rate",
#[cfg(feature = "crawl")]
CrawlBlockDelay(_) => "avail.light.crawl.block_delay",
}
}
}

impl MetricValue {
impl metric::Value for MetricValue {
// Metric filter for external peers
// Only the metrics we wish to send to OTel should be in this list
fn is_allowed(&self, origin: &Origin) -> bool {
Expand All @@ -139,11 +126,12 @@ impl MetricValue {
}
}

#[automock]
#[async_trait]
pub trait Metrics {
async fn count(&self, counter: MetricCounter);
async fn record(&self, value: MetricValue);
async fn record<T>(&self, value: T)
where
T: metric::Value + Into<Record> + Send;
async fn flush(&self) -> Result<()>;
async fn update_operating_mode(&self, mode: Mode);
async fn update_multiaddress(&self, mode: Multiaddr);
Expand Down
47 changes: 24 additions & 23 deletions core/src/telemetry/otlp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{MetricCounter, MetricValue};
use super::{metric, MetricCounter, MetricValue};
use crate::{
telemetry::MetricName,
types::{Origin, OtelConfig},
Expand All @@ -24,7 +24,7 @@ pub struct Metrics {
meter: Meter,
counters: HashMap<&'static str, Counter<u64>>,
attributes: RwLock<MetricAttributes>,
metric_buffer: Arc<Mutex<Vec<MetricValue>>>,
metric_buffer: Arc<Mutex<Vec<Record>>>,
counter_buffer: Arc<Mutex<Vec<MetricCounter>>>,
}

Expand Down Expand Up @@ -84,7 +84,8 @@ impl Metrics {
}
}

enum Record {
#[derive(Debug)]
pub enum Record {
MaxU64(&'static str, u64),
AvgF64(&'static str, f64),
}
Expand Down Expand Up @@ -117,13 +118,6 @@ impl From<MetricValue> for Record {
RPCFetched(number) => AvgF64(name, number),
RPCFetchDuration(number) => AvgF64(name, number),
RPCCallDuration(number) => AvgF64(name, number),

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(number) => AvgF64(name, number),
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(number) => AvgF64(name, number),
#[cfg(feature = "crawl")]
CrawlBlockDelay(number) => AvgF64(name, number),
}
}
}
Expand All @@ -149,16 +143,14 @@ fn flatten_counters(buffer: &[MetricCounter]) -> HashMap<&'static str, u64> {
/// Aggregates buffered metrics into `u64` or `f64` values, depending on the metric.
/// Returned values are a `HashMap`s where the keys are the metric name,
/// and values are the aggregations (avg, max, etc.) of those metrics.
fn flatten_metrics(
buffer: &[impl Into<Record> + Clone],
) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
fn flatten_metrics(buffer: &[Record]) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
let mut u64_maximums: HashMap<&'static str, Vec<u64>> = HashMap::new();
let mut f64_averages: HashMap<&'static str, Vec<f64>> = HashMap::new();

for value in buffer {
match value.clone().into() {
Record::MaxU64(name, number) => u64_maximums.entry(name).or_default().push(number),
Record::AvgF64(name, number) => f64_averages.entry(name).or_default().push(number),
match value {
Record::MaxU64(name, number) => u64_maximums.entry(name).or_default().push(*number),
Record::AvgF64(name, number) => f64_averages.entry(name).or_default().push(*number),
}
}

Expand Down Expand Up @@ -193,14 +185,17 @@ impl super::Metrics for Metrics {
}

/// Puts metric to the metric buffer if it is allowed.
async fn record(&self, value: super::MetricValue) {
async fn record<T>(&self, value: T)
where
T: metric::Value + Into<Record> + Send,
{
let attributes = self.attributes.read().await;
if !value.is_allowed(&attributes.origin) {
return;
}

let mut metric_buffer = self.metric_buffer.lock().await;
metric_buffer.push(value);
metric_buffer.push(value.into());
}

/// Calculates counters and average metrics, and flushes buffers to the collector.
Expand Down Expand Up @@ -341,19 +336,25 @@ mod tests {
assert_eq!(result, expected);
}

fn flatten_metrics(
values: Vec<MetricValue>,
) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
super::flatten_metrics(&values.into_iter().map(Into::into).collect::<Vec<Record>>())
}

#[test]
fn test_flatten_metrics() {
let (m_u64, m_f64) = flatten_metrics(&[] as &[MetricValue]);
let (m_u64, m_f64) = flatten_metrics(vec![]);
assert!(m_u64.is_empty());
assert!(m_f64.is_empty());

let buffer = &[MetricValue::BlockConfidence(90.0)];
let buffer = vec![MetricValue::BlockConfidence(90.0)];
let (m_u64, m_f64) = flatten_metrics(buffer);
assert!(m_u64.is_empty());
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&90.0));

let buffer = &[
let buffer = vec![
MetricValue::BlockConfidence(90.0),
MetricValue::BlockHeight(1),
MetricValue::BlockConfidence(93.0),
Expand All @@ -364,7 +365,7 @@ mod tests {
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&91.5));

let buffer = &[
let buffer = vec![
MetricValue::BlockConfidence(90.0),
MetricValue::BlockHeight(1),
MetricValue::BlockConfidence(93.0),
Expand All @@ -379,7 +380,7 @@ mod tests {
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&93.75));

let buffer = &[
let buffer = vec![
MetricValue::DHTConnectedPeers(90),
MetricValue::DHTFetchDuration(1.0),
MetricValue::DHTPutSuccess(10.0),
Expand Down

0 comments on commit 2fbed04

Please sign in to comment.