Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graph: Add new counter to EndpointMetrics #4490

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use graph::{
env::env_var,
firehose::SubgraphLimit,
log::logger,
prelude::{prost, tokio, tonic},
prelude::{prost, tokio, tonic, MetricsRegistry},
{firehose, firehose::FirehoseEndpoint},
};
use graph_chain_ethereum::codec;
Expand All @@ -24,7 +24,11 @@ async fn main() -> Result<(), Error> {

let logger = logger(false);
let host = "https://api.streamingfast.io:443".to_string();
let metrics = Arc::new(EndpointMetrics::new(logger, &[host.clone()]));
let metrics = Arc::new(EndpointMetrics::new(
logger,
&[host.clone()],
Arc::new(MetricsRegistry::mock()),
));

let firehose = Arc::new(FirehoseEndpoint::new(
"firehose",
Expand Down
6 changes: 5 additions & 1 deletion chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ async fn main() -> Result<(), Error> {
prometheus_registry.clone(),
));

let endpoint_metrics = EndpointMetrics::new(logger.clone(), &[endpoint.clone()]);
let endpoint_metrics = EndpointMetrics::new(
logger.clone(),
&[endpoint.clone()],
Arc::new(MetricsRegistry::mock()),
);

let firehose = Arc::new(FirehoseEndpoint::new(
"substreams",
Expand Down
14 changes: 13 additions & 1 deletion graph/src/components/metrics/registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use prometheus::{labels, Histogram};
use prometheus::{labels, Histogram, IntCounterVec};

use crate::components::metrics::{counter_with_labels, gauge_with_labels};
use crate::prelude::Collector;
Expand Down Expand Up @@ -386,6 +386,18 @@ impl MetricsRegistry {
Ok(counter)
}

/// Creates an `IntCounterVec` with the given name, help text and
/// variable label names, registers it with this registry, and returns
/// the boxed metric.
///
/// The returned clone shares the underlying metric with the registered
/// collector, so increments through it are visible to scrapes.
///
/// # Errors
/// Returns a `PrometheusError` if the metric options or label names are
/// invalid.
pub fn new_int_counter_vec(
    &self,
    name: &str,
    help: &str,
    variable_labels: &[&str],
) -> Result<Box<IntCounterVec>, PrometheusError> {
    let opts = Opts::new(name, help);
    // `variable_labels` is already a `&[&str]`; no extra borrow needed.
    let counters = Box::new(IntCounterVec::new(opts, variable_labels)?);
    self.register(name, counters.clone());
    Ok(counters)
}

pub fn new_counter_vec(
&self,
name: &str,
Expand Down
140 changes: 118 additions & 22 deletions graph/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use std::{
},
};

use prometheus::IntCounterVec;
use slog::{warn, Logger};

use crate::data::value::Word;
use crate::{components::metrics::MetricsRegistry, data::value::Word};

/// HostCount is the underlying structure to keep the count,
/// we require that all the hosts are known ahead of time, this way we can
Expand All @@ -20,50 +21,140 @@ type HostCount = Arc<HashMap<Host, AtomicU64>>;
/// adapters if they share the same endpoint.
pub type Host = Word;

/// This struct represents all the current labels except for the result
/// which is added separately. If any new labels are necessary they should
/// remain in the same order as added in [`EndpointMetrics::new`]
#[derive(Clone)]
pub struct RequestLabels {
    /// The endpoint host the request was sent to.
    pub host: Host,
    /// Request-type label; callers in this file pass "" or "unknown"
    /// when no more specific value is available.
    pub req_type: Word,
    /// The protocol/transport the request used (firehose/substreams/rpc).
    pub conn_type: ConnectionType,
}

/// The type of underlying connection we are reporting for.
#[derive(Clone)]
pub enum ConnectionType {
    Firehose,
    Substreams,
    Rpc,
}

// Prefer `From` over a hand-written `Into`: the std blanket impl derives
// `Into<&'static str> for &ConnectionType` automatically, so existing
// `.into()` call sites keep working (clippy: `from_over_into`).
impl From<&ConnectionType> for &'static str {
    /// Maps each connection type to its metric label value.
    fn from(conn_type: &ConnectionType) -> Self {
        match conn_type {
            ConnectionType::Firehose => "firehose",
            ConnectionType::Substreams => "substreams",
            ConnectionType::Rpc => "rpc",
        }
    }
}

impl RequestLabels {
    /// Renders the labels as a boxed slice in the exact order the
    /// `endpoint_request` counter declares them: conn_type, req_type,
    /// host, result.
    fn to_slice(&self, is_success: bool) -> Box<[&str]> {
        let result = if is_success { "success" } else { "failure" };
        Box::new([
            (&self.conn_type).into(),
            self.req_type.as_str(),
            self.host.as_str(),
            result,
        ])
    }
}

/// EndpointMetrics keeps track of calls success rate for specific calls,
/// a success call to a host will clear the error count.
//
// NOTE: the `#[derive(Debug)]` left over from the pre-change version is
// removed here; `Debug` is implemented manually below (the prometheus
// counter has no useful debug representation).
pub struct EndpointMetrics {
    logger: Logger,
    // Per-host error counts: a success resets a host's count to 0, a
    // failure increments it by 1.
    hosts: HostCount,
    // Counter labeled (conn_type, req_type, host, result); incremented on
    // every reported request, even for hosts missing from `hosts`.
    counter: Box<IntCounterVec>,
}

impl std::fmt::Debug for EndpointMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the per-host error counts carry interesting state; the
        // logger and the prometheus counter are omitted.
        write!(f, "{:?}", self.hosts)
    }
}

impl EndpointMetrics {
pub fn new(logger: Logger, hosts: &[impl AsRef<str>]) -> Self {
/// Builds the per-host error-count table from `hosts` and registers the
/// `endpoint_request` counter with `registry`.
///
/// # Panics
/// Panics if the counter cannot be registered.
pub fn new(logger: Logger, hosts: &[impl AsRef<str>], registry: Arc<MetricsRegistry>) -> Self {
    let hosts = Arc::new(HashMap::from_iter(
        hosts
            .iter()
            .map(|h| (Host::from(h.as_ref()), AtomicU64::new(0))),
    ));

    let counter = registry
        .new_int_counter_vec(
            "endpoint_request",
            // Fixed typo ("successfull"); the counter records both
            // outcomes, distinguished by the `result` label.
            "successful request",
            &["conn_type", "req_type", "host", "result"],
        )
        .expect("unable to create endpoint_request counter_vec");

    Self {
        logger,
        hosts,
        counter,
    }
}

/// This should only be used for testing.
pub fn mock() -> Self {
use slog::{o, Discard};
Self {
logger: Logger::root(Discard, o!()),
hosts: Arc::new(HashMap::default()),
let hosts: &[&str] = &[];
Self::new(
Logger::root(Discard, o!()),
hosts,
Arc::new(MetricsRegistry::mock()),
)
}

#[cfg(debug_assertions)]
/// Test helper: reports one success or failure for `host` using
/// placeholder labels (empty req_type, Firehose conn_type).
pub fn report_for_test(&self, host: &Host, success: bool) {
    let labels = RequestLabels {
        host: host.clone(),
        req_type: "".into(),
        conn_type: ConnectionType::Firehose,
    };
    if success {
        self.success(&labels)
    } else {
        self.failure(&labels)
    }
}

pub fn success(&self, host: &Host) {
match self.hosts.get(host) {
pub fn success(&self, labels: &RequestLabels) {
match self.hosts.get(&labels.host) {
Some(count) => {
count.store(0, Ordering::Relaxed);
}
None => warn!(&self.logger, "metrics not available for host {}", host),
}
None => warn!(
&self.logger,
"metrics not available for host {}", labels.host
),
};

self.counter.with_label_values(&labels.to_slice(true)).inc();
}

pub fn failure(&self, host: &Host) {
match self.hosts.get(host) {
pub fn failure(&self, labels: &RequestLabels) {
match self.hosts.get(&labels.host) {
Some(count) => {
count.fetch_add(1, Ordering::Relaxed);
}
None => warn!(&self.logger, "metrics not available for host {}", host),
}
None => warn!(
&self.logger,
"metrics not available for host {}", &labels.host
),
};

self.counter
.with_label_values(&labels.to_slice(false))
.inc();
}

/// Returns the current error count of a host or 0 if the host
Expand All @@ -78,23 +169,28 @@ impl EndpointMetrics {

#[cfg(test)]
mod test {
use std::sync::Arc;

use slog::{o, Discard, Logger};

use crate::endpoint::{EndpointMetrics, Host};
use crate::{
components::metrics::MetricsRegistry,
endpoint::{EndpointMetrics, Host},
};

#[tokio::test]
async fn should_increment_and_reset() {
let (a, b, c): (Host, Host, Host) = ("a".into(), "b".into(), "c".into());
let hosts: &[&str] = &[&a, &b, &c];
let logger = Logger::root(Discard, o!());

let metrics = EndpointMetrics::new(logger, hosts);
let metrics = EndpointMetrics::new(logger, hosts, Arc::new(MetricsRegistry::mock()));

metrics.success(&a);
metrics.failure(&a);
metrics.failure(&b);
metrics.failure(&b);
metrics.success(&c);
metrics.report_for_test(&a, true);
metrics.report_for_test(&a, false);
metrics.report_for_test(&b, false);
metrics.report_for_test(&b, false);
metrics.report_for_test(&c, true);

assert_eq!(metrics.get_count(&a), 1);
assert_eq!(metrics.get_count(&b), 2);
Expand Down
27 changes: 21 additions & 6 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
blockchain::BlockPtr,
cheap_clone::CheapClone,
components::store::BlockNumber,
endpoint::{EndpointMetrics, Host},
endpoint::{ConnectionType, EndpointMetrics, Host, RequestLabels},
firehose::decode_firehose_block,
prelude::{anyhow, debug, info},
substreams,
Expand Down Expand Up @@ -196,7 +196,11 @@ impl FirehoseEndpoint {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
labels: RequestLabels {
host: self.host.clone(),
req_type: "unknown".into(),
conn_type: ConnectionType::Firehose,
},
};

let mut client: FetchClient<
Expand All @@ -219,7 +223,11 @@ impl FirehoseEndpoint {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
labels: RequestLabels {
host: self.host.clone(),
req_type: "unknown".into(),
conn_type: ConnectionType::Firehose,
},
};

let mut client = StreamClient::with_interceptor(metrics, self.auth.clone())
Expand All @@ -240,7 +248,11 @@ impl FirehoseEndpoint {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
labels: RequestLabels {
host: self.host.clone(),
req_type: "unknown".into(),
conn_type: ConnectionType::Substreams,
},
};

let mut client =
Expand Down Expand Up @@ -505,7 +517,9 @@ mod test {

use slog::{o, Discard, Logger};

use crate::{endpoint::EndpointMetrics, firehose::SubgraphLimit};
use crate::{
components::metrics::MetricsRegistry, endpoint::EndpointMetrics, firehose::SubgraphLimit,
};

use super::{AvailableCapacity, FirehoseEndpoint, FirehoseEndpoints, SUBGRAPHS_PER_CONN};

Expand Down Expand Up @@ -607,6 +621,7 @@ mod test {
"http://127.0.0.2/",
"http://127.0.0.3/",
],
Arc::new(MetricsRegistry::mock()),
));

let high_error_adapter1 = Arc::new(FirehoseEndpoint::new(
Expand Down Expand Up @@ -646,7 +661,7 @@ mod test {
endpoint_metrics.clone(),
));

endpoint_metrics.failure(&high_error_adapter1.host);
endpoint_metrics.report_for_test(&high_error_adapter1.host, false);

let mut endpoints = FirehoseEndpoints::from(vec![
high_error_adapter1.clone(),
Expand Down
10 changes: 5 additions & 5 deletions graph/src/firehose/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tonic::{
service::Interceptor,
};

use crate::endpoint::{EndpointMetrics, Host};
use crate::endpoint::{EndpointMetrics, RequestLabels};

#[derive(Clone)]
pub struct AuthInterceptor {
Expand Down Expand Up @@ -37,7 +37,7 @@ impl Interceptor for AuthInterceptor {
/// Wraps an inner service and reports each call's outcome to
/// `EndpointMetrics` under the fixed `labels`.
//
// The leftover `pub(crate) host: Host` field from the pre-change version
// is removed: it was superseded by `labels` (which carries the host) and
// `Host` is no longer imported by this module.
pub struct MetricsInterceptor<S> {
    pub(crate) metrics: Arc<EndpointMetrics>,
    pub(crate) service: S,
    pub(crate) labels: RequestLabels,
}

impl<S, Request> Service<Request> for MetricsInterceptor<S>
Expand All @@ -60,16 +60,16 @@ where
}

fn call(&mut self, req: Request) -> Self::Future {
let host = self.host.clone();
let labels = self.labels.clone();
let metrics = self.metrics.clone();

let fut = self.service.call(req);
let res = async move {
let res = fut.await;
if res.is_ok() {
metrics.success(&host);
metrics.success(&labels);
} else {
metrics.failure(&host);
metrics.failure(&labels);
}
res
};
Expand Down
1 change: 1 addition & 0 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ async fn main() {
let endpoint_metrics = Arc::new(EndpointMetrics::new(
logger.clone(),
&config.chains.provider_urls(),
metrics_registry.cheap_clone(),
));

// Ethereum clients; query nodes ignore all ethereum clients and never
Expand Down
1 change: 1 addition & 0 deletions node/src/manager/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub async fn run(
let endpoint_metrics = Arc::new(EndpointMetrics::new(
logger.clone(),
&config.chains.provider_urls(),
metrics_registry.cheap_clone(),
));

// Convert the clients into a link resolver. Since we want to get past
Expand Down