From 58dc798a590e280963d1d8b1877ca2a17d8ebe27 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:32:12 +0530 Subject: [PATCH 1/2] fix(`transport`): allow `RetryPolicy` to be set via layer --- crates/transport/src/layers/retry.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index e2eb712fcc7..7336d632646 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -25,13 +25,15 @@ use tokio::time::sleep; /// /// TransportError: crate::error::TransportError #[derive(Debug, Clone)] -pub struct RetryBackoffLayer { +pub struct RetryBackoffLayer { /// The maximum number of retries for rate limit errors max_rate_limit_retries: u32, /// The initial backoff in milliseconds initial_backoff: u64, /// The number of compute units per second for this provider compute_units_per_second: u64, + /// The [RetryPolicy] to use. Defaults to [RateLimitRetryPolicy] + policy: P, } impl RetryBackoffLayer { @@ -41,7 +43,19 @@ impl RetryBackoffLayer { initial_backoff: u64, compute_units_per_second: u64, ) -> Self { - Self { max_rate_limit_retries, initial_backoff, compute_units_per_second } + Self { + max_rate_limit_retries, + initial_backoff, + compute_units_per_second, + policy: RateLimitRetryPolicy, + } + } +} + +impl RetryBackoffLayer

{ + /// Sets the retry policy for the layer. + pub fn with_policy(self, policy: P) -> Self { + Self { policy, ..self } } } @@ -78,7 +92,7 @@ impl Layer for RetryBackoffLayer { fn layer(&self, inner: S) -> Self::Service { RetryBackoffService { inner, - policy: RateLimitRetryPolicy, + policy: self.policy, max_rate_limit_retries: self.max_rate_limit_retries, initial_backoff: self.initial_backoff, compute_units_per_second: self.compute_units_per_second, @@ -90,11 +104,11 @@ impl Layer for RetryBackoffLayer { /// A Tower Service used by the RetryBackoffLayer that is responsible for retrying requests based /// on the error type. See [TransportError] and [RateLimitRetryPolicy]. #[derive(Debug, Clone)] -pub struct RetryBackoffService { +pub struct RetryBackoffService { /// The inner service inner: S, - /// The retry policy - policy: RateLimitRetryPolicy, + /// The [RetryPolicy] to use. + policy: P, /// The maximum number of retries for rate limit errors max_rate_limit_retries: u32, /// The initial backoff in milliseconds From eac210caad5835df6b7de6a92e76ef9338f4828f Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:57:24 +0530 Subject: [PATCH 2/2] fix --- crates/provider/src/provider/trait.rs | 29 ++++++++++++++++++++++++++- crates/transport/src/layers/retry.rs | 24 +++++++++++++--------- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index b21b372c19c..74420f10e2c 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1119,9 +1119,10 @@ mod tests { use alloy_network::{AnyNetwork, EthereumWallet, TransactionBuilder}; use alloy_node_bindings::Anvil; use alloy_primitives::{address, b256, bytes, keccak256}; - use alloy_rpc_client::BuiltInConnectionString; + use alloy_rpc_client::{BuiltInConnectionString, RpcClient}; use alloy_rpc_types_eth::{request::TransactionRequest, Block}; use alloy_signer_local::PrivateKeySigner; + use alloy_transport::layers::{RetryBackoffLayer, RetryPolicy}; // For layer transport tests #[cfg(feature = "hyper")] use alloy_transport_http::{ @@ -1427,6 +1428,32 @@ mod tests { } } + #[tokio::test] + async fn test_custom_retry_policy() { + #[derive(Debug, Clone)] + struct CustomPolicy; + impl RetryPolicy for CustomPolicy { + fn should_retry(&self, _err: &alloy_transport::TransportError) -> bool { + true + } + + fn backoff_hint( + &self, + _error: &alloy_transport::TransportError, + ) -> Option { + None + } + } + + let retry_layer = RetryBackoffLayer::new_with_policy(10, 100, 10000, CustomPolicy); + let anvil = Anvil::new().spawn(); + let client = RpcClient::builder().layer(retry_layer).http(anvil.endpoint_url()); + + let provider = RootProvider::<_, Ethereum>::new(client); + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); + } + #[tokio::test] async fn test_send_tx() { let provider = ProviderBuilder::new().on_anvil(); diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index 7336d632646..a6114b2ff38 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -37,7 +37,7 @@ pub struct RetryBackoffLayer { } impl RetryBackoffLayer { - /// Creates a new retry layer with the given parameters. + /// Creates a new retry layer with the given parameters and the default [RateLimitRetryPolicy]. pub const fn new( max_rate_limit_retries: u32, initial_backoff: u64, @@ -53,9 +53,14 @@ impl RetryBackoffLayer { } impl RetryBackoffLayer

{ - /// Sets the retry policy for the layer. - pub fn with_policy(self, policy: P) -> Self { - Self { policy, ..self } + /// Creates a new retry layer with the given parameters and [RetryPolicy]. + pub const fn new_with_policy( + max_rate_limit_retries: u32, + initial_backoff: u64, + compute_units_per_second: u64, + policy: P, + ) -> Self { + Self { max_rate_limit_retries, initial_backoff, compute_units_per_second, policy } } } @@ -86,13 +91,13 @@ impl RetryPolicy for RateLimitRetryPolicy { } } -impl Layer for RetryBackoffLayer { - type Service = RetryBackoffService; +impl Layer for RetryBackoffLayer

{ + type Service = RetryBackoffService; fn layer(&self, inner: S) -> Self::Service { RetryBackoffService { inner, - policy: self.policy, + policy: self.policy.clone(), max_rate_limit_retries: self.max_rate_limit_retries, initial_backoff: self.initial_backoff, compute_units_per_second: self.compute_units_per_second, @@ -119,18 +124,19 @@ pub struct RetryBackoffService { requests_enqueued: Arc, } -impl RetryBackoffService { +impl RetryBackoffService { const fn initial_backoff(&self) -> Duration { Duration::from_millis(self.initial_backoff) } } -impl Service for RetryBackoffService +impl Service for RetryBackoffService where S: Service, Error = TransportError> + Send + 'static + Clone, + P: RetryPolicy + Clone + 'static, { type Response = ResponsePacket; type Error = TransportError;