Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(middleware): add response extractor. #816

Merged
merged 9 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ serde = "1.0.210"
serde_json = "1.0.128"
rand = "0.8.5"
reqwest = { version = "0.12.8", default-features = false, features = ["rustls-tls"] }
rustls = "0.23.13"
thiserror = "1.0.64"
tokio = { version = "1.39.3", default-features = false, features = ["rt", "sync", "time"]}
tokio-util = "0.7.12"
Expand Down
8 changes: 2 additions & 6 deletions crates/pool/src/server/remote/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use alloy_primitives::{Address, B256};
use async_trait::async_trait;
use futures_util::StreamExt;
use rundler_task::{
grpc::{grpc_metrics::HttpMethodExtractor, protos::from_bytes},
metrics::MetricsLayer,
grpc::{grpc_metrics::GrpcMetricsLayer, protos::from_bytes},
GracefulShutdown, TaskSpawner,
};
use rundler_types::{
Expand Down Expand Up @@ -83,10 +82,7 @@ pub(crate) async fn remote_mempool_server_task(
.set_serving::<OpPoolServer<OpPoolImpl>>()
.await;

let metrics_layer = MetricsLayer::<HttpMethodExtractor, _>::new(
"op_pool_service".to_string(),
"http-grpc".to_string(),
);
let metrics_layer = GrpcMetricsLayer::new("op_pool_service".to_string());

if let Err(e) = Server::builder()
.layer(metrics_layer)
Expand Down
3 changes: 3 additions & 0 deletions crates/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ anyhow.workspace = true
async-trait.workspace = true
auto_impl.workspace = true
thiserror.workspace = true
futures-util.workspace = true
tower.workspace = true
tracing.workspace = true
url.workspace = true

mockall = {workspace = true, optional = true }

[features]
Expand Down
168 changes: 153 additions & 15 deletions crates/provider/src/alloy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,160 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use alloy_json_rpc::RequestPacket;
/// Method extractor
use rundler_types::task::traits::RequestExtractor;

/// Method extractor for Alloy providers
#[derive(Clone, Copy)]
pub struct AlloyMethodExtractor;

impl RequestExtractor<RequestPacket> for AlloyMethodExtractor {
fn get_method_name(req: &RequestPacket) -> String {
match req {
RequestPacket::Single(request) => request.method().to_string(),
_ => {
// can't extract method name for batch.
"batch".to_string()
use std::task::{Context, Poll};

use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::{BoxFuture, HttpError, TransportError, TransportErrorKind};
use futures_util::FutureExt;
use rundler_types::task::{
metric_recorder::MethodSessionLogger,
status_code::{HttpCode, RpcCode},
};
use tower::{Layer, Service};

/// Alloy provider metric layer.
#[derive(Default)]
pub(crate) struct AlloyMetricLayer {}

impl AlloyMetricLayer {}

impl<S> Layer<S> for AlloyMetricLayer
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError> + Sync,
{
type Service = AlloyMetricMiddleware<S>;

fn layer(&self, service: S) -> Self::Service {
AlloyMetricMiddleware::new(service)
}
}

pub struct AlloyMetricMiddleware<S> {
service: S,
}

impl<S> AlloyMetricMiddleware<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError> + Sync,
{
/// carete an alloy provider metric layer.
pub fn new(service: S) -> Self {
Self { service }
}
}

impl<S> Clone for AlloyMetricMiddleware<S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
}
}
}

impl<S> Service<RequestPacket> for AlloyMetricMiddleware<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>
+ Sync
+ Send
+ Clone
+ 'static,
S::Future: Send,
{
type Response = ResponsePacket;
type Error = TransportError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, request: RequestPacket) -> Self::Future {
andysim3d marked this conversation as resolved.
Show resolved Hide resolved
let method_name = get_method_name(&request);
let method_logger = MethodSessionLogger::start(
"alloy_provider_client".to_string(),
method_name,
"rpc".to_string(),
);
let mut svc = self.service.clone();
async move {
let response = svc.call(request).await;
method_logger.done();
match &response {
Ok(resp) => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(get_rpc_status_code(resp));
}
Err(e) => match e {
alloy_json_rpc::RpcError::ErrorResp(rpc_error) => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(get_rpc_status_from_code(rpc_error.code));
}
alloy_json_rpc::RpcError::Transport(TransportErrorKind::HttpError(
HttpError { status, body: _ },
)) => {
method_logger.record_http(get_http_status_from_code(*status));
}
alloy_json_rpc::RpcError::NullResp => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(RpcCode::Success);
}
_ => {}
},
}
response
}
.boxed()
}
}

