Skip to content

Commit 8b1a524

Browse files
authored
graph: Add endpoint metrics (#4430)
Add a generic mechanism to count errors based on host url * add and wire interceptor * Add interceptors for metrics and auth * use error code to get firehose adapter
1 parent 94e93b0 commit 8b1a524

File tree

17 files changed

+647
-168
lines changed

17 files changed

+647
-168
lines changed

Cargo.lock

+48-44
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/examples/firehose.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use anyhow::Error;
22
use graph::{
3+
endpoint::EndpointMetrics,
34
env::env_var,
45
firehose::SubgraphLimit,
6+
log::logger,
57
prelude::{prost, tokio, tonic},
68
{firehose, firehose::FirehoseEndpoint},
79
};
@@ -20,13 +22,18 @@ async fn main() -> Result<(), Error> {
2022
token = Some(token_env);
2123
}
2224

25+
let logger = logger(false);
26+
let host = "https://api.streamingfast.io:443".to_string();
27+
let metrics = Arc::new(EndpointMetrics::new(logger, &[host.clone()]));
28+
2329
let firehose = Arc::new(FirehoseEndpoint::new(
2430
"firehose",
25-
"https://api.streamingfast.io:443",
31+
&host,
2632
token,
2733
false,
2834
false,
2935
SubgraphLimit::Unlimited,
36+
metrics,
3037
));
3138

3239
loop {

chain/ethereum/src/chain.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ impl Blockchain for Chain {
399399
) -> Result<BlockPtr, IngestorError> {
400400
match self.client.as_ref() {
401401
ChainClient::Firehose(endpoints) => endpoints
402-
.random()?
402+
.endpoint()?
403403
.block_ptr_for_number::<HeaderOnlyBlock>(logger, number)
404404
.await
405405
.map_err(IngestorError::Unknown),

chain/substreams/examples/substreams.rs

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::{format_err, Context, Error};
22
use graph::blockchain::block_stream::BlockStreamEvent;
33
use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
4+
use graph::endpoint::EndpointMetrics;
45
use graph::firehose::SubgraphLimit;
56
use graph::prelude::{info, tokio, DeploymentHash, Registry};
67
use graph::tokio_stream::StreamExt;
@@ -41,13 +42,16 @@ async fn main() -> Result<(), Error> {
4142
prometheus_registry.clone(),
4243
));
4344

45+
let endpoint_metrics = EndpointMetrics::new(logger.clone(), &[endpoint.clone()]);
46+
4447
let firehose = Arc::new(FirehoseEndpoint::new(
4548
"substreams",
4649
&endpoint,
4750
token,
4851
false,
4952
false,
5053
SubgraphLimit::Unlimited,
54+
Arc::new(endpoint_metrics),
5155
));
5256

5357
let mut stream: SubstreamsBlockStream<graph_chain_substreams::Chain> =

graph/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ chrono = "0.4.23"
1717
envconfig = "0.10.0"
1818
Inflector = "0.11.3"
1919
isatty = "0.1.9"
20-
reqwest = { version = "0.11.2", features = ["json", "stream", "multipart"] }
20+
reqwest = { version = "0.11.14", features = ["json", "stream", "multipart"] }
2121
ethabi = "17.2"
2222
hex = "0.4.3"
2323
http = "0.2.3"

graph/src/blockchain/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl<C: Blockchain> ChainClient<C> {
2525
// adapter limits in the configuration can effectively disable firehose
2626
// by setting a limit to 0.
2727
// In this case we should fallback to an rpc client.
28-
let firehose_available = firehose_endpoints.random().is_ok();
28+
let firehose_available = firehose_endpoints.endpoint().is_ok();
2929

3030
match firehose_available {
3131
true => Self::Firehose(firehose_endpoints),
@@ -42,7 +42,7 @@ impl<C: Blockchain> ChainClient<C> {
4242

4343
pub fn firehose_endpoint(&self) -> anyhow::Result<Arc<FirehoseEndpoint>> {
4444
match self {
45-
ChainClient::Firehose(endpoints) => endpoints.random(),
45+
ChainClient::Firehose(endpoints) => endpoints.endpoint(),
4646
_ => Err(anyhow!("firehose endpoint requested on rpc chain client")),
4747
}
4848
}

graph/src/blockchain/firehose_block_stream.rs

+1
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
226226
"start_block" => start_block_num,
227227
"subgraph" => &deployment,
228228
"cursor" => latest_cursor.to_string(),
229+
"provider_err_count" => endpoint.current_error_count(),
229230
);
230231

231232
// We just reconnected, assume that we want to back off on errors

0 commit comments

Comments
 (0)