From 42595f8ffbe0f011028d0ecb7a005ccb97354e79 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Sat, 8 Jun 2024 07:58:32 +0900
Subject: [PATCH 1/2] Add test for next_peer_request_ready

---
 Cargo.lock                                |  1 +
 beacon_node/lighthouse_network/Cargo.toml |  1 +
 .../src/rpc/self_limiter.rs               | 69 +++++++++++++++++++
 3 files changed, 71 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index 1a6784b84f9..524a2bd1e0e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4964,6 +4964,7 @@ dependencies = [
  "libp2p-mplex",
  "lighthouse_metrics",
  "lighthouse_version",
+ "logging",
  "lru",
  "lru_cache",
  "parking_lot 0.12.3",
diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml
index e850bced16b..56a8fe99c70 100644
--- a/beacon_node/lighthouse_network/Cargo.toml
+++ b/beacon_node/lighthouse_network/Cargo.toml
@@ -60,6 +60,7 @@ tempfile = { workspace = true }
 quickcheck = { workspace = true }
 quickcheck_macros = { workspace = true }
 async-channel = { workspace = true }
+logging = { workspace = true }
 
 [features]
 libp2p-websocket = []
diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
index be4c572308d..1016d959256 100644
--- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
+++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -205,3 +205,72 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
         Poll::Pending
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig};
+    use crate::rpc::rate_limiter::Quota;
+    use crate::rpc::self_limiter::SelfRateLimiter;
+    use crate::rpc::{OutboundRequest, Ping, Protocol};
+    use crate::service::api_types::RequestId;
+    use libp2p::PeerId;
+    use std::time::Duration;
+    use types::MainnetEthSpec;
+
+    /// Test that `next_peer_request_ready` correctly maintains the queue.
+    #[tokio::test]
+    async fn test_next_peer_request_ready() {
+        let log = logging::test_logger();
+        let config = OutboundRateLimiterConfig(RateLimiterConfig {
+            ping_quota: Quota::n_every(1, 2),
+            ..Default::default()
+        });
+        let mut limiter: SelfRateLimiter<RequestId<u64>, MainnetEthSpec> =
+            SelfRateLimiter::new(config, log).unwrap();
+        let peer_id = PeerId::random();
+
+        for i in 1..=5 {
+            let _ = limiter.allows(
+                peer_id,
+                RequestId::Application(i),
+                OutboundRequest::Ping(Ping { data: i }),
+            );
+        }
+
+        {
+            let queue = limiter
+                .delayed_requests
+                .get(&(peer_id, Protocol::Ping))
+                .unwrap();
+            assert_eq!(4, queue.len());
+
+            // Check that requests in the queue are ordered in the sequence 2, 3, 4, 5.
+            let mut iter = queue.iter();
+            for i in 2..=5 {
+                assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i));
+            }
+
+            assert_eq!(limiter.ready_requests.len(), 0);
+        }
+
+        // Wait until the tokens have been regenerated, then run `next_peer_request_ready`.
+        tokio::time::sleep(Duration::from_secs(3)).await;
+        limiter.next_peer_request_ready(peer_id, Protocol::Ping);
+
+        {
+            let queue = limiter
+                .delayed_requests
+                .get(&(peer_id, Protocol::Ping))
+                .unwrap();
+            assert_eq!(3, queue.len());
+
+            // Check that requests in the queue are ordered in the sequence 3, 4, 5.
+            let mut iter = queue.iter();
+            for i in 3..=5 {
+                assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i));
+            }
+
+            assert_eq!(limiter.ready_requests.len(), 1);
+        }
+    }
+}

From e77db154730ce079b769db4c802fe8c77485df7d Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Sat, 8 Jun 2024 07:59:00 +0900
Subject: [PATCH 2/2] Use push_front to requeue

---
 beacon_node/lighthouse_network/src/rpc/self_limiter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
index 1016d959256..115ae45a98e 100644
--- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
+++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -147,7 +147,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
             Err((rate_limited_req, wait_time)) => {
                 let key = (peer_id, protocol);
                 self.next_peer_request.insert(key, wait_time);
-                queued_requests.push_back(rate_limited_req);
+                queued_requests.push_front(rate_limited_req);
                 // If one fails just wait for the next window that allows sending requests.
                 return;
             }
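
A minimal standalone sketch of the behaviour the second patch fixes. The
`drain` helper below is hypothetical, a stand-in for the pop/requeue loop in
`next_peer_request_ready`: requeueing the rate-limited request with
`push_front` keeps the delayed queue in FIFO order, while the old `push_back`
rotated it, which is exactly the ordering the new test asserts (queue
[3, 4, 5] rather than [4, 5, 3]).

use std::collections::VecDeque;

/// Pop request ids until the (simulated) limiter runs out of tokens.
/// The request that trips the limiter has already been popped off the
/// queue, so it must be requeued; `requeue_front` selects the new vs
/// old behaviour.
fn drain(queue: &mut VecDeque<u64>, mut tokens: u32, requeue_front: bool) -> Vec<u64> {
    let mut ready = Vec::new();
    while let Some(req) = queue.pop_front() {
        if tokens > 0 {
            tokens -= 1;
            ready.push(req); // request allowed through this window
        } else if requeue_front {
            queue.push_front(req); // fix: queue stays in FIFO order
            break;
        } else {
            queue.push_back(req); // old behaviour: request jumps to the back
            break;
        }
    }
    ready
}

fn main() {
    // Mirrors the test: requests 2..=5 are queued and one token is available.
    let mut fixed: VecDeque<u64> = VecDeque::from([2, 3, 4, 5]);
    let mut buggy = fixed.clone();

    assert_eq!(drain(&mut fixed, 1, true), vec![2]);
    assert_eq!(fixed, VecDeque::from([3, 4, 5])); // FIFO order preserved

    assert_eq!(drain(&mut buggy, 1, false), vec![2]);
    assert_eq!(buggy, VecDeque::from([4, 5, 3])); // order rotated

    println!("push_front preserves ordering; push_back rotates the queue");
}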