Skip to content

Commit

Permalink
Move crawler metrics to crawler client
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jul 31, 2024
1 parent f7f659e commit 5f80f89
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 33 deletions.
50 changes: 45 additions & 5 deletions core/src/crawl_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::{
p2p::Client,
rpc::{self, Event},
},
telemetry::{MetricValue, Metrics},
types::{self, block_matrix_partition_format, BlockVerified, Delay},
telemetry::{metric, otlp::Record, MetricName, Metrics},
types::{self, block_matrix_partition_format, BlockVerified, Delay, Origin},
};
use kate_recovery::matrix::Partition;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -53,6 +53,46 @@ impl Default for CrawlConfig {
}
}

#[derive(Clone)]
enum CrawlMetricValue {
CellsSuccessRate(f64),
RowsSuccessRate(f64),
BlockDelay(f64),
}

impl MetricName for CrawlMetricValue {
fn name(&self) -> &'static str {
use CrawlMetricValue::*;
match self {
CellsSuccessRate(_) => "avail.light.crawl.cells_success_rate",
RowsSuccessRate(_) => "avail.light.crawl.rows_success_rate",
BlockDelay(_) => "avail.light.crawl.block_delay",
}
}
}
impl From<CrawlMetricValue> for Record {
fn from(value: CrawlMetricValue) -> Self {
use CrawlMetricValue::*;
use Record::*;

let name = value.name();

match value {
CellsSuccessRate(number) => AvgF64(name, number),
RowsSuccessRate(number) => AvgF64(name, number),
BlockDelay(number) => AvgF64(name, number),
}
}
}

impl metric::Value for CrawlMetricValue {
// Metric filter for external peers
// Only the metrics we wish to send to OTel should be in this list
fn is_allowed(&self, origin: &Origin) -> bool {
matches!(origin, Origin::Internal)
}
}

pub async fn run(
mut message_rx: broadcast::Receiver<Event>,
network_client: Client,
Expand Down Expand Up @@ -88,7 +128,7 @@ pub async fn run(
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
let _ = metrics
.record(MetricValue::CrawlBlockDelay(seconds.as_secs() as f64))
.record(CrawlMetricValue::BlockDelay(seconds.as_secs() as f64))
.await;
}
let block_number = block.block_num;
Expand Down Expand Up @@ -116,7 +156,7 @@ pub async fn run(
partition, success_rate, total, fetched, "Fetched block cells",
);
let _ = metrics
.record(MetricValue::CrawlCellsSuccessRate(success_rate))
.record(CrawlMetricValue::CellsSuccessRate(success_rate))
.await;
}

Expand All @@ -138,7 +178,7 @@ pub async fn run(
success_rate, total, fetched, "Fetched block rows"
);
let _ = metrics
.record(MetricValue::CrawlRowsSuccessRate(success_rate))
.record(CrawlMetricValue::RowsSuccessRate(success_rate))
.await;
}

Expand Down
14 changes: 0 additions & 14 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,6 @@ pub enum MetricValue {
RPCFetched(f64),
RPCFetchDuration(f64),
RPCCallDuration(f64),

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(f64),
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(f64),
#[cfg(feature = "crawl")]
CrawlBlockDelay(f64),
}

impl MetricName for MetricValue {
Expand All @@ -115,13 +108,6 @@ impl MetricName for MetricValue {
RPCFetched(_) => "avail.light.rpc.fetched",
RPCFetchDuration(_) => "avail.light.rpc.fetch_duration",
RPCCallDuration(_) => "avail.light.rpc.call_duration",

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(_) => "avail.light.crawl.cells_success_rate",
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(_) => "avail.light.crawl.rows_success_rate",
#[cfg(feature = "crawl")]
CrawlBlockDelay(_) => "avail.light.crawl.block_delay",
}
}
}
Expand Down
21 changes: 7 additions & 14 deletions core/src/telemetry/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,6 @@ impl From<MetricValue> for Record {
RPCFetched(number) => AvgF64(name, number),
RPCFetchDuration(number) => AvgF64(name, number),
RPCCallDuration(number) => AvgF64(name, number),

#[cfg(feature = "crawl")]
CrawlCellsSuccessRate(number) => AvgF64(name, number),
#[cfg(feature = "crawl")]
CrawlRowsSuccessRate(number) => AvgF64(name, number),
#[cfg(feature = "crawl")]
CrawlBlockDelay(number) => AvgF64(name, number),
}
}
}
Expand Down Expand Up @@ -343,20 +336,20 @@ mod tests {
assert_eq!(result, expected);
}

fn flatten_metric_values(
fn flatten_metrics(
values: Vec<MetricValue>,
) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
flatten_metrics(&values.into_iter().map(Into::into).collect::<Vec<Record>>())
super::flatten_metrics(&values.into_iter().map(Into::into).collect::<Vec<Record>>())
}

#[test]
fn test_flatten_metrics() {
let (m_u64, m_f64) = flatten_metric_values(vec![]);
let (m_u64, m_f64) = flatten_metrics(vec![]);
assert!(m_u64.is_empty());
assert!(m_f64.is_empty());

let buffer = vec![MetricValue::BlockConfidence(90.0)];
let (m_u64, m_f64) = flatten_metric_values(buffer);
let (m_u64, m_f64) = flatten_metrics(buffer);
assert!(m_u64.is_empty());
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&90.0));
Expand All @@ -366,7 +359,7 @@ mod tests {
MetricValue::BlockHeight(1),
MetricValue::BlockConfidence(93.0),
];
let (m_u64, m_f64) = flatten_metric_values(buffer);
let (m_u64, m_f64) = flatten_metrics(buffer);
assert_eq!(m_u64.len(), 1);
assert_eq!(m_u64.get("avail.light.block.height"), Some(&1));
assert_eq!(m_f64.len(), 1);
Expand All @@ -381,7 +374,7 @@ mod tests {
MetricValue::BlockHeight(10),
MetricValue::BlockHeight(1),
];
let (m_u64, m_f64) = flatten_metric_values(buffer);
let (m_u64, m_f64) = flatten_metrics(buffer);
assert_eq!(m_u64.len(), 1);
assert_eq!(m_u64.get("avail.light.block.height"), Some(&10));
assert_eq!(m_f64.len(), 1);
Expand All @@ -398,7 +391,7 @@ mod tests {
MetricValue::DHTConnectedPeers(80),
MetricValue::BlockConfidence(98.0),
];
let (m_u64, m_f64) = flatten_metric_values(buffer);
let (m_u64, m_f64) = flatten_metrics(buffer);
assert_eq!(m_u64.len(), 1);
assert_eq!(m_u64.get("avail.light.block.height"), Some(&999));
assert_eq!(m_f64.len(), 4);
Expand Down

0 comments on commit 5f80f89

Please sign in to comment.