Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make metrics value composable #648

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions core/src/crawl_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::{
p2p::Client,
rpc::{self, Event},
},
telemetry::{MetricValue, Metrics},
types::{self, block_matrix_partition_format, BlockVerified, Delay},
telemetry::{metric, otlp::Record, MetricName, Metrics},
types::{self, block_matrix_partition_format, BlockVerified, Delay, Origin},
};
use kate_recovery::matrix::Partition;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -53,6 +53,46 @@ impl Default for CrawlConfig {
}
}

#[derive(Clone)]
enum CrawlMetricValue {
CellsSuccessRate(f64),
RowsSuccessRate(f64),
BlockDelay(f64),
}

impl MetricName for CrawlMetricValue {
fn name(&self) -> &'static str {
use CrawlMetricValue::*;
match self {
CellsSuccessRate(_) => "avail.light.crawl.cells_success_rate",
RowsSuccessRate(_) => "avail.light.crawl.rows_success_rate",
BlockDelay(_) => "avail.light.crawl.block_delay",
}
}
}
impl From<CrawlMetricValue> for Record {
fn from(value: CrawlMetricValue) -> Self {
use CrawlMetricValue::*;
use Record::*;

let name = value.name();

match value {
CellsSuccessRate(number) => AvgF64(name, number),
RowsSuccessRate(number) => AvgF64(name, number),
BlockDelay(number) => AvgF64(name, number),
}
}
}

impl metric::Value for CrawlMetricValue {
// Metric filter for external peers
// Only the metrics we wish to send to OTel should be in this list
fn is_allowed(&self, origin: &Origin) -> bool {
matches!(origin, Origin::Internal)
}
}

pub async fn run(
mut message_rx: broadcast::Receiver<Event>,
network_client: Client,
Expand Down Expand Up @@ -88,7 +128,7 @@ pub async fn run(
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
let _ = metrics
.record(MetricValue::CrawlBlockDelay(seconds.as_secs() as f64))
.record(CrawlMetricValue::BlockDelay(seconds.as_secs() as f64))
.await;
}
let block_number = block.block_num;
Expand Down Expand Up @@ -116,7 +156,7 @@ pub async fn run(
partition, success_rate, total, fetched, "Fetched block cells",
);
let _ = metrics
.record(MetricValue::CrawlCellsSuccessRate(success_rate))
.record(CrawlMetricValue::CellsSuccessRate(success_rate))
.await;
}

Expand All @@ -138,7 +178,7 @@ pub async fn run(
success_rate, total, fetched, "Fetched block rows"
);
let _ = metrics
.record(MetricValue::CrawlRowsSuccessRate(success_rate))
.record(CrawlMetricValue::RowsSuccessRate(success_rate))
.await;
}

