From 9035ecfa36f3f6d70f2307a4613a9f5637f039d3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 14 Jan 2025 16:42:38 +0200 Subject: [PATCH 1/2] network/libp2p: Investigate requests that don't time out Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 6c2631924df4a..f6e7c2be1fa86 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -244,6 +244,7 @@ pub struct OutgoingResponse { } /// Information stored about a pending request. +#[derive(Debug)] struct PendingRequest { started_at: Instant, response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, @@ -359,6 +360,9 @@ pub struct RequestResponsesBehaviour { /// Primarily used to get a reputation of a node. peer_store: Arc, + + /// Interval to check for periodic requests. + periodic_request_check: tokio::time::Interval, } /// Generated by the response builder and waiting to be processed. 
@@ -412,6 +416,7 @@ impl RequestResponsesBehaviour { pending_responses_arrival_time: Default::default(), send_feedback: Default::default(), peer_store, + periodic_request_check: tokio::time::interval(Duration::from_secs(60)), }) } @@ -659,6 +664,36 @@ impl NetworkBehaviour for RequestResponsesBehaviour { params: &mut impl PollParameters, ) -> Poll>> { 'poll_all: loop { + if let Poll::Ready(_) = self.periodic_request_check.poll_tick(cx) { + let hanged_requests: Vec<_> = self + .pending_requests + .iter() + .filter_map(|(id, req)| { + if req.started_at.elapsed() > Duration::from_secs(60) { + Some((id.clone(), req.clone())) + } else { + None + } + }) + .collect(); + + if !hanged_requests.is_empty() { + log::warn!( + target: "sub-libp2p", + "Requests hanged for more than 60 seconds: {:?}", + hanged_requests + ); + + // TODO: Force close the requests to avoid possible mem leaks + // and resume operation of above subsystems. + + // for (id, req) in hanged_requests { + // let _ = req.response_tx.send(Err(RequestFailure::Timeout)); + // self.pending_requests.remove(&id); + // } + } + } + // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { From a078d2d500efe8d0928c71b3373837c2388e9be4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 14 Jan 2025 16:10:33 +0000 Subject: [PATCH 2/2] network/libp2p: Adjust debug info Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index f6e7c2be1fa86..ce1f17f889609 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -244,7 +244,6 @@ pub struct OutgoingResponse { } /// Information stored about a pending request. 
-#[derive(Debug)] struct PendingRequest { started_at: Instant, response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, @@ -670,7 +669,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .iter() .filter_map(|(id, req)| { if req.started_at.elapsed() > Duration::from_secs(60) { - Some((id.clone(), req.clone())) + Some((id.clone(), req.started_at, req.fallback_request.clone())) } else { None }