/// Get the method name from the request
fn get_method_name(req: &RequestPacket) -> String {
match req {
RequestPacket::Single(request) => request.method().to_string(),
RequestPacket::Batch(_) => {
// can't extract method name for batch.
"batch".to_string()
}
}
}

fn get_rpc_status_from_code(code: i64) -> RpcCode {
match code {
-32700 => RpcCode::ParseError,
-32600 => RpcCode::InvalidRequest,
-32601 => RpcCode::MethodNotFound,
-32602 => RpcCode::InvalidParams,
-32603 => RpcCode::InternalError,
x if (-32099..=-32000).contains(&x) => RpcCode::ServerError,
_ => RpcCode::Other,
}
}

fn get_http_status_from_code(code: u16) -> HttpCode {
match code {
x if (200..=299).contains(&x) => HttpCode::TwoHundreds,
x if (400..=499).contains(&x) => HttpCode::FourHundreds,
x if (500..=599).contains(&x) => HttpCode::FiveHundreds,
_ => HttpCode::Other,
}
}

fn get_rpc_status_code(response_packet: &ResponsePacket) -> RpcCode {
let response: &alloy_json_rpc::Response = match response_packet {
ResponsePacket::Batch(resps) => {
if resps.is_empty() {
return RpcCode::Success;
}
&resps[0]
}
ResponsePacket::Single(resp) => resp,
};
let response_code: i64 = match &response.payload {
alloy_json_rpc::ResponsePayload::Success(_) => 0,
alloy_json_rpc::ResponsePayload::Failure(error_payload) => error_payload.code,
};
get_rpc_status_from_code(response_code)
}
16 changes: 13 additions & 3 deletions crates/provider/src/alloy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

use alloy_provider::{Provider as AlloyProvider, ProviderBuilder};
use alloy_rpc_client::ClientBuilder;
use alloy_transport::layers::RetryBackoffService;
use alloy_transport_http::Http;
use anyhow::Context;
use evm::AlloyEvmProvider;
use metrics::{AlloyMetricLayer, AlloyMetricMiddleware};
use reqwest::Client;
use url::Url;

Expand All @@ -34,8 +36,16 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result<impl EvmProvider
/// Create a new alloy provider from a given RPC URL
pub fn new_alloy_provider(
rpc_url: &str,
) -> anyhow::Result<impl AlloyProvider<Http<Client>> + Clone> {
) -> anyhow::Result<
impl AlloyProvider<RetryBackoffService<AlloyMetricMiddleware<Http<Client>>>> + Clone,
> {
let url = Url::parse(rpc_url).context("invalid rpc url")?;
let client = ClientBuilder::default().http(url);
Ok(ProviderBuilder::new().on_client(client))
let metric_layer = AlloyMetricLayer::default();
let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 0);
dancoombs marked this conversation as resolved.
Show resolved Hide resolved
let client = ClientBuilder::default()
.layer(retry_layer)
.layer(metric_layer)
.http(url);
let provider = ProviderBuilder::new().on_client(client);
Ok(provider)
}
1 change: 0 additions & 1 deletion crates/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub use alloy::{
},
},
evm::AlloyEvmProvider,
metrics::AlloyMethodExtractor,
new_alloy_evm_provider, new_alloy_provider,
};

Expand Down
1 change: 1 addition & 0 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ serde.workspace = true
strum.workspace = true
url.workspace = true
futures-util.workspace = true
http = "1.1.0"

[dev-dependencies]
mockall.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ pub use rundler::{RundlerApiClient, Settings as RundlerApiSettings};
mod task;
pub use task::{Args as RpcTaskArgs, RpcTask};

mod rpc_metrics;
mod types;
mod utils;
Loading
Loading