Expand Down
8 changes: 2 additions & 6 deletions core/src/fat_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ pub async fn run(
#[cfg(test)]
mod tests {
use super::*;
use crate::{data, telemetry, types::RuntimeConfig};
use crate::{data, telemetry::metric::tests, types::RuntimeConfig};
use avail_subxt::{
api::runtime_types::avail_core::{
data_lookup::compact::CompactDataLookup,
Expand Down Expand Up @@ -379,14 +379,10 @@ mod tests {
.expect_insert_cells_into_dht()
.returning(|_, _| Box::pin(async move { Ok(()) }));

let mut mock_metrics = telemetry::MockMetrics::new();
mock_metrics.expect_count().returning(|_| ());
mock_metrics.expect_record().returning(|_| ());

process_block(
&mock_client,
db,
&Arc::new(mock_metrics),
&Arc::new(tests::MockMetrics {}),
&FatClientConfig::from(&RuntimeConfig::default()),
&default_header(),
Instant::now(),
Expand Down
7 changes: 2 additions & 5 deletions core/src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ mod tests {
use crate::{
data,
network::rpc::{cell_count_for_confidence, CELL_COUNT_99_99},
telemetry,
telemetry::metric::tests,
types::RuntimeConfig,
};
use avail_subxt::{
Expand Down Expand Up @@ -348,13 +348,10 @@ mod tests {
Box::pin(async move { Ok((fetched, unfetched, stats)) })
});

let mut mock_metrics = telemetry::MockMetrics::new();
mock_metrics.expect_count().returning(|_| ());
mock_metrics.expect_record().returning(|_| ());
process_block(
db,
&mock_network_client,
&Arc::new(mock_metrics),
&Arc::new(tests::MockMetrics {}),
&cfg,
header,
recv,
Expand Down
30 changes: 30 additions & 0 deletions core/src/telemetry/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::types::Origin;

pub trait Value: Send + Clone {
fn is_allowed(&self, origin: &Origin) -> bool;
}

#[cfg(test)]
pub mod tests {
use crate::telemetry::{metric, MetricCounter, Metrics, Record};
use async_trait::async_trait;
use color_eyre::eyre;
use libp2p::{kad::Mode, Multiaddr};

pub struct MockMetrics {}

#[async_trait]
impl Metrics for MockMetrics {
async fn count(&self, _: MetricCounter) {}
async fn record<T>(&self, _: T)
where
T: metric::Value + Into<Record> + Send,
{
}
async fn flush(&self) -> eyre::Result<()> {
Ok(())
}
async fn update_operating_mode(&self, _: Mode) {}
async fn update_multiaddress(&self, _: Multiaddr) {}
}
}
24 changes: 6 additions & 18 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::types::Origin;
use async_trait::async_trait;
use color_eyre::Result;
use libp2p::{kad::Mode, Multiaddr};
use mockall::automock;
use otlp::Record;

pub mod metric;
pub mod otlp;

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -81,13 +82,6 @@ pub enum MetricValue {
RPCFetched(f64),
RPCFetchDuration(f64),
RPCCallDuration(f64),

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(f64),
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(f64),
#[cfg(feature = "crawl")]
CrawlBlockDelay(f64),
}

impl MetricName for MetricValue {
Expand All @@ -114,18 +108,11 @@ impl MetricName for MetricValue {
RPCFetched(_) => "avail.light.rpc.fetched",
RPCFetchDuration(_) => "avail.light.rpc.fetch_duration",
RPCCallDuration(_) => "avail.light.rpc.call_duration",

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(_) => "avail.light.crawl.cells_success_rate",
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(_) => "avail.light.crawl.rows_success_rate",
#[cfg(feature = "crawl")]
CrawlBlockDelay(_) => "avail.light.crawl.block_delay",
}
}
}

impl MetricValue {
impl metric::Value for MetricValue {
// Metric filter for external peers
// Only the metrics we wish to send to OTel should be in this list
fn is_allowed(&self, origin: &Origin) -> bool {
Expand All @@ -139,11 +126,12 @@ impl MetricValue {
}
}

#[automock]
#[async_trait]
pub trait Metrics {
async fn count(&self, counter: MetricCounter);
async fn record(&self, value: MetricValue);
async fn record<T>(&self, value: T)
where
T: metric::Value + Into<Record> + Send;
async fn flush(&self) -> Result<()>;
async fn update_operating_mode(&self, mode: Mode);
async fn update_multiaddress(&self, mode: Multiaddr);
Expand Down
47 changes: 24 additions & 23 deletions core/src/telemetry/otlp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{MetricCounter, MetricValue};
use super::{metric, MetricCounter, MetricValue};
use crate::{
telemetry::MetricName,
types::{Origin, OtelConfig},
Expand All @@ -24,7 +24,7 @@ pub struct Metrics {
meter: Meter,
counters: HashMap<&'static str, Counter<u64>>,
attributes: RwLock<MetricAttributes>,
metric_buffer: Arc<Mutex<Vec<MetricValue>>>,
metric_buffer: Arc<Mutex<Vec<Record>>>,
counter_buffer: Arc<Mutex<Vec<MetricCounter>>>,
}

Expand Down Expand Up @@ -84,7 +84,8 @@ impl Metrics {
}
}

enum Record {
#[derive(Debug)]
pub enum Record {
MaxU64(&'static str, u64),
AvgF64(&'static str, f64),
}
Expand Down Expand Up @@ -117,13 +118,6 @@ impl From<MetricValue> for Record {
RPCFetched(number) => AvgF64(name, number),
RPCFetchDuration(number) => AvgF64(name, number),
RPCCallDuration(number) => AvgF64(name, number),

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(number) => AvgF64(name, number),
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(number) => AvgF64(name, number),
#[cfg(feature = "crawl")]
CrawlBlockDelay(number) => AvgF64(name, number),
}
}
}
Expand All @@ -149,16 +143,14 @@ fn flatten_counters(buffer: &[MetricCounter]) -> HashMap<&'static str, u64> {
/// Aggregates buffered metrics into `u64` or `f64` values, depending on the metric.
/// Returned values are a `HashMap`s where the keys are the metric name,
/// and values are the aggregations (avg, max, etc.) of those metrics.
fn flatten_metrics(
buffer: &[impl Into<Record> + Clone],
) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
fn flatten_metrics(buffer: &[Record]) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
let mut u64_maximums: HashMap<&'static str, Vec<u64>> = HashMap::new();
let mut f64_averages: HashMap<&'static str, Vec<f64>> = HashMap::new();

for value in buffer {
match value.clone().into() {
Record::MaxU64(name, number) => u64_maximums.entry(name).or_default().push(number),
Record::AvgF64(name, number) => f64_averages.entry(name).or_default().push(number),
match value {
Record::MaxU64(name, number) => u64_maximums.entry(name).or_default().push(*number),
Record::AvgF64(name, number) => f64_averages.entry(name).or_default().push(*number),
}
}

Expand Down Expand Up @@ -193,14 +185,17 @@ impl super::Metrics for Metrics {
}

/// Puts metric to the metric buffer if it is allowed.
async fn record(&self, value: super::MetricValue) {
async fn record<T>(&self, value: T)
where
T: metric::Value + Into<Record> + Send,
{
let attributes = self.attributes.read().await;
if !value.is_allowed(&attributes.origin) {
return;
}

let mut metric_buffer = self.metric_buffer.lock().await;
metric_buffer.push(value);
metric_buffer.push(value.into());
}

/// Calculates counters and average metrics, and flushes buffers to the collector.
Expand Down Expand Up @@ -341,19 +336,25 @@ mod tests {
assert_eq!(result, expected);
}

fn flatten_metrics(
values: Vec<MetricValue>,
) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
super::flatten_metrics(&values.into_iter().map(Into::into).collect::<Vec<Record>>())
}

#[test]
fn test_flatten_metrics() {
let (m_u64, m_f64) = flatten_metrics(&[] as &[MetricValue]);
let (m_u64, m_f64) = flatten_metrics(vec![]);
assert!(m_u64.is_empty());
assert!(m_f64.is_empty());

let buffer = &[MetricValue::BlockConfidence(90.0)];
let buffer = vec![MetricValue::BlockConfidence(90.0)];
let (m_u64, m_f64) = flatten_metrics(buffer);
assert!(m_u64.is_empty());
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&90.0));

let buffer = &[
let buffer = vec![
MetricValue::BlockConfidence(90.0),
MetricValue::BlockHeight(1),
MetricValue::BlockConfidence(93.0),
Expand All @@ -364,7 +365,7 @@ mod tests {
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&91.5));

let buffer = &[
let buffer = vec![
MetricValue::BlockConfidence(90.0),
MetricValue::BlockHeight(1),
MetricValue::BlockConfidence(93.0),
Expand All @@ -379,7 +380,7 @@ mod tests {
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&93.75));

let buffer = &[
let buffer = vec![
MetricValue::DHTConnectedPeers(90),
MetricValue::DHTFetchDuration(1.0),
MetricValue::DHTPutSuccess(10.0),
Expand Down
Loading