Skip to content

Commit

Permalink
rpc server metrics impl (#3913)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
0xprames and mattsse authored Jul 26, 2023
1 parent d5ea168 commit caa2683
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/rpc/rpc-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ reth-rpc-engine-api = { path = "../rpc-engine-api" }
reth-rpc-types = { workspace = true }
reth-tasks = { workspace = true }
reth-transaction-pool = { workspace = true }
reth-metrics = { workspace = true, features = ["common"] }

# rpc/net
jsonrpsee = { version = "0.18", features = ["server"] }
Expand Down
17 changes: 13 additions & 4 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
//! }
//! ```
use crate::{auth::AuthRpcModule, error::WsHttpSamePortError};
use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcServerMetrics};
use constants::*;
use error::{RpcError, ServerKind};
use jsonrpsee::{
Expand Down Expand Up @@ -157,6 +157,9 @@ pub mod constants;
/// Additional support for tracing related rpc calls
pub mod tracing_pool;

// Rpc server metrics
mod metrics;

// re-export for convenience
pub use crate::eth::{EthConfig, EthHandlers};
pub use jsonrpsee::server::ServerBuilder;
Expand Down Expand Up @@ -1232,7 +1235,7 @@ impl RpcServerConfig {
let ws_socket_addr = self
.ws_addr
.unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_WS_RPC_PORT)));

let metrics = RpcServerMetrics::default();
// If both are configured on the same port, we combine them into one server.
if self.http_addr == self.ws_addr &&
self.http_server_config.is_some() &&
Expand Down Expand Up @@ -1264,6 +1267,7 @@ impl RpcServerConfig {
http_socket_addr,
cors,
ServerKind::WsHttp(http_socket_addr),
metrics.clone(),
)
.await?;
return Ok(WsHttpServer {
Expand All @@ -1285,6 +1289,7 @@ impl RpcServerConfig {
ws_socket_addr,
self.ws_cors_domains.take(),
ServerKind::WS(ws_socket_addr),
metrics.clone(),
)
.await?;
ws_local_addr = Some(addr);
Expand All @@ -1298,6 +1303,7 @@ impl RpcServerConfig {
http_socket_addr,
self.http_cors_domains.take(),
ServerKind::Http(http_socket_addr),
metrics.clone(),
)
.await?;
http_local_addr = Some(addr);
Expand Down Expand Up @@ -1529,9 +1535,9 @@ impl Default for WsHttpServers {
/// Http Servers Enum
enum WsHttpServerKind {
/// Http server
Plain(Server),
Plain(Server<Identity, RpcServerMetrics>),
/// Http server with cors
WithCors(Server<Stack<CorsLayer, Identity>>),
WithCors(Server<Stack<CorsLayer, Identity>, RpcServerMetrics>),
}

// === impl WsHttpServerKind ===
Expand All @@ -1551,12 +1557,14 @@ impl WsHttpServerKind {
socket_addr: SocketAddr,
cors_domains: Option<String>,
server_kind: ServerKind,
metrics: RpcServerMetrics,
) -> Result<(Self, SocketAddr), RpcError> {
if let Some(cors) = cors_domains.as_deref().map(cors::create_cors_layer) {
let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?;
let middleware = tower::ServiceBuilder::new().layer(cors);
let server = builder
.set_middleware(middleware)
.set_logger(metrics)
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?;
Expand All @@ -1565,6 +1573,7 @@ impl WsHttpServerKind {
Ok((server, local_addr))
} else {
let server = builder
.set_logger(metrics)
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?;
Expand Down
84 changes: 84 additions & 0 deletions crates/rpc/rpc-builder/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use jsonrpsee::server::logger::{HttpRequest, Logger, MethodKind, Params, TransportProtocol};
use reth_metrics::{
metrics::{self, Counter, Histogram},
Metrics,
};
use std::{net::SocketAddr, time::Instant};

/// Metrics for the rpc server
#[derive(Metrics, Clone)]
#[metrics(scope = "rpc_server")]
pub(crate) struct RpcServerMetrics {
/// The number of calls started
calls_started: Counter,
/// The number of successful calls
successful_calls: Counter,
/// The number of failed calls
failed_calls: Counter,
/// The number of requests started
requests_started: Counter,
/// The number of requests finished
requests_finished: Counter,
/// The number of ws sessions opened
ws_session_opened: Counter,
/// The number of ws sessions closed
ws_session_closed: Counter,
/// Latency for a single request/response pair
request_latency: Histogram,
/// Latency for a single call
call_latency: Histogram,
}

impl Logger for RpcServerMetrics {
type Instant = Instant;
fn on_connect(
&self,
_remote_addr: SocketAddr,
_request: &HttpRequest,
transport: TransportProtocol,
) {
match transport {
TransportProtocol::Http => {}
TransportProtocol::WebSocket => self.ws_session_opened.increment(1),
}
}
fn on_request(&self, _transport: TransportProtocol) -> Self::Instant {
self.requests_started.increment(1);
Instant::now()
}
fn on_call(
&self,
_method_name: &str,
_params: Params<'_>,
_kind: MethodKind,
_transport: TransportProtocol,
) {
self.calls_started.increment(1);
}
fn on_result(
&self,
_method_name: &str,
success: bool,
started_at: Self::Instant,
_transport: TransportProtocol,
) {
// capture call duration
self.call_latency.record(started_at.elapsed().as_millis() as f64);
if !success {
self.failed_calls.increment(1);
} else {
self.successful_calls.increment(1);
}
}
fn on_response(&self, _result: &str, started_at: Self::Instant, _transport: TransportProtocol) {
// capture request latency for this request/response pair
self.request_latency.record(started_at.elapsed().as_millis() as f64);
self.requests_finished.increment(1);
}
fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol) {
match transport {
TransportProtocol::Http => {}
TransportProtocol::WebSocket => self.ws_session_closed.increment(1),
}
}
}

0 comments on commit caa2683

Please sign in to comment.