From 885cc9ca00e0dee3eacdedf71d6966cb429e8672 Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 8 Oct 2024 15:03:43 -0400 Subject: [PATCH] feat(middleware): add response extractor. (#816) --- Cargo.lock | 12 ++ Cargo.toml | 1 - crates/pool/src/server/remote/server.rs | 8 +- crates/provider/Cargo.toml | 3 + crates/provider/src/alloy/metrics.rs | 168 ++++++++++++++++++-- crates/provider/src/alloy/mod.rs | 16 +- crates/provider/src/lib.rs | 1 - crates/rpc/Cargo.toml | 1 + crates/rpc/src/lib.rs | 1 + crates/rpc/src/rpc_metrics.rs | 154 ++++++++++++++++++- crates/rpc/src/task.rs | 13 +- crates/task/Cargo.toml | 1 + crates/task/src/grpc/grpc_metrics.rs | 122 ++++++++++++++- crates/task/src/lib.rs | 1 - crates/task/src/metrics.rs | 188 ----------------------- crates/types/Cargo.toml | 2 + crates/types/src/task/metric_recorder.rs | 114 ++++++++++++++ crates/types/src/task/mod.rs | 4 +- crates/types/src/task/status_code.rs | 64 ++++++++ crates/types/src/task/traits.rs | 19 --- 20 files changed, 643 insertions(+), 250 deletions(-) delete mode 100644 crates/task/src/metrics.rs create mode 100644 crates/types/src/task/metric_recorder.rs create mode 100644 crates/types/src/task/status_code.rs delete mode 100644 crates/types/src/task/traits.rs diff --git a/Cargo.lock b/Cargo.lock index b240c2708..78fe8c62f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1564,6 +1564,12 @@ dependencies = [ "serde", ] +[[package]] +name = "cargo-husky" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b02b629252fe8ef6460461409564e2c21d0c8e77e0944f3d189ff06c4e932ad" + [[package]] name = "cc" version = "1.1.15" @@ -4539,6 +4545,7 @@ dependencies = [ "anyhow", "async-trait", "auto_impl", + "futures-util", "mockall", "reqwest", "rundler-contracts", @@ -4547,6 +4554,7 @@ dependencies = [ "rundler-utils", "thiserror", "tokio", + "tower 0.4.13", "tracing", "url", ] @@ -4561,6 +4569,7 @@ dependencies = [ "anyhow", "async-trait", "futures-util", + "http 1.1.0", "jsonrpsee", "metrics", "mockall", @@ -4621,6 +4630,7 @@ dependencies = [ "async-trait", "futures", "metrics", + "pin-project", "reth-tasks", "rundler-provider", "rundler-types", @@ -4641,10 +4651,12 @@ dependencies = [ "alloy-sol-types", "anyhow", "async-trait", + "cargo-husky", "chrono", "const-hex", "constcat", "futures-util", + "metrics", "mockall", "num_enum", "parse-display", diff --git a/Cargo.toml b/Cargo.toml index 13513d616..0436ebc7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,6 @@ serde = "1.0.210" serde_json = "1.0.128" rand = "0.8.5" reqwest = { version = "0.12.8", default-features = false, features = ["rustls-tls"] } -rustls = "0.23.13" thiserror = "1.0.64" tokio = { version = "1.39.3", default-features = false, features = ["rt", "sync", "time"]} tokio-util = "0.7.12" diff --git a/crates/pool/src/server/remote/server.rs b/crates/pool/src/server/remote/server.rs index edaf06e2d..5e64aade6 100644 --- a/crates/pool/src/server/remote/server.rs +++ b/crates/pool/src/server/remote/server.rs @@ -23,8 +23,7 @@ use alloy_primitives::{Address, B256}; use async_trait::async_trait; use futures_util::StreamExt; use rundler_task::{ - grpc::{grpc_metrics::HttpMethodExtractor, protos::from_bytes}, - metrics::MetricsLayer, + grpc::{grpc_metrics::GrpcMetricsLayer, protos::from_bytes}, GracefulShutdown, TaskSpawner, }; use rundler_types::{ @@ -83,10 +82,7 @@ pub(crate) async fn remote_mempool_server_task( .set_serving::>() .await; - let metrics_layer = MetricsLayer::::new( - "op_pool_service".to_string(), - "http-grpc".to_string(), - ); + let metrics_layer = GrpcMetricsLayer::new("op_pool_service".to_string()); if let Err(e) = Server::builder() .layer(metrics_layer) diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 31f1452e3..30a6b390b 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -30,8 +30,11 @@ anyhow.workspace = true async-trait.workspace = true auto_impl.workspace = true thiserror.workspace = true +futures-util.workspace = true +tower.workspace = true tracing.workspace = true url.workspace = true + mockall = {workspace = true, optional = true } [features] diff --git a/crates/provider/src/alloy/metrics.rs b/crates/provider/src/alloy/metrics.rs index 23c92e6d2..1c87d921e 100644 --- a/crates/provider/src/alloy/metrics.rs +++ b/crates/provider/src/alloy/metrics.rs @@ -11,22 +11,160 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use alloy_json_rpc::RequestPacket; -/// Method extractor -use rundler_types::task::traits::RequestExtractor; - -/// Method extractor for Alloy providers -#[derive(Clone, Copy)] -pub struct AlloyMethodExtractor; - -impl RequestExtractor for AlloyMethodExtractor { - fn get_method_name(req: &RequestPacket) -> String { - match req { - RequestPacket::Single(request) => request.method().to_string(), - _ => { - // can't extract method name for batch. - "batch".to_string() +use std::task::{Context, Poll}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::{BoxFuture, HttpError, TransportError, TransportErrorKind}; +use futures_util::FutureExt; +use rundler_types::task::{ + metric_recorder::MethodSessionLogger, + status_code::{HttpCode, RpcCode}, +}; +use tower::{Layer, Service}; + +/// Alloy provider metric layer. +#[derive(Default)] +pub(crate) struct AlloyMetricLayer {} + +impl AlloyMetricLayer {} + +impl Layer for AlloyMetricLayer +where + S: Service + Sync, +{ + type Service = AlloyMetricMiddleware; + + fn layer(&self, service: S) -> Self::Service { + AlloyMetricMiddleware::new(service) + } +} + +pub struct AlloyMetricMiddleware { + service: S, +} + +impl AlloyMetricMiddleware +where + S: Service + Sync, +{ + /// carete an alloy provider metric layer. + pub fn new(service: S) -> Self { + Self { service } + } +} + +impl Clone for AlloyMetricMiddleware +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + service: self.service.clone(), + } + } +} + +impl Service for AlloyMetricMiddleware +where + S: Service + + Sync + + Send + + Clone + + 'static, + S::Future: Send, +{ + type Response = ResponsePacket; + type Error = TransportError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let method_name = get_method_name(&request); + let method_logger = MethodSessionLogger::start( + "alloy_provider_client".to_string(), + method_name, + "rpc".to_string(), + ); + let mut svc = self.service.clone(); + async move { + let response = svc.call(request).await; + method_logger.done(); + match &response { + Ok(resp) => { + method_logger.record_http(HttpCode::TwoHundreds); + method_logger.record_rpc(get_rpc_status_code(resp)); + } + Err(e) => match e { + alloy_json_rpc::RpcError::ErrorResp(rpc_error) => { + method_logger.record_http(HttpCode::TwoHundreds); + method_logger.record_rpc(get_rpc_status_from_code(rpc_error.code)); + } + alloy_json_rpc::RpcError::Transport(TransportErrorKind::HttpError( + HttpError { status, body: _ }, + )) => { + method_logger.record_http(get_http_status_from_code(*status)); + } + alloy_json_rpc::RpcError::NullResp => { + method_logger.record_http(HttpCode::TwoHundreds); + method_logger.record_rpc(RpcCode::Success); + } + _ => {} + }, } + response } + .boxed() } } + +/// Get the method name from the request +fn get_method_name(req: &RequestPacket) -> String { + match req { + RequestPacket::Single(request) => request.method().to_string(), + RequestPacket::Batch(_) => { + // can't extract method name for batch. + "batch".to_string() + } + } +} + +fn get_rpc_status_from_code(code: i64) -> RpcCode { + match code { + -32700 => RpcCode::ParseError, + -32600 => RpcCode::InvalidRequest, + -32601 => RpcCode::MethodNotFound, + -32602 => RpcCode::InvalidParams, + -32603 => RpcCode::InternalError, + x if (-32099..=-32000).contains(&x) => RpcCode::ServerError, + _ => RpcCode::Other, + } +} + +fn get_http_status_from_code(code: u16) -> HttpCode { + match code { + x if (200..=299).contains(&x) => HttpCode::TwoHundreds, + x if (400..=499).contains(&x) => HttpCode::FourHundreds, + x if (500..=599).contains(&x) => HttpCode::FiveHundreds, + _ => HttpCode::Other, + } +} + +fn get_rpc_status_code(response_packet: &ResponsePacket) -> RpcCode { + let response: &alloy_json_rpc::Response = match response_packet { + ResponsePacket::Batch(resps) => { + if resps.is_empty() { + return RpcCode::Success; + } + &resps[0] + } + ResponsePacket::Single(resp) => resp, + }; + let response_code: i64 = match &response.payload { + alloy_json_rpc::ResponsePayload::Success(_) => 0, + alloy_json_rpc::ResponsePayload::Failure(error_payload) => error_payload.code, + }; + get_rpc_status_from_code(response_code) +} diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 9f1170914..4f894364a 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -13,9 +13,11 @@ use alloy_provider::{Provider as AlloyProvider, ProviderBuilder}; use alloy_rpc_client::ClientBuilder; +use alloy_transport::layers::RetryBackoffService; use alloy_transport_http::Http; use anyhow::Context; use evm::AlloyEvmProvider; +use metrics::{AlloyMetricLayer, AlloyMetricMiddleware}; use reqwest::Client; use url::Url; @@ -34,8 +36,16 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result anyhow::Result> + Clone> { +) -> anyhow::Result< + impl AlloyProvider>>> + Clone, +> { let url = Url::parse(rpc_url).context("invalid rpc url")?; - let client = ClientBuilder::default().http(url); - Ok(ProviderBuilder::new().on_client(client)) + let metric_layer = AlloyMetricLayer::default(); + let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 0); + let client = ClientBuilder::default() + .layer(retry_layer) + .layer(metric_layer) + .http(url); + let provider = ProviderBuilder::new().on_client(client); + Ok(provider) } diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 0b39c210b..817b0f703 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -31,7 +31,6 @@ pub use alloy::{ }, }, evm::AlloyEvmProvider, - metrics::AlloyMethodExtractor, new_alloy_evm_provider, new_alloy_provider, }; diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 787660d4c..3057ebece 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -32,6 +32,7 @@ serde.workspace = true strum.workspace = true url.workspace = true futures-util.workspace = true +http = "1.1.0" [dev-dependencies] mockall.workspace = true diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 4c983e400..350e1c84e 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -38,5 +38,6 @@ pub use rundler::{RundlerApiClient, Settings as RundlerApiSettings}; mod task; pub use task::{Args as RpcTaskArgs, RpcTask}; +mod rpc_metrics; mod types; mod utils; diff --git a/crates/rpc/src/rpc_metrics.rs b/crates/rpc/src/rpc_metrics.rs index a4015a606..27c6a05cd 100644 --- a/crates/rpc/src/rpc_metrics.rs +++ b/crates/rpc/src/rpc_metrics.rs @@ -11,13 +11,155 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use jsonrpsee::types::Request; -use rundler_types::task::traits::RequestExtractor; +use futures_util::{future::BoxFuture, FutureExt}; +use http::{Request as httpRequest, Response as httpResponse}; +use jsonrpsee::{ + server::middleware::rpc::RpcServiceT, + types::{ErrorCode, Request}, + MethodResponse, +}; +use rundler_types::task::{ + metric_recorder::MethodSessionLogger, + status_code::{get_http_status_from_code, HttpCode, RpcCode}, +}; +use tower::{Layer, Service}; -pub struct RPCMethodExtractor; +#[derive(Clone)] +pub(crate) struct RpcMetricsMiddlewareLayer { + service_name: String, +} + +impl RpcMetricsMiddlewareLayer { + pub(crate) fn new(service_name: String) -> Self { + Self { service_name } + } +} + +impl Layer for RpcMetricsMiddlewareLayer { + type Service = RpcMetricsMiddleware; + + fn layer(&self, service: S) -> Self::Service { + RpcMetricsMiddleware { + service, + service_name: self.service_name.clone(), + } + } +} + +#[derive(Clone)] +pub(crate) struct RpcMetricsMiddleware { + service: S, + service_name: String, +} + +impl<'a, S> RpcServiceT<'a> for RpcMetricsMiddleware +where + S: RpcServiceT<'a> + Send + Sync + Clone + 'static, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, req: Request<'a>) -> Self::Future { + let method_logger = MethodSessionLogger::start( + self.service_name.clone(), + req.method_name().to_string(), + "rpc".to_string(), + ); + + let svc = self.service.clone(); + + async move { + let rp = svc.call(req).await; + method_logger.done(); + + if rp.is_success() { + method_logger.record_http(HttpCode::TwoHundreds); + method_logger.record_rpc(RpcCode::Success); + } else if let Some(error) = rp.as_error_code() { + let error_code: ErrorCode = error.into(); + let rpc_code = match error_code { + ErrorCode::ParseError => RpcCode::ParseError, + ErrorCode::OversizedRequest => RpcCode::InvalidRequest, + ErrorCode::InvalidRequest => RpcCode::InvalidRequest, + ErrorCode::MethodNotFound => RpcCode::MethodNotFound, + ErrorCode::ServerIsBusy => RpcCode::ResourceExhausted, + ErrorCode::InvalidParams => RpcCode::InvalidParams, + ErrorCode::InternalError => RpcCode::InternalError, + ErrorCode::ServerError(_) => RpcCode::ServerError, + }; + method_logger.record_rpc(rpc_code); + } else { + method_logger.record_rpc(RpcCode::Other); + } + + rp + } + .boxed() + } +} + +#[derive(Clone)] +pub(crate) struct HttpMetricMiddlewareLayer { + service_name: String, +} + +impl HttpMetricMiddlewareLayer { + pub(crate) fn new(service_name: String) -> Self { + Self { service_name } + } +} + +impl Layer for HttpMetricMiddlewareLayer { + type Service = HttpMetricMiddleware; + + fn layer(&self, service: S) -> Self::Service { + HttpMetricMiddleware { + service, + service_name: self.service_name.clone(), + } + } +} + +#[derive(Clone)] +pub(crate) struct HttpMetricMiddleware { + service: S, + service_name: String, +} + +impl Service> for HttpMetricMiddleware +where + S: Service, Response = httpResponse> + Send + Sync + Clone + 'static, + R: 'static, + S::Future: Send, + R: Send, +{ + type Response = S::Response; + + type Error = S::Error; + + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.service.poll_ready(cx) + } -impl RequestExtractor> for RPCMethodExtractor { - fn get_method_name(req: &Request<'static>) -> String { - req.method_name().to_string() + fn call(&mut self, req: httpRequest) -> Self::Future { + let method_logger = MethodSessionLogger::new( + self.service_name.clone(), + "jsonrpc-method".to_string(), + "http".to_string(), + ); + let mut svc = self.service.clone(); + async move { + let rp = svc.call(req).await; + let http_status = rp.as_ref().ok().map(|rp| rp.status()); + if let Some(status_code) = http_status { + method_logger.record_http(get_http_status_from_code(status_code.as_u16())); + } + rp + } + .boxed() } } diff --git a/crates/rpc/src/task.rs b/crates/rpc/src/task.rs index c33bc4889..85e180851 100644 --- a/crates/rpc/src/task.rs +++ b/crates/rpc/src/task.rs @@ -16,7 +16,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::Context; use futures_util::FutureExt; use jsonrpsee::{ - server::{middleware::http::ProxyGetRequestLayer, ServerBuilder}, + server::{middleware::http::ProxyGetRequestLayer, RpcServiceBuilder, ServerBuilder}, RpcModule, }; use rundler_provider::{EntryPointProvider, EvmProvider}; @@ -42,6 +42,7 @@ use crate::{ EthApiSettings, UserOperationEventProviderV0_6, UserOperationEventProviderV0_7, }, health::{HealthChecker, SystemApiServer}, + rpc_metrics::{HttpMetricMiddlewareLayer, RpcMetricsMiddlewareLayer}, rundler::{RundlerApi, RundlerApiServer, Settings as RundlerApiSettings}, types::ApiNamespace, }; @@ -203,9 +204,17 @@ where let http_middleware = tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `system_health` method. .layer(ProxyGetRequestLayer::new("/health", "system_health")?) - .timeout(self.args.rpc_timeout); + .timeout(self.args.rpc_timeout) + .layer(HttpMetricMiddlewareLayer::new( + "rundler-rpc-service-http".to_string(), + )); + + let rpc_metric_middleware = RpcServiceBuilder::new().layer(RpcMetricsMiddlewareLayer::new( + "rundler-rpc-service".to_string(), + )); let server = ServerBuilder::default() + .set_rpc_middleware(rpc_metric_middleware) .set_http_middleware(http_middleware) .max_connections(self.args.max_connections) // Set max request body size to 2x the max transaction size as none of our diff --git a/crates/task/Cargo.toml b/crates/task/Cargo.toml index 9fd2031f2..2678d45cd 100644 --- a/crates/task/Cargo.toml +++ b/crates/task/Cargo.toml @@ -21,6 +21,7 @@ metrics.workspace = true reth-tasks.workspace = true tokio.workspace = true tokio-util.workspace = true +pin-project.workspace = true tonic.workspace = true tower.workspace = true thiserror.workspace = true diff --git a/crates/task/src/grpc/grpc_metrics.rs b/crates/task/src/grpc/grpc_metrics.rs index f166384f4..ec6fa29fc 100644 --- a/crates/task/src/grpc/grpc_metrics.rs +++ b/crates/task/src/grpc/grpc_metrics.rs @@ -11,15 +11,123 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use rundler_types::task::traits::RequestExtractor; +//! Middleware for recording metrics for gRPC requests. + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project::pin_project; +use rundler_types::task::{ + metric_recorder::MethodSessionLogger, + status_code::{get_http_status_from_code, HttpCode}, +}; use tonic::codegen::http; +use tower::{Layer, Service}; + +/// A layer for recording metrics for gRPC requests. +#[derive(Debug, Clone)] +pub struct GrpcMetricsLayer { + scope: String, +} + +impl GrpcMetricsLayer { + /// Create a new `GrpcMetricsLayer` middleware layer + pub fn new(scope: String) -> Self { + GrpcMetricsLayer { scope } + } +} + +impl Layer for GrpcMetricsLayer { + type Service = GrpcMetrics; + + fn layer(&self, service: S) -> Self::Service { + GrpcMetrics::new(service, self.scope.clone()) + } +} + +/// Service for recording metrics for gRPC requests. +#[derive(Clone, Debug)] +pub struct GrpcMetrics { + inner: S, + scope: String, +} + +impl GrpcMetrics { + /// Create a new `GrpcMetrics` middleware service. + pub fn new(inner: S, scope: String) -> Self { + Self { inner, scope } + } +} + +impl Service> for GrpcMetrics +where + S: Service, Response = http::Response> + Sync, +{ + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Our middleware doesn't care about backpressure so its ready as long + // as the inner service is ready. + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + let uri = request.uri().clone(); + let method_name = uri.path().split('/').last().unwrap_or("unknown"); + let method_logger = MethodSessionLogger::start( + self.scope.clone(), + method_name.to_string(), + "grpc".to_string(), + ); + ResponseFuture { + response_future: self.inner.call(request), + method_logger, + } + } +} + +/// Future returned by the middleware. +// checkout: https://github.com/tower-rs/tower/blob/master/guides/building-a-middleware-from-scratch.md +// for details on the use of Pin here +#[pin_project] +pub struct ResponseFuture { + #[pin] + response_future: F, + + method_logger: MethodSessionLogger, +} + +impl Future for ResponseFuture +where + F: Future, Error>>, +{ + type Output = Result, Error>; -/// http request method extractor. -pub struct HttpMethodExtractor; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let res = this.response_future.poll(cx); + match &res { + Poll::Ready(result) => { + this.method_logger.done(); + match result { + Ok(response) => { + let http_status = response.status(); + this.method_logger + .record_http(get_http_status_from_code(http_status.as_u16())); + } + _ => { + this.method_logger.record_http(HttpCode::FiveHundreds); + } + } + } + Poll::Pending => {} + }; -impl RequestExtractor> for HttpMethodExtractor { - fn get_method_name(req: &http::Request) -> String { - let method_name = req.uri().path().split('/').last().unwrap_or("unknown"); - method_name.to_string() + res } } diff --git a/crates/task/src/lib.rs b/crates/task/src/lib.rs index 5fff8c992..e156eb532 100644 --- a/crates/task/src/lib.rs +++ b/crates/task/src/lib.rs @@ -21,7 +21,6 @@ pub mod block_watcher; pub mod grpc; -pub mod metrics; pub mod server; pub use reth_tasks::{ diff --git a/crates/task/src/metrics.rs b/crates/task/src/metrics.rs deleted file mode 100644 index 490f2a1b7..000000000 --- a/crates/task/src/metrics.rs +++ /dev/null @@ -1,188 +0,0 @@ -// This file is part of Rundler. -// -// Rundler is free software: you can redistribute it and/or modify it under the -// terms of the GNU Lesser General Public License as published by the Free Software -// Foundation, either version 3 of the License, or (at your option) any later version. -// -// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with Rundler. -// If not, see https://www.gnu.org/licenses/. - -//! Middleware for recording metrics for requests. - -use std::{ - marker::PhantomData, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use futures::{future::BoxFuture, FutureExt}; -use rundler_types::task::traits::RequestExtractor; -use tower::{Layer, Service}; - -/// tower network layer: https://github.com/tower-rs/tower/blob/master/guides/building-a-middleware-from-scratch.md -#[derive(Debug)] -pub struct MetricsLayer { - service_name: String, - protocol: String, - _request_extractor: PhantomData, - _request_type: PhantomData, -} - -impl MetricsLayer -where - T: RequestExtractor, -{ - /// Initialize a network layer wrappers the metric middleware. - pub fn new(service_name: String, protocol: String) -> Self { - MetricsLayer { - service_name, - protocol, - _request_extractor: PhantomData, - _request_type: PhantomData, - } - } -} - -impl Clone for MetricsLayer -where - T: RequestExtractor, -{ - fn clone(&self) -> Self { - Self { - service_name: self.service_name.clone(), - protocol: self.protocol.clone(), - _request_extractor: PhantomData, - _request_type: PhantomData, - } - } -} - -impl Layer for MetricsLayer -where - T: RequestExtractor, -{ - type Service = MetricsMiddleware; - - fn layer(&self, service: S) -> Self::Service { - Self::Service::new(service, self.service_name.clone(), self.protocol.clone()) - } -} - -/// Middleware implementation. -pub struct MetricsMiddleware { - inner: S, - service_name: String, - protocol: String, - _request_extractor: PhantomData, - _request_type: PhantomData, -} - -impl Clone for MetricsMiddleware -where - S: Clone, -{ - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - service_name: self.service_name.clone(), - protocol: self.protocol.clone(), - _request_extractor: PhantomData, - _request_type: PhantomData, - } - } -} - -impl MetricsMiddleware -where - T: RequestExtractor, -{ - /// Initialize a middleware. - pub fn new(inner: S, service_name: String, protocol: String) -> Self { - Self { - inner, - service_name: service_name.clone(), - protocol, - _request_extractor: PhantomData, - _request_type: PhantomData, - } - } -} - -impl Service for MetricsMiddleware -where - S: Service + Send + Clone + 'static, - S::Future: Send + 'static, - T: RequestExtractor + 'static, - R: Send + 'static, -{ - type Response = S::Response; - type Error = S::Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, request: R) -> Self::Future { - let method_name = T::get_method_name(&request); - - MethodMetrics::increment_num_requests(&self.service_name, &method_name, &self.protocol); - MethodMetrics::increment_open_requests( - self.service_name.as_str(), - method_name.as_str(), - self.protocol.as_str(), - ); - - let start = Instant::now(); - let mut svc = self.inner.clone(); - let service_name = self.service_name.clone(); - let protocol = self.protocol.clone(); - async move { - let rsp = svc.call(request).await; - MethodMetrics::record_request_latency( - &method_name, - &service_name, - &protocol, - start.elapsed(), - ); - MethodMetrics::decrement_open_requests(&method_name, &service_name, &protocol); - if rsp.is_err() { - MethodMetrics::increment_error_count(&method_name, &service_name, &protocol); - } - rsp - } - .boxed() - } -} -struct MethodMetrics {} - -impl MethodMetrics { - fn increment_num_requests(method_name: &str, service_name: &str, protocol: &str) { - metrics::counter!("num_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).increment(1) - } - - fn increment_open_requests(method_name: &str, service_name: &str, protocol: &str) { - metrics::gauge!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).increment(1_f64) - } - - fn decrement_open_requests(method_name: &str, service_name: &str, protocol: &str) { - metrics::gauge!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).decrement(1_f64) - } - - fn increment_error_count(method_name: &str, service_name: &str, protocol: &str) { - metrics::counter!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).increment(1) - } - - fn record_request_latency( - method_name: &str, - service_name: &str, - protocol: &str, - latency: Duration, - ) { - metrics::histogram!("request_latency", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).record(latency) - } -} diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index bd0ea4fbf..1344efd43 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -27,12 +27,14 @@ serde.workspace = true serde_json.workspace = true strum.workspace = true thiserror.workspace = true +metrics.workspace = true mockall = {workspace = true, optional = true } [dev-dependencies] rundler-types = { workspace = true, features = ["test-utils"] } alloy-primitives = { workspace = true, features = ["rand"] } +cargo-husky.workspace = true [features] test-utils = [ "mockall" ] diff --git a/crates/types/src/task/metric_recorder.rs b/crates/types/src/task/metric_recorder.rs new file mode 100644 index 000000000..81815c782 --- /dev/null +++ b/crates/types/src/task/metric_recorder.rs @@ -0,0 +1,114 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. +use std::time::{Duration, Instant}; + +use metrics::{counter, gauge, histogram}; + +use super::status_code::{HttpCode, RpcCode}; + +/// method logger to log one method invoke session. +pub struct MethodSessionLogger { + start_time: Instant, + service_name: String, + method_name: String, + protocol: String, +} + +impl MethodSessionLogger { + /// create a session logger. + pub fn new(service_name: String, method_name: String, protocol: String) -> Self { + Self { + start_time: Instant::now(), + method_name, + service_name, + protocol, + } + } + + /// start the session. time will be initialized. + pub fn start(service_name: String, method_name: String, protocol: String) -> Self { + MethodMetrics::increment_num_requests(&method_name, &service_name, &protocol); + MethodMetrics::increment_open_requests(&method_name, &service_name, &protocol); + Self::new(service_name, method_name, protocol) + } + + /// record a rpc status code. + pub fn record_rpc(&self, rpc_code: RpcCode) { + MethodMetrics::increment_rpc_response_code(&self.method_name, &self.service_name, rpc_code); + } + + /// record a http status code. + pub fn record_http(&self, http_code: HttpCode) { + MethodMetrics::increment_http_response_code( + &self.method_name, + &self.service_name, + http_code, + ); + } + + /// end of the session. Record the session duration. + pub fn done(&self) { + MethodMetrics::record_request_latency( + &self.method_name, + &self.service_name, + &self.protocol, + self.start_time.elapsed(), + ); + MethodMetrics::decrement_open_requests( + &self.method_name, + &self.service_name, + &self.protocol, + ); + } +} + +struct MethodMetrics {} + +impl MethodMetrics { + pub(crate) fn increment_num_requests(method_name: &str, service_name: &str, protocol: &str) { + counter!("num_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).increment(1) + } + + pub(crate) fn increment_open_requests(method_name: &str, service_name: &str, protocol: &str) { + gauge!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).increment(1_f64) + } + + pub(crate) fn decrement_open_requests(method_name: &str, service_name: &str, protocol: &str) { + gauge!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).decrement(1_f64) + } + + pub(crate) fn increment_http_response_code( + method_name: &str, + service_name: &str, + http_status_code: HttpCode, + ) { + counter!("http_response_status", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => "http", "response_code" => http_status_code.to_string()).increment(1) + } + + pub(crate) fn increment_rpc_response_code( + method_name: &str, + service_name: &str, + rpc_status_code: RpcCode, + ) { + counter!("rpc_response_status", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => "rpc", "response_code" => rpc_status_code.to_string()).increment(1) + } + + pub(crate) fn record_request_latency( + method_name: &str, + service_name: &str, + protocol: &str, + latency: Duration, + ) { + histogram!("request_latency", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).record(latency.as_millis() as f64) + } +} diff --git a/crates/types/src/task/mod.rs b/crates/types/src/task/mod.rs index 28e4ab747..9a884cf29 100644 --- a/crates/types/src/task/mod.rs +++ b/crates/types/src/task/mod.rs @@ -15,5 +15,7 @@ //! //! This module contains traits related to Rundler tasks. +/// recorder for metrics. +pub mod metric_recorder; /// method extractor trait. -pub mod traits; +pub mod status_code; diff --git a/crates/types/src/task/status_code.rs b/crates/types/src/task/status_code.rs new file mode 100644 index 000000000..610e0ccdd --- /dev/null +++ b/crates/types/src/task/status_code.rs @@ -0,0 +1,64 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +//! Response code. +use parse_display::Display; + +/// RPC status code. +#[derive(Display)] +#[display(style = "snake_case")] +#[doc(hidden)] +pub enum RpcCode { + Success, + Cancelled, + Other, + InvalidParams, + DeadlineExceed, + MethodNotFound, + AlreadyExist, + PermissionDenied, + ResourceExhausted, + FailedPrecondition, + Aborted, + OutOfRange, + Unimplemented, + InternalError, + Unavailable, + DataLoss, + Unauthenticated, + ParseError, + InvalidRequest, + ServerError, + InvalidArgument, +} + +/// HTTP status code. +#[doc(hidden)] +#[derive(Display)] +#[display(style = "snake_case")] +pub enum HttpCode { + TwoHundreds, + FourHundreds, + FiveHundreds, + Other, +} + +/// utility function to conert a http status code to HttpCode object. +pub fn get_http_status_from_code(code: u16) -> HttpCode { + match code { + x if (200..=299).contains(&x) => HttpCode::TwoHundreds, + x if (400..=499).contains(&x) => HttpCode::FourHundreds, + x if (500..=599).contains(&x) => HttpCode::FiveHundreds, + _ => HttpCode::Other, + } +} diff --git a/crates/types/src/task/traits.rs b/crates/types/src/task/traits.rs deleted file mode 100644 index 2ab3923c2..000000000 --- a/crates/types/src/task/traits.rs +++ /dev/null @@ -1,19 +0,0 @@ -// This file is part of Rundler. -// -// Rundler is free software: you can redistribute it and/or modify it under the -// terms of the GNU Lesser General Public License as published by the Free Software -// Foundation, either version 3 of the License, or (at your option) any later version. -// -// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with Rundler. -// If not, see https://www.gnu.org/licenses/. - -/// Trait to expose request method name. - -pub trait RequestExtractor: Sync + Send { - /// Get method name. - fn get_method_name(request: &R) -> String; -}