From 1db7fc364edc1a2bed8fcceb84b0ea5c5c5b1675 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 22:10:06 +0200 Subject: [PATCH 01/19] Never close connections in request-response --- Cargo.lock | 2 ++ examples/file-sharing/Cargo.toml | 3 ++- examples/file-sharing/src/network.rs | 4 ++-- protocols/request-response/Cargo.toml | 1 + protocols/request-response/src/handler.rs | 8 +------- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e850e031a0..0b237cea2e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1594,6 +1594,7 @@ dependencies = [ "futures", "libp2p", "multiaddr", + "void", ] [[package]] @@ -2965,6 +2966,7 @@ dependencies = [ "libp2p-yamux", "rand 0.8.5", "smallvec", + "void", ] [[package]] diff --git a/examples/file-sharing/Cargo.toml b/examples/file-sharing/Cargo.toml index 9f2ab176db9..ad6a410c2cc 100644 --- a/examples/file-sharing/Cargo.toml +++ b/examples/file-sharing/Cargo.toml @@ -13,4 +13,5 @@ either = "1.8" env_logger = "0.10" futures = "0.3.28" libp2p = { path = "../../libp2p", features = ["async-std", "dns", "kad", "noise", "macros", "request-response", "tcp", "websocket", "yamux"] } -multiaddr = { version = "0.17.1" } \ No newline at end of file +multiaddr = { version = "0.17.1" } +void = "1.0.2" diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index 404aef01c4d..9b3b87758bb 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -16,7 +16,7 @@ use libp2p::{ multiaddr::Protocol, noise, request_response::{self, ProtocolSupport, RequestId, ResponseChannel}, - swarm::{NetworkBehaviour, StreamUpgradeError, Swarm, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, Transport, }; @@ -216,7 +216,7 @@ impl EventLoop { async fn handle_event( &mut self, - event: SwarmEvent, io::Error>>, + event: SwarmEvent>, ) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 3bde34e2cc1..a7e14f70834 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -19,6 +19,7 @@ libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } rand = "0.8" smallvec = "1.6.1" +void = "1.0.2" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 3a323d75edc..cb866fb38c4 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -241,7 +241,7 @@ where { type InEvent = RequestProtocol; type OutEvent = Event; - type Error = StreamUpgradeError; + type Error = void::Void; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; type OutboundOpenInfo = RequestId; @@ -295,12 +295,6 @@ where cx: &mut Context<'_>, ) -> Poll, RequestId, Self::OutEvent, Self::Error>> { - // Check for a pending (fatal) error. - if let Some(err) = self.pending_error.take() { - // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - // Drain pending events. if let Some(event) = self.pending_events.pop_front() { return Poll::Ready(ConnectionHandlerEvent::Custom(event)); From 10508d64fb0a536b54f9455217155f2445f5a5a7 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 22:13:41 +0200 Subject: [PATCH 02/19] Log error instead --- Cargo.lock | 1 + protocols/request-response/CHANGELOG.md | 5 +++++ protocols/request-response/Cargo.toml | 1 + protocols/request-response/src/handler.rs | 18 ++++++++---------- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b237cea2e1..6e1384c81da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2964,6 +2964,7 @@ dependencies = [ "libp2p-swarm-test", "libp2p-tcp", "libp2p-yamux", + "log", "rand 0.8.5", "smallvec", "void", diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index b2adb03c67b..cc9e2ea2291 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -8,9 +8,14 @@ These variants are no longer constructed. See [PR 3605]. +- Don't close connections if individual streams fail. + Log the error instead. + See [PR XXXX]. + [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3702]: https://github.com/libp2p/rust-libp2p/pull/3702 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX ## 0.24.1 diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index a7e14f70834..55108a2411f 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -20,6 +20,7 @@ libp2p-identity = { workspace = true } rand = "0.8" smallvec = "1.6.1" void = "1.0.2" +log = "0.4.17" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index cb866fb38c4..c2da7dc9da4 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -39,7 +39,7 @@ use libp2p_swarm::{ use smallvec::SmallVec; use std::{ collections::VecDeque, - fmt, io, + fmt, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -65,8 +65,6 @@ where substream_timeout: Duration, /// The current connection keep-alive. keep_alive: KeepAlive, - /// A pending fatal error that results in the connection being closed. - pending_error: Option>, /// Queue of events to emit in `poll()`. pending_events: VecDeque>, /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. @@ -107,7 +105,6 @@ where outbound: VecDeque::new(), inbound: FuturesUnordered::new(), pending_events: VecDeque::new(), - pending_error: None, inbound_request_id, } } @@ -151,21 +148,22 @@ where self.pending_events .push_back(Event::OutboundUnsupportedProtocols(info)); } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); + StreamUpgradeError::Apply(e) => { + log::debug!("outbound stream {info} failed: {e}"); + } + StreamUpgradeError::Io(e) => { + log::debug!("outbound stream {info} failed: {e}"); } } } fn on_listen_upgrade_error( &mut self, - ListenUpgradeError { error, .. }: ListenUpgradeError< + ListenUpgradeError { error, info }: ListenUpgradeError< ::InboundOpenInfo, ::InboundProtocol, >, ) { - self.pending_error = Some(StreamUpgradeError::Apply(error)); + log::debug!("inbound stream {info} failed: {error}"); } } From f4f1e5839222c323cac89189103ec06ad777f043 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 23:21:48 +0200 Subject: [PATCH 03/19] Refactor `request-response` to not use upgrade mechanism --- Cargo.lock | 1 + protocols/request-response/Cargo.toml | 1 + protocols/request-response/src/handler.rs | 236 ++++++++++++------ .../request-response/src/handler/protocol.rs | 125 ++-------- protocols/request-response/src/lib.rs | 15 +- 5 files changed, 190 insertions(+), 188 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e1384c81da..5a9d7609e23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2956,6 +2956,7 @@ dependencies = [ "async-trait", "env_logger 0.10.0", "futures", + "futures-timer", "instant", "libp2p-core", "libp2p-identity", diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 55108a2411f..6b2bed3235b 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -21,6 +21,7 @@ rand = "0.8" smallvec = "1.6.1" void = "1.0.2" log = "0.4.17" +futures-timer = "3.0.2" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index c2da7dc9da4..2202513970e 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -23,7 +23,7 @@ pub(crate) mod protocol; pub use protocol::ProtocolSupport; use crate::codec::Codec; -use crate::handler::protocol::{RequestProtocol, ResponseProtocol}; +use crate::handler::protocol::Protocol; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; @@ -37,9 +37,10 @@ use libp2p_swarm::{ SubstreamProtocol, }; use smallvec::SmallVec; +use std::pin::pin; use std::{ collections::VecDeque, - fmt, + fmt, io, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -68,7 +69,9 @@ where /// Queue of events to emit in `poll()`. pending_events: VecDeque>, /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. - outbound: VecDeque>, + pending_outbound: VecDeque>, + + requested_outbound: VecDeque>, /// Inbound upgrades waiting for the incoming request. inbound: FuturesUnordered< BoxFuture< @@ -83,6 +86,8 @@ where >, >, inbound_request_id: Arc, + + worker_streams: FuturesUnordered, io::Error>>>, } impl Handler @@ -102,68 +107,155 @@ where keep_alive: KeepAlive::Yes, keep_alive_timeout, substream_timeout, - outbound: VecDeque::new(), + pending_outbound: VecDeque::new(), + requested_outbound: Default::default(), inbound: FuturesUnordered::new(), pending_events: VecDeque::new(), inbound_request_id, + worker_streams: Default::default(), } } fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { - protocol: sent, - info: request_id, + protocol: (mut stream, protocol), + info: (), }: FullyNegotiatedInbound< ::InboundProtocol, ::InboundOpenInfo, >, ) { - if sent { - self.pending_events - .push_back(Event::ResponseSent(request_id)) - } else { - self.pending_events - .push_back(Event::ResponseOmission(request_id)) - } + let mut codec = self.codec.clone(); + + // A channel for notifying the handler when the inbound + // upgrade received the request. + let (rq_send, rq_recv) = oneshot::channel(); + + // A channel for notifying the inbound upgrade when the + // response is sent. + let (rs_send, rs_recv) = oneshot::channel(); + + // The handler waits for the request to come in. It then emits + // `Event::Request` together with a + // `ResponseChannel`. + self.inbound + .push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); + + let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); + let timeout = self.substream_timeout; + + let recv = async move { + let read = codec.read_request(&protocol, &mut stream); + let request = read.await?; + match rq_send.send((request_id, request)) { + Ok(()) => {} + Err(_) => { + panic!("Expect request receiver to be alive i.e. protocol handler to be alive.",) + } + } + + if let Ok(response) = rs_recv.await { + let write = codec.write_response(&protocol, &mut stream, response); + write.await?; + + stream.close().await?; + // Response was sent. Indicate to handler to emit a `ResponseSent` event. + Ok(Event::ResponseSent(request_id)) + } else { + stream.close().await?; + // No response was sent. Indicate to handler to emit a `ResponseOmission` event. + Ok(Event::ResponseOmission(request_id)) + } + }; + + self.worker_streams.push(Box::pin(async move { + match future::select(pin!(recv), futures_timer::Delay::new(timeout)).await { + future::Either::Left((recv, _)) => recv, + future::Either::Right(((), _)) => Err(io::ErrorKind::TimedOut.into()), + } + })); + } + + fn on_fully_negotiated_outbound( + &mut self, + FullyNegotiatedOutbound { + protocol: (mut stream, protocol), + info: (), + }: FullyNegotiatedOutbound< + ::OutboundProtocol, + ::OutboundOpenInfo, + >, + ) { + let message = self + .requested_outbound + .pop_front() + .expect("negotiated a stream without a pending message"); + + let mut codec = self.codec.clone(); + let timeout = self.substream_timeout; + let request_id = message.request_id; + + let send = async move { + let write = codec.write_request(&protocol, &mut stream, message.request); + write.await?; + stream.close().await?; + let read = codec.read_response(&protocol, &mut stream); + let response = read.await?; + + Ok(Event::Response { + request_id, + response, + }) + }; + + self.worker_streams.push(Box::pin(async move { + match future::select(pin!(send), futures_timer::Delay::new(timeout)).await { + future::Either::Left((recv, _)) => recv, + future::Either::Right(((), _)) => Ok(Event::OutboundTimeout(request_id)), + } + })); } fn on_dial_upgrade_error( &mut self, - DialUpgradeError { info, error }: DialUpgradeError< + DialUpgradeError { error, info: () }: DialUpgradeError< ::OutboundOpenInfo, ::OutboundProtocol, >, ) { match error { StreamUpgradeError::Timeout => { - self.pending_events.push_back(Event::OutboundTimeout(info)); + unreachable!("`future::Ready` never times out") } StreamUpgradeError::NegotiationFailed => { + let message = self + .requested_outbound + .pop_front() + .expect("negotiated a stream without a pending message"); + // The remote merely doesn't support the protocol(s) we requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. // An event is reported to permit user code to react to the fact that // the remote peer does not support the requested protocol(s). self.pending_events - .push_back(Event::OutboundUnsupportedProtocols(info)); - } - StreamUpgradeError::Apply(e) => { - log::debug!("outbound stream {info} failed: {e}"); + .push_back(Event::OutboundUnsupportedProtocols(message.request_id)); } + StreamUpgradeError::Apply(e) => void::unreachable(e), StreamUpgradeError::Io(e) => { - log::debug!("outbound stream {info} failed: {e}"); + log::debug!("outbound stream failed: {e}"); } } } fn on_listen_upgrade_error( &mut self, - ListenUpgradeError { error, info }: ListenUpgradeError< + ListenUpgradeError { error, .. }: ListenUpgradeError< ::InboundOpenInfo, ::InboundProtocol, >, ) { - log::debug!("inbound stream {info} failed: {error}"); + void::unreachable(error) } } @@ -233,55 +325,45 @@ impl fmt::Debug for Event { } } +pub struct OutboundMessage { + pub(crate) request_id: RequestId, + pub(crate) request: TCodec::Request, + pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, +} + +impl fmt::Debug for OutboundMessage +where + TCodec: Codec, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OutboundMessage").finish_non_exhaustive() + } +} + impl ConnectionHandler for Handler where TCodec: Codec + Send + Clone + 'static, { - type InEvent = RequestProtocol; + type InEvent = OutboundMessage; type OutEvent = Event; type Error = void::Void; - type InboundProtocol = ResponseProtocol; - type OutboundProtocol = RequestProtocol; - type OutboundOpenInfo = RequestId; - type InboundOpenInfo = RequestId; + type InboundProtocol = Protocol; + type OutboundProtocol = Protocol; + type OutboundOpenInfo = (); + type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - // A channel for notifying the handler when the inbound - // upgrade received the request. - let (rq_send, rq_recv) = oneshot::channel(); - - // A channel for notifying the inbound upgrade when the - // response is sent. - let (rs_send, rs_recv) = oneshot::channel(); - - let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); - - // By keeping all I/O inside the `ResponseProtocol` and thus the - // inbound substream upgrade via above channels, we ensure that it - // is all subject to the configured timeout without extra bookkeeping - // for inbound substreams as well as their timeouts and also make the - // implementation of inbound and outbound upgrades symmetric in - // this sense. - let proto = ResponseProtocol { - protocols: self.inbound_protocols.clone(), - codec: self.codec.clone(), - request_sender: rq_send, - response_receiver: rs_recv, - request_id, - }; - - // The handler waits for the request to come in. It then emits - // `Event::Request` together with a - // `ResponseChannel`. - self.inbound - .push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); - - SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout) + SubstreamProtocol::new( + Protocol { + protocols: self.inbound_protocols.clone(), + }, + (), + ) } fn on_behaviour_event(&mut self, request: Self::InEvent) { self.keep_alive = KeepAlive::Yes; - self.outbound.push_back(request); + self.pending_outbound.push_back(request); } fn connection_keep_alive(&self) -> KeepAlive { @@ -291,8 +373,17 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, RequestId, Self::OutEvent, Self::Error>> + ) -> Poll, (), Self::OutEvent, Self::Error>> { + while let Poll::Ready(Some(result)) = self.worker_streams.poll_next_unpin(cx) { + match result { + Ok(event) => return Poll::Ready(ConnectionHandlerEvent::Custom(event)), + Err(e) => { + log::debug!("worker stream failed: {e}") + } + } + } + // Drain pending events. if let Some(event) = self.pending_events.pop_front() { return Poll::Ready(ConnectionHandlerEvent::Custom(event)); @@ -321,18 +412,19 @@ where } // Emit outbound requests. - if let Some(request) = self.outbound.pop_front() { - let info = request.request_id; + if let Some(request) = self.pending_outbound.pop_front() { + let protocols = request.protocols.clone(); + self.requested_outbound.push_back(request); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(request, info) - .with_timeout(self.substream_timeout), + protocol: SubstreamProtocol::new(Protocol { protocols }, ()), }); } - debug_assert!(self.outbound.is_empty()); + debug_assert!(self.pending_outbound.is_empty()); - if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { - self.outbound.shrink_to_fit(); + if self.pending_outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.pending_outbound.shrink_to_fit(); } if self.inbound.is_empty() && self.keep_alive.is_yes() { @@ -359,14 +451,8 @@ where ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { self.on_fully_negotiated_inbound(fully_negotiated_inbound) } - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: response, - info: request_id, - }) => { - self.pending_events.push_back(Event::Response { - request_id, - response, - }); + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + self.on_fully_negotiated_outbound(fully_negotiated_outbound) } ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 84ef365734f..c4a6c2b69f5 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -23,14 +23,10 @@ //! receives a request and sends a response, whereas the //! outbound upgrade send a request and receives a response. -use crate::codec::Codec; -use crate::RequestId; - -use futures::{channel::oneshot, future::BoxFuture, prelude::*}; +use futures::future::{ready, Ready}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use smallvec::SmallVec; -use std::{fmt, io}; /// The level of support for a particular protocol. #[derive(Debug, Clone)] @@ -65,22 +61,15 @@ impl ProtocolSupport { /// /// Receives a request and sends a response. #[derive(Debug)] -pub struct ResponseProtocol -where - TCodec: Codec, -{ - pub(crate) codec: TCodec, - pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, - pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>, - pub(crate) response_receiver: oneshot::Receiver, - pub(crate) request_id: RequestId, +pub struct Protocol

{ + pub(crate) protocols: SmallVec<[P; 2]>, } -impl UpgradeInfo for ResponseProtocol +impl

UpgradeInfo for Protocol

where - TCodec: Codec, + P: AsRef + Clone, { - type Info = TCodec::Protocol; + type Info = P; type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; fn protocol_info(&self) -> Self::InfoIter { @@ -88,102 +77,28 @@ where } } -impl InboundUpgrade for ResponseProtocol +impl

InboundUpgrade for Protocol

where - TCodec: Codec + Send + 'static, + P: AsRef + Clone, { - type Output = bool; - type Error = io::Error; - type Future = BoxFuture<'static, Result>; - - fn upgrade_inbound( - mut self, - mut io: NegotiatedSubstream, - protocol: Self::Info, - ) -> Self::Future { - async move { - let read = self.codec.read_request(&protocol, &mut io); - let request = read.await?; - match self.request_sender.send((self.request_id, request)) { - Ok(()) => {}, - Err(_) => panic!( - "Expect request receiver to be alive i.e. protocol handler to be alive.", - ), - } + type Output = (NegotiatedSubstream, P); + type Error = void::Void; + type Future = Ready>; - if let Ok(response) = self.response_receiver.await { - let write = self.codec.write_response(&protocol, &mut io, response); - write.await?; - - io.close().await?; - // Response was sent. Indicate to handler to emit a `ResponseSent` event. - Ok(true) - } else { - io.close().await?; - // No response was sent. Indicate to handler to emit a `ResponseOmission` event. - Ok(false) - } - }.boxed() + fn upgrade_inbound(self, io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + ready(Ok((io, protocol))) } } -/// Request substream upgrade protocol. -/// -/// Sends a request and receives a response. -pub struct RequestProtocol +impl

OutboundUpgrade for Protocol

where - TCodec: Codec, + P: AsRef + Clone, { - pub(crate) codec: TCodec, - pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, - pub(crate) request_id: RequestId, - pub(crate) request: TCodec::Request, -} + type Output = (NegotiatedSubstream, P); + type Error = void::Void; + type Future = Ready>; -impl fmt::Debug for RequestProtocol -where - TCodec: Codec, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RequestProtocol") - .field("request_id", &self.request_id) - .finish() - } -} - -impl UpgradeInfo for RequestProtocol -where - TCodec: Codec, -{ - type Info = TCodec::Protocol; - type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; - - fn protocol_info(&self) -> Self::InfoIter { - self.protocols.clone().into_iter() - } -} - -impl OutboundUpgrade for RequestProtocol -where - TCodec: Codec + Send + 'static, -{ - type Output = TCodec::Response; - type Error = io::Error; - type Future = BoxFuture<'static, Result>; - - fn upgrade_outbound( - mut self, - mut io: NegotiatedSubstream, - protocol: Self::Info, - ) -> Self::Future { - async move { - let write = self.codec.write_request(&protocol, &mut io, self.request); - write.await?; - io.close().await?; - let read = self.codec.read_response(&protocol, &mut io); - let response = read.await?; - Ok(response) - } - .boxed() + fn upgrade_outbound(self, io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + ready(Ok((io, protocol))) } } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 4267f83da8c..0b4ee4d6fef 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -64,7 +64,7 @@ mod handler; pub use codec::Codec; pub use handler::ProtocolSupport; -use crate::handler::protocol::RequestProtocol; +use crate::handler::OutboundMessage; use futures::channel::oneshot; use handler::Handler; use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr}; @@ -317,7 +317,7 @@ where codec: TCodec, /// Pending events to return from `poll`. pending_events: - VecDeque, RequestProtocol>>, + VecDeque, OutboundMessage>>, /// The currently connected peers, their pending outbound and inbound responses and their known, /// reachable addresses, if any. connected: HashMap>, @@ -325,7 +325,7 @@ where addresses: HashMap>, /// Requests that have not yet been sent and are waiting for a connection /// to be established. - pending_outbound_requests: HashMap; 10]>>, + pending_outbound_requests: HashMap; 10]>>, } impl Behaviour @@ -376,11 +376,10 @@ where /// > [`Behaviour::remove_address`]. pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { let request_id = self.next_request_id(); - let request = RequestProtocol { + let request = OutboundMessage { request_id, - codec: self.codec.clone(), - protocols: self.outbound_protocols.clone(), request, + protocols: self.outbound_protocols.clone(), }; if let Some(request) = self.try_send_request(peer, request) { @@ -494,8 +493,8 @@ where fn try_send_request( &mut self, peer: &PeerId, - request: RequestProtocol, - ) -> Option> { + request: OutboundMessage, + ) -> Option> { if let Some(connections) = self.connected.get_mut(peer) { if connections.is_empty() { return Some(request); From 30f53b3948b629ee2d5003bda0b4fbe09e8707c1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 5 Jun 2023 12:35:16 +0200 Subject: [PATCH 04/19] Don't panic on timeouts --- protocols/request-response/src/handler.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 24513c8655b..15a4b093962 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -224,16 +224,17 @@ where ::OutboundProtocol, >, ) { + let message = self + .requested_outbound + .pop_front() + .expect("negotiated a stream without a pending message"); + match error { StreamUpgradeError::Timeout => { - unreachable!("`future::Ready` never times out") + self.pending_events + .push_back(Event::OutboundTimeout(message.request_id)); } StreamUpgradeError::NegotiationFailed => { - let message = self - .requested_outbound - .pop_front() - .expect("negotiated a stream without a pending message"); - // The remote merely doesn't support the protocol(s) we requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. @@ -244,7 +245,11 @@ where } StreamUpgradeError::Apply(e) => void::unreachable(e), StreamUpgradeError::Io(e) => { - log::debug!("outbound stream failed: {e}"); + log::debug!( + "outbound stream for request {} failed: {e}, retrying", + message.request_id + ); + self.requested_outbound.push_back(message); } } } From 50e6207edb57de7d2e8dda3be6c72645ebfe6f67 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 5 Jun 2023 12:36:21 +0200 Subject: [PATCH 05/19] Remove unnecessary comment --- protocols/request-response/src/handler.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 15a4b093962..a8145f04f86 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -160,11 +160,9 @@ where write.await?; stream.close().await?; - // Response was sent. Indicate to handler to emit a `ResponseSent` event. Ok(Event::ResponseSent(request_id)) } else { stream.close().await?; - // No response was sent. Indicate to handler to emit a `ResponseOmission` event. Ok(Event::ResponseOmission(request_id)) } }; From aea1027b5dad288034d8ba29f2d6208940cf2738 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 5 Jun 2023 12:50:56 +0200 Subject: [PATCH 06/19] Use `mpsc` channel for sending inbound requests --- protocols/request-response/src/handler.rs | 90 ++++++++++------------- 1 file changed, 37 insertions(+), 53 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index a8145f04f86..bcd4cfa8992 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -26,6 +26,7 @@ use crate::codec::Codec; use crate::handler::protocol::Protocol; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; +use futures::channel::mpsc; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; use instant::Instant; use libp2p_swarm::handler::{ @@ -72,19 +73,19 @@ where pending_outbound: VecDeque>, requested_outbound: VecDeque>, - /// Inbound upgrades waiting for the incoming request. - inbound: FuturesUnordered< - BoxFuture< - 'static, - Result< - ( - (RequestId, TCodec::Request), - oneshot::Sender, - ), - oneshot::Canceled, - >, - >, - >, + /// A channel for receiving inbound requests. + inbound_receiver: mpsc::Receiver<( + RequestId, + TCodec::Request, + oneshot::Sender, + )>, + /// The [`mpsc::Sender`] for the above receiver. Cloned for each inbound request. + inbound_sender: mpsc::Sender<( + RequestId, + TCodec::Request, + oneshot::Sender, + )>, + inbound_request_id: Arc, worker_streams: FuturesUnordered, io::Error>>>, @@ -101,6 +102,7 @@ where substream_timeout: Duration, inbound_request_id: Arc, ) -> Self { + let (inbound_sender, inbound_receiver) = mpsc::channel(0); Self { inbound_protocols, codec, @@ -109,7 +111,8 @@ where substream_timeout, pending_outbound: VecDeque::new(), requested_outbound: Default::default(), - inbound: FuturesUnordered::new(), + inbound_receiver, + inbound_sender, pending_events: VecDeque::new(), inbound_request_id, worker_streams: Default::default(), @@ -127,33 +130,22 @@ where >, ) { let mut codec = self.codec.clone(); - - // A channel for notifying the handler when the inbound - // upgrade received the request. - let (rq_send, rq_recv) = oneshot::channel(); - - // A channel for notifying the inbound upgrade when the - // response is sent. - let (rs_send, rs_recv) = oneshot::channel(); - - // The handler waits for the request to come in. It then emits - // `Event::Request` together with a - // `ResponseChannel`. - self.inbound - .push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); - let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); let timeout = self.substream_timeout; + let mut sender = self.inbound_sender.clone(); let recv = async move { + // A channel for notifying the inbound upgrade when the + // response is sent. + let (rs_send, rs_recv) = oneshot::channel(); + let read = codec.read_request(&protocol, &mut stream); let request = read.await?; - match rq_send.send((request_id, request)) { - Ok(()) => {} - Err(_) => { - panic!("Expect request receiver to be alive i.e. protocol handler to be alive.",) - } - } + sender + .send((request_id, request, rs_send)) + .await + .expect("`ConnectionHandler` owns both ends of the channel"); + drop(sender); if let Ok(response) = rs_recv.await { let write = codec.write_response(&protocol, &mut stream, response); @@ -395,23 +387,14 @@ where } // Check for inbound requests. - while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { - match result { - Ok(((id, rq), rs_sender)) => { - // We received an inbound request. - self.keep_alive = KeepAlive::Yes; - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request { - request_id: id, - request: rq, - sender: rs_sender, - })); - } - Err(oneshot::Canceled) => { - // The inbound upgrade has errored or timed out reading - // or waiting for the request. The handler is informed - // via `on_connection_event` call with `ConnectionEvent::ListenUpgradeError`. - } - } + if let Poll::Ready(Some((id, rq, rs_sender))) = self.inbound_receiver.poll_next_unpin(cx) { + // We received an inbound request. + self.keep_alive = KeepAlive::Yes; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request { + request_id: id, + request: rq, + sender: rs_sender, + })); } // Emit outbound requests. @@ -430,7 +413,8 @@ where self.pending_outbound.shrink_to_fit(); } - if self.inbound.is_empty() && self.keep_alive.is_yes() { + // if self.inbound_sender.is_empty() && self.keep_alive.is_yes() { TODO: Fix keep-alive tracking first? + if self.keep_alive.is_yes() { // No new inbound or outbound requests. However, we may just have // started the latest inbound or outbound upgrade(s), so make sure // the keep-alive timeout is preceded by the substream timeout. From d41f44bce9432b84dd9cc242556d884de9649f77 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 31 Jul 2023 13:31:40 +0200 Subject: [PATCH 07/19] Fix keep-alive TODO --- protocols/request-response/src/handler.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index bcd4cfa8992..67db22c8479 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -413,8 +413,7 @@ where self.pending_outbound.shrink_to_fit(); } - // if self.inbound_sender.is_empty() && self.keep_alive.is_yes() { TODO: Fix keep-alive tracking first? - if self.keep_alive.is_yes() { + if self.worker_streams.is_empty() && self.keep_alive.is_yes() { // No new inbound or outbound requests. However, we may just have // started the latest inbound or outbound upgrade(s), so make sure // the keep-alive timeout is preceded by the substream timeout. From 791220ca950f56674de546b0282446a086714978 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 31 Jul 2023 13:38:39 +0200 Subject: [PATCH 08/19] Fix MSRV issue --- protocols/request-response/src/handler.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 67db22c8479..dff87272918 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -27,7 +27,7 @@ use crate::handler::protocol::Protocol; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::channel::mpsc; -use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; +use futures::{channel::oneshot, future::BoxFuture, pin_mut, prelude::*, stream::FuturesUnordered}; use instant::Instant; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, @@ -38,7 +38,6 @@ use libp2p_swarm::{ SubstreamProtocol, }; use smallvec::SmallVec; -use std::pin::pin; use std::{ collections::VecDeque, fmt, io, @@ -160,7 +159,9 @@ where }; self.worker_streams.push(Box::pin(async move { - match future::select(pin!(recv), futures_timer::Delay::new(timeout)).await { + pin_mut!(recv); + + match future::select(recv, futures_timer::Delay::new(timeout)).await { future::Either::Left((recv, _)) => recv, future::Either::Right(((), _)) => Err(io::ErrorKind::TimedOut.into()), } @@ -200,7 +201,9 @@ where }; self.worker_streams.push(Box::pin(async move { - match future::select(pin!(send), futures_timer::Delay::new(timeout)).await { + pin_mut!(send); + + match future::select(send, futures_timer::Delay::new(timeout)).await { future::Either::Left((recv, _)) => recv, future::Either::Right(((), _)) => Ok(Event::OutboundTimeout(request_id)), } From eaf1d97a809dcc1532df922678e86962fddc724a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 3 Aug 2023 12:40:07 +0200 Subject: [PATCH 09/19] Introduce `futures-bounded` for time and space bounded workers --- Cargo.lock | 10 ++ Cargo.toml | 7 +- misc/futures-bounded/Cargo.toml | 18 ++++ misc/futures-bounded/src/lib.rs | 124 ++++++++++++++++++++++ protocols/request-response/Cargo.toml | 1 + protocols/request-response/src/handler.rs | 54 +++++----- swarm/CHANGELOG.md | 1 + 7 files changed, 186 insertions(+), 29 deletions(-) create mode 100644 misc/futures-bounded/Cargo.toml create mode 100644 misc/futures-bounded/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 11e67364e38..93d9a0ec96b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1749,6 +1749,15 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-bounded" +version = "0.1.0" +dependencies = [ + "futures-timer", + "futures-util", + "tokio", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -3130,6 +3139,7 @@ dependencies = [ "cbor4ii", "env_logger 0.10.0", "futures", + "futures-bounded", "futures-timer", "futures_ringbuf", "instant", diff --git a/Cargo.toml b/Cargo.toml index fa8c70eb4ea..2ee7a22b46d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "interop-tests", "misc/allow-block-list", "misc/connection-limits", + "misc/futures-bounded", "misc/keygen", "misc/metrics", "misc/multistream-select", @@ -62,6 +63,7 @@ resolver = "2" rust-version = "1.65.0" [workspace.dependencies] +futures-bounded = { version = "0.1.0", path = "misc/futures-bounded" } libp2p-allow-block-list = { version = "0.2.0", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.11.0", path = "protocols/autonat" } libp2p-connection-limits = { version = "0.2.1", path = "misc/connection-limits" } @@ -98,13 +100,12 @@ libp2p-webrtc = { version = "0.6.0-alpha", path = "transports/webrtc" } libp2p-websocket = { version = "0.42.0", path = "transports/websocket" } libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" } libp2p-yamux = { version = "0.44.0", path = "muxers/yamux" } +multiaddr = "0.18.0" +multihash = "0.19.0" multistream-select = { version = "0.13.0", path = "misc/multistream-select" } quick-protobuf-codec = { version = "0.2.0", path = "misc/quick-protobuf-codec" } quickcheck = { package = "quickcheck-ext", path = "misc/quickcheck-ext" } rw-stream-sink = { version = "0.4.0", path = "misc/rw-stream-sink" } -multiaddr = "0.18.0" -multihash = "0.19.0" - [patch.crates-io] diff --git a/misc/futures-bounded/Cargo.toml b/misc/futures-bounded/Cargo.toml new file mode 100644 index 00000000000..908333ec83b --- /dev/null +++ b/misc/futures-bounded/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "futures-bounded" +version = "0.1.0" +edition = "2021" +rust-version.workspace = true +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["futures", "async", "backpressure"] +categories = ["data-structures", "asynchronous"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures-util = { version = "0.3.28" } +futures-timer = "3.0.2" + +[dev-dependencies] +tokio = { version = "1.29.1", features = ["macros", "rt"] } diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs new file mode 100644 index 00000000000..52eb5bac730 --- /dev/null +++ b/misc/futures-bounded/src/lib.rs @@ -0,0 +1,124 @@ +use std::future::Future; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; + +use futures_timer::Delay; +use futures_util::future::{select, BoxFuture, Either}; +use futures_util::stream::FuturesUnordered; +use futures_util::{ready, FutureExt, StreamExt}; + +/// Represents a set of (Worker)-[Future]s. +/// +/// This wraps [FuturesUnordered] but bounds it by time and size. +/// In other words, each worker must finish within the specified time and the set never outgrows its capacity. +pub struct WorkerFutures { + timeout: Duration, + capacity: usize, + inner: FuturesUnordered)>>, + + empty_waker: Option, + full_waker: Option, +} + +impl WorkerFutures { + pub fn new(timeout: Duration, capacity: usize) -> Self { + Self { + timeout, + capacity, + inner: Default::default(), + empty_waker: None, + full_waker: None, + } + } +} + +impl WorkerFutures +where + K: Send + 'static, +{ + pub fn try_push(&mut self, key: K, worker: F) -> Option + where + F: Future + Send + 'static + Unpin, + { + if self.inner.len() >= self.capacity { + return Some(worker); + } + let timeout = Delay::new(self.timeout); + + self.inner.push( + async move { + match select(worker, timeout).await { + Either::Left((out, _)) => (key, Ok(out)), + Either::Right(((), _)) => (key, Err(Timeout::new())), + } + } + .boxed(), + ); + + None + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { + if self.inner.len() < self.capacity { + return Poll::Ready(()); + } + + self.full_waker = Some(cx.waker().clone()); + Poll::Pending + } + + pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(K, Result)> { + match ready!(self.inner.poll_next_unpin(cx)) { + None => { + self.empty_waker = Some(cx.waker().clone()); + Poll::Pending + } + Some(result) => { + if let Some(waker) = self.full_waker.take() { + waker.wake(); + } + + Poll::Ready(result) + } + } + } +} + +pub struct Timeout { + _priv: (), +} + +impl Timeout { + fn new() -> Self { + Self { _priv: () } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::future::{pending, poll_fn, ready}; + + #[test] + fn cannot_push_more_than_capacity_tasks() { + let mut workers = WorkerFutures::new(Duration::from_secs(10), 1); + + assert!(workers.try_push((), ready(())).is_none()); + assert!(workers.try_push((), ready(())).is_some()); + } + + #[tokio::test] + async fn workers_timeout() { + let mut workers = WorkerFutures::new(Duration::from_millis(100), 1); + + let _ = workers.try_push((), pending::<()>()); + Delay::new(Duration::from_millis(150)).await; + let (_, result) = poll_fn(|cx| workers.poll_unpin(cx)).await; + + assert!(result.is_err()) + } +} diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 23b875fa3e1..b10e8e7951f 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -25,6 +25,7 @@ smallvec = "1.11.0" void = "1.0.2" log = "0.4.19" futures-timer = "3.0.2" +futures-bounded = { workspace = true } [features] json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"] diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index dff87272918..f83d7cf5e00 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -27,7 +27,7 @@ use crate::handler::protocol::Protocol; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::channel::mpsc; -use futures::{channel::oneshot, future::BoxFuture, pin_mut, prelude::*, stream::FuturesUnordered}; +use futures::{channel::oneshot, prelude::*}; use instant::Instant; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, @@ -87,7 +87,7 @@ where inbound_request_id: Arc, - worker_streams: FuturesUnordered, io::Error>>>, + worker_streams: futures_bounded::WorkerFutures, io::Error>>, } impl Handler @@ -114,7 +114,7 @@ where inbound_sender, pending_events: VecDeque::new(), inbound_request_id, - worker_streams: Default::default(), + worker_streams: futures_bounded::WorkerFutures::new(substream_timeout, 100), } } @@ -130,7 +130,6 @@ where ) { let mut codec = self.codec.clone(); let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); - let timeout = self.substream_timeout; let mut sender = self.inbound_sender.clone(); let recv = async move { @@ -158,14 +157,13 @@ where } }; - self.worker_streams.push(Box::pin(async move { - pin_mut!(recv); - - match future::select(recv, futures_timer::Delay::new(timeout)).await { - future::Either::Left((recv, _)) => recv, - future::Either::Right(((), _)) => Err(io::ErrorKind::TimedOut.into()), - } - })); + if self + .worker_streams + .try_push(request_id, recv.boxed()) + .is_some() + { + log::warn!("Dropping inbound stream because we are at capacity") + } } fn on_fully_negotiated_outbound( @@ -184,7 +182,6 @@ where .expect("negotiated a stream without a pending message"); let mut codec = self.codec.clone(); - let timeout = self.substream_timeout; let request_id = message.request_id; let send = async move { @@ -200,14 +197,13 @@ where }) }; - self.worker_streams.push(Box::pin(async move { - pin_mut!(send); - - match future::select(send, futures_timer::Delay::new(timeout)).await { - future::Either::Left((recv, _)) => recv, - future::Either::Right(((), _)) => Ok(Event::OutboundTimeout(request_id)), - } - })); + if self + .worker_streams + .try_push(request_id, send.boxed()) + .is_some() + { + log::warn!("Dropping outbound stream because we are at capacity") + } } fn on_dial_upgrade_error( @@ -373,12 +369,18 @@ where cx: &mut Context<'_>, ) -> Poll, (), Self::ToBehaviour, Self::Error>> { - while let Poll::Ready(Some(result)) = self.worker_streams.poll_next_unpin(cx) { - match result { - Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)), - Err(e) => { - log::debug!("worker stream failed: {e}") + loop { + match self.worker_streams.poll_unpin(cx) { + Poll::Ready((_, Ok(Ok(event)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) + } + Poll::Ready((id, Ok(Err(e)))) => { + log::debug!("Stream for request {id} failed: {e}"); + } + Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { + log::debug!("Stream for request {id} timed out"); } + Poll::Pending => break, } } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 1d4108ac92c..cff499be2be 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -15,6 +15,7 @@ ## 0.43.0 - Allow `NetworkBehaviours` to create and remove listeners. +- See [PR 3292]. - Raise MSRV to 1.65. From be64f0d30cf3c479c991ffbcce96b6282dc939f5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 3 Aug 2023 12:41:35 +0200 Subject: [PATCH 10/19] Add changelog --- misc/futures-bounded/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 misc/futures-bounded/CHANGELOG.md diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md new file mode 100644 index 00000000000..712e5543386 --- /dev/null +++ b/misc/futures-bounded/CHANGELOG.md @@ -0,0 +1,3 @@ +## 0.1.0 - unreleased + +Initial release. From bdd95f78db0e06885032c1cf5208f2e3535ae065 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 3 Aug 2023 12:43:01 +0200 Subject: [PATCH 11/19] Add description --- misc/futures-bounded/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/misc/futures-bounded/Cargo.toml b/misc/futures-bounded/Cargo.toml index 908333ec83b..8c4af13be82 100644 --- a/misc/futures-bounded/Cargo.toml +++ b/misc/futures-bounded/Cargo.toml @@ -7,6 +7,7 @@ license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" keywords = ["futures", "async", "backpressure"] categories = ["data-structures", "asynchronous"] +description = "Utilities for bounding futures in size and time." # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From 51ef639f9b420c34ecabb55b2f8f3ec885a2da86 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 3 Aug 2023 13:06:51 +0200 Subject: [PATCH 12/19] Add test for backpressure --- misc/futures-bounded/src/lib.rs | 69 +++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs index 52eb5bac730..b9016f09966 100644 --- a/misc/futures-bounded/src/lib.rs +++ b/misc/futures-bounded/src/lib.rs @@ -55,6 +55,10 @@ where .boxed(), ); + if let Some(waker) = self.empty_waker.take() { + waker.wake(); + } + None } @@ -88,6 +92,7 @@ where } } +#[derive(Debug)] pub struct Timeout { _priv: (), } @@ -102,6 +107,8 @@ impl Timeout { mod tests { use super::*; use std::future::{pending, poll_fn, ready}; + use std::pin::Pin; + use std::time::Instant; #[test] fn cannot_push_more_than_capacity_tasks() { @@ -121,4 +128,66 @@ mod tests { assert!(result.is_err()) } + + // Each worker causes a delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. + // We stop after NUM_WORKERS tasks, meaning the overall execution must at least take DELAY * NUM_WORKERS. + #[tokio::test] + async fn backpressure() { + const DELAY: Duration = Duration::from_millis(100); + const NUM_WORKERS: u32 = 10; + + let start = Instant::now(); + Task::new(DELAY, NUM_WORKERS, 1).await; + let duration = start.elapsed(); + + assert!(duration >= DELAY * NUM_WORKERS); + } + + struct Task { + worker: Duration, + num_workers: usize, + num_processed: usize, + inner: WorkerFutures<(), ()>, + } + + impl Task { + fn new(worker: Duration, num_workers: u32, capacity: usize) -> Self { + Self { + worker, + num_workers: num_workers as usize, + num_processed: 0, + inner: WorkerFutures::new(Duration::from_secs(60), capacity), + } + } + } + + impl Future for Task { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + while this.num_processed < this.num_workers { + if let Poll::Ready(((), result)) = this.inner.poll_unpin(cx) { + if result.is_err() { + panic!("Timeout is great than worker delay") + } + + this.num_processed += 1; + continue; + } + + if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) { + let maybe_worker = this.inner.try_push((), Delay::new(this.worker)); + assert!(maybe_worker.is_none(), "we polled for readiness"); + + continue; + } + + return Poll::Pending; + } + + Poll::Ready(()) + } + } } From a2504575495392c968582146e1246392a433871e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 6 Aug 2023 17:02:53 +0200 Subject: [PATCH 13/19] Allow configuration of max capacity for worker streams --- protocols/autonat/src/behaviour.rs | 8 +++++--- protocols/perf/src/client.rs | 7 +++---- protocols/perf/src/server.rs | 8 +++----- protocols/request-response/CHANGELOG.md | 5 +++++ protocols/request-response/src/handler.rs | 6 +++++- protocols/request-response/src/lib.rs | 24 +++++++++++++++++++++++ 6 files changed, 45 insertions(+), 13 deletions(-) diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 439543f8318..29c2036ff40 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -220,9 +220,11 @@ pub struct Behaviour { impl Behaviour { pub fn new(local_peer_id: PeerId, config: Config) -> Self { let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full)); - let mut cfg = request_response::Config::default(); - cfg.set_request_timeout(config.timeout); - let inner = request_response::Behaviour::with_codec(AutoNatCodec, protocols, cfg); + let inner = request_response::Behaviour::with_codec( + AutoNatCodec, + protocols, + request_response::Config::default().with_request_timeout(config.timeout), + ); Self { local_peer_id, inner, diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 93c2086a49e..9f7b80f4871 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -58,9 +58,6 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { - let mut req_resp_config = request_response::Config::default(); - req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5)); - req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); Self { connected: Default::default(), request_response: request_response::Behaviour::new( @@ -68,7 +65,9 @@ impl Default for Behaviour { crate::PROTOCOL_NAME, request_response::ProtocolSupport::Outbound, )), - req_resp_config, + request_response::Config::default() + .with_connection_keep_alive(Duration::from_secs(60 * 5)) + .with_request_timeout(Duration::from_secs(60 * 5)), ), } } diff --git a/protocols/perf/src/server.rs b/protocols/perf/src/server.rs index 79f77c74650..f34551f7cb9 100644 --- a/protocols/perf/src/server.rs +++ b/protocols/perf/src/server.rs @@ -37,17 +37,15 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { - let mut req_resp_config = request_response::Config::default(); - req_resp_config.set_connection_keep_alive(Duration::from_secs(60 * 5)); - req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); - Self { request_response: request_response::Behaviour::new( std::iter::once(( crate::PROTOCOL_NAME, request_response::ProtocolSupport::Inbound, )), - req_resp_config, + request_response::Config::default() + .with_connection_keep_alive(Duration::from_secs(60 * 5)) + .with_request_timeout(Duration::from_secs(60 * 5)), ), } } diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 292cb812103..eead6711f01 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -3,7 +3,12 @@ - Replace unmaintained `serde_cbor` dependency with `cbor4ii`. See [PR 4187]. +- Allow at most 100 concurrent inbound + outbound streams. + This limit is configurable via `Config::with_max_concurrent_streams`. + See [PR 3914]. + [PR 4187]: https://github.com/libp2p/rust-libp2p/pull/4187 +[PR 3914]: https://github.com/libp2p/rust-libp2p/pull/3914 ## 0.25.0 diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index f83d7cf5e00..af40fb45f20 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -100,6 +100,7 @@ where keep_alive_timeout: Duration, substream_timeout: Duration, inbound_request_id: Arc, + max_concurrent_streams: usize, ) -> Self { let (inbound_sender, inbound_receiver) = mpsc::channel(0); Self { @@ -114,7 +115,10 @@ where inbound_sender, pending_events: VecDeque::new(), inbound_request_id, - worker_streams: futures_bounded::WorkerFutures::new(substream_timeout, 100), + worker_streams: futures_bounded::WorkerFutures::new( + substream_timeout, + max_concurrent_streams, + ), } } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 239a4263a76..7e81f6f5b5c 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -285,6 +285,7 @@ impl fmt::Display for RequestId { pub struct Config { request_timeout: Duration, connection_keep_alive: Duration, + max_concurrent_streams: usize, } impl Default for Config { @@ -292,22 +293,43 @@ impl Default for Config { Self { connection_keep_alive: Duration::from_secs(10), request_timeout: Duration::from_secs(10), + max_concurrent_streams: 100, } } } impl Config { /// Sets the keep-alive timeout of idle connections. + #[deprecated(note = "Use `Config::with_connection_keep_alive` for one-liner constructions.")] pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self { self.connection_keep_alive = v; self } /// Sets the timeout for inbound and outbound requests. + #[deprecated(note = "Use `Config::with_request_timeout` for one-liner constructions.")] pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self { self.request_timeout = v; self } + + /// Sets the keep-alive timeout of idle connections. + pub fn with_connection_keep_alive(mut self, v: Duration) -> Self { + self.connection_keep_alive = v; + self + } + + /// Sets the timeout for inbound and outbound requests. + pub fn with_request_timeout(mut self, v: Duration) -> Self { + self.request_timeout = v; + self + } + + /// Sets the upper bound for the number of concurrent inbound + outbound streams. + pub fn with_max_concurrent_streams(mut self, num_streams: usize) -> Self { + self.max_concurrent_streams = num_streams; + self + } } /// A request/response protocol for some message codec. @@ -722,6 +744,7 @@ where self.config.connection_keep_alive, self.config.request_timeout, self.next_inbound_id.clone(), + self.config.max_concurrent_streams, )) } @@ -761,6 +784,7 @@ where self.config.connection_keep_alive, self.config.request_timeout, self.next_inbound_id.clone(), + self.config.max_concurrent_streams, )) } From f8c42de741c5d8e034132c5b097edd00a7df40d0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 20 Sep 2023 18:24:51 +1000 Subject: [PATCH 14/19] Fix compile errors --- protocols/request-response/src/handler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index af40fb45f20..02e5c57def1 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -87,7 +87,7 @@ where inbound_request_id: Arc, - worker_streams: futures_bounded::WorkerFutures, io::Error>>, + worker_streams: futures_bounded::FuturesMap, io::Error>>, } impl Handler @@ -115,7 +115,7 @@ where inbound_sender, pending_events: VecDeque::new(), inbound_request_id, - worker_streams: futures_bounded::WorkerFutures::new( + worker_streams: futures_bounded::FuturesMap::new( substream_timeout, max_concurrent_streams, ), @@ -164,7 +164,7 @@ where if self .worker_streams .try_push(request_id, recv.boxed()) - .is_some() + .is_err() { log::warn!("Dropping inbound stream because we are at capacity") } @@ -204,7 +204,7 @@ where if self .worker_streams .try_push(request_id, send.boxed()) - .is_some() + .is_err() { log::warn!("Dropping outbound stream because we are at capacity") } From 79601175df8b2a0fb0ef3076a394af59e95fac5b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 20 Sep 2023 18:32:15 +1000 Subject: [PATCH 15/19] Update swarm/CHANGELOG.md --- swarm/CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index b1f8d27c196..06ddd740873 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -37,7 +37,6 @@ ## 0.43.0 - Allow `NetworkBehaviours` to create and remove listeners. -- See [PR 3292]. - Raise MSRV to 1.65. From 302c9201566d4edb6030b58dfa4ede7cea0489bf Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 20 Oct 2023 12:20:49 +1100 Subject: [PATCH 16/19] Move changelog entry --- protocols/request-response/CHANGELOG.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index c04828d8f76..0ea56ee7788 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.26.0 - unreleased + +- Allow at most 100 concurrent inbound + outbound streams per instance of `request_response::Behaviour`. + This limit is configurable via `Config::with_max_concurrent_streams`. + See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914). + ## 0.25.2 - Deprecate `request_response::Config::set_connection_keep_alive` in favor of `SwarmBuilder::idle_connection_timeout`. @@ -14,12 +20,7 @@ - Replace unmaintained `serde_cbor` dependency with `cbor4ii`. See [PR 4187]. -- Allow at most 100 concurrent inbound + outbound streams. - This limit is configurable via `Config::with_max_concurrent_streams`. - See [PR 3914]. - [PR 4187]: https://github.com/libp2p/rust-libp2p/pull/4187 -[PR 3914]: https://github.com/libp2p/rust-libp2p/pull/3914 ## 0.25.0 From 71354b2f42d881b325f5761f5b4748485dcffddb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 20 Oct 2023 18:22:32 +1100 Subject: [PATCH 17/19] Report IO failures --- protocols/autonat/CHANGELOG.md | 3 ++ protocols/autonat/src/behaviour.rs | 2 +- protocols/autonat/src/behaviour/as_client.rs | 4 +-- protocols/autonat/src/behaviour/as_server.rs | 4 +-- protocols/request-response/CHANGELOG.md | 2 ++ protocols/request-response/src/handler.rs | 18 +++++++++++ protocols/request-response/src/lib.rs | 34 ++++++++++++++++++-- 7 files changed, 59 insertions(+), 8 deletions(-) diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index 852e5da7b89..2b14598bd3e 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.12.0 - unreleased +- Remove `Clone`, `PartialEq` and `Eq` implementations on `Event` and its sub-structs. + The `Event` also contains errors which are not clonable or comparable. + See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914). ## 0.11.0 diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 29c2036ff40..5494ed336bf 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -133,7 +133,7 @@ impl ProbeId { } /// Event produced by [`Behaviour`]. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum Event { /// Event on an inbound probe. InboundProbe(InboundProbeEvent), diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index e57523afaf8..5c6194491e4 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -39,7 +39,7 @@ use std::{ }; /// Outbound probe failed or was aborted. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum OutboundProbeError { /// Probe was aborted because no server is known, or all servers /// are throttled through [`Config::throttle_server_period`]. @@ -53,7 +53,7 @@ pub enum OutboundProbeError { Response(ResponseError), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum OutboundProbeEvent { /// A dial-back request was sent to a remote peer. Request { diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 09c70a27e93..0cbe83f1245 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -38,7 +38,7 @@ use std::{ }; /// Inbound probe failed. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum InboundProbeError { /// Receiving the dial-back request or sending a response failed. InboundRequest(InboundFailure), @@ -46,7 +46,7 @@ pub enum InboundProbeError { Response(ResponseError), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum InboundProbeEvent { /// A dial-back request was received from a remote peer. Request { diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 618fcf3d265..0c4b0e7c465 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -5,6 +5,8 @@ - Allow at most 100 concurrent inbound + outbound streams per instance of `request_response::Behaviour`. This limit is configurable via `Config::with_max_concurrent_streams`. See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914). +- Report IO failures on inbound and outbound streams. + See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914). ## 0.25.2 diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index e809ce14883..8a9d0c749ee 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -273,6 +273,14 @@ where OutboundTimeout(RequestId), /// An outbound request failed to negotiate a mutually supported protocol. OutboundUnsupportedProtocols(RequestId), + OutboundStreamFailed { + request_id: RequestId, + error: io::Error, + }, + InboundStreamFailed { + request_id: RequestId, + error: io::Error, + }, } impl fmt::Debug for Event { @@ -309,6 +317,16 @@ impl fmt::Debug for Event { .debug_tuple("Event::OutboundUnsupportedProtocols") .field(request_id) .finish(), + Event::OutboundStreamFailed { request_id, error } => f + .debug_struct("Event::OutboundStreamFailed") + .field("request_id", &request_id) + .field("error", &error) + .finish(), + Event::InboundStreamFailed { request_id, error } => f + .debug_struct("Event::InboundStreamFailed") + .field("request_id", &request_id) + .field("error", &error) + .finish(), } } } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 374dd056a35..1d810274a3a 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -90,7 +90,7 @@ use libp2p_swarm::{ use smallvec::SmallVec; use std::{ collections::{HashMap, HashSet, VecDeque}, - fmt, + fmt, io, sync::{atomic::AtomicU64, Arc}, task::{Context, Poll}, time::Duration, @@ -165,7 +165,7 @@ pub enum Event { /// Possible failures occurring in the context of sending /// an outbound request and receiving the response. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum OutboundFailure { /// The request could not be sent because a dialing attempt failed. DialFailure, @@ -181,6 +181,8 @@ pub enum OutboundFailure { ConnectionClosed, /// The remote supports none of the requested protocols. UnsupportedProtocols, + /// An IO failure happened on an outbound stream. + Io(io::Error), } impl fmt::Display for OutboundFailure { @@ -194,6 +196,7 @@ impl fmt::Display for OutboundFailure { OutboundFailure::UnsupportedProtocols => { write!(f, "The remote supports none of the requested protocols") } + OutboundFailure::Io(e) => write!(f, "IO error on outbound stream: {e}"), } } } @@ -202,7 +205,7 @@ impl std::error::Error for OutboundFailure {} /// Possible failures occurring in the context of receiving an /// inbound request and sending a response. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum InboundFailure { /// The inbound request timed out, either while reading the /// incoming request or before a response is sent, e.g. if @@ -218,6 +221,8 @@ pub enum InboundFailure { /// due to the [`ResponseChannel`] being dropped instead of /// being passed to [`Behaviour::send_response`]. ResponseOmission, + /// An IO failure happened on an inbound stream. + Io(io::Error), } impl fmt::Display for InboundFailure { @@ -237,6 +242,7 @@ impl fmt::Display for InboundFailure { f, "The response channel was dropped without sending a response to the remote" ), + InboundFailure::Io(e) => write!(f, "IO error on inbound stream: {e}"), } } } @@ -907,6 +913,28 @@ where error: OutboundFailure::UnsupportedProtocols, })); } + handler::Event::OutboundStreamFailed { request_id, error } => { + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + debug_assert!(removed, "Expect request_id to be pending upon failure"); + + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer, + request_id, + error: OutboundFailure::Io(error), + })) + } + handler::Event::InboundStreamFailed { request_id, error } => { + let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + debug_assert!(removed, "Expect request_id to be pending upon failure"); + + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Io(error), + })) + } } } From feccbc29ea614ce9eba014283e6a5c46129dd6dd Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 13:29:57 +0300 Subject: [PATCH 18/19] fix(request-response): Report failures (#4701) --- Cargo.lock | 1 + examples/file-sharing/src/network.rs | 4 +- protocols/autonat/src/behaviour.rs | 6 +- protocols/autonat/src/behaviour/as_client.rs | 6 +- protocols/autonat/src/behaviour/as_server.rs | 4 +- protocols/autonat/tests/test_client.rs | 8 +- protocols/autonat/tests/test_server.rs | 9 +- protocols/perf/src/client.rs | 6 +- protocols/rendezvous/src/client.rs | 14 +- protocols/request-response/Cargo.toml | 1 + protocols/request-response/src/handler.rs | 92 ++- protocols/request-response/src/lib.rs | 176 +++--- .../request-response/tests/error_reporting.rs | 555 ++++++++++++++++++ protocols/request-response/tests/ping.rs | 7 +- 14 files changed, 756 insertions(+), 133 deletions(-) create mode 100644 protocols/request-response/tests/error_reporting.rs diff --git a/Cargo.lock b/Cargo.lock index 6d4198e00dd..4af6722bdbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2994,6 +2994,7 @@ dependencies = [ name = "libp2p-request-response" version = "0.26.0" dependencies = [ + "anyhow", "async-std", "async-trait", "cbor4ii", diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index d6adea0ccd6..ffef85d75ae 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -8,7 +8,7 @@ use libp2p::{ identity, kad, multiaddr::Protocol, noise, - request_response::{self, ProtocolSupport, RequestId, ResponseChannel}, + request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel}, swarm::{NetworkBehaviour, Swarm, SwarmEvent}, tcp, yamux, PeerId, }; @@ -175,7 +175,7 @@ pub(crate) struct EventLoop { pending_start_providing: HashMap>, pending_get_providers: HashMap>>, pending_request_file: - HashMap, Box>>>, + HashMap, Box>>>, } impl EventLoop { diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 5494ed336bf..f0184fd2fba 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -32,7 +32,7 @@ use instant::Instant; use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_request_response::{ - self as request_response, ProtocolSupport, RequestId, ResponseChannel, + self as request_response, InboundRequestId, OutboundRequestId, ProtocolSupport, ResponseChannel, }; use libp2p_swarm::{ behaviour::{ @@ -187,14 +187,14 @@ pub struct Behaviour { PeerId, ( ProbeId, - RequestId, + InboundRequestId, Vec, ResponseChannel, ), >, // Ongoing outbound probes and mapped to the inner request id. - ongoing_outbound: HashMap, + ongoing_outbound: HashMap, // Connected peers with the observed address of each connection. // If the endpoint of a connection is relayed or not global (in case of Config::only_global_ips), diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index 5c6194491e4..a8b5a753a80 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -29,7 +29,7 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; -use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; +use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId}; use libp2p_swarm::{ConnectionId, ListenAddresses, PollParameters, ToSwarm}; use rand::{seq::SliceRandom, thread_rng}; use std::{ @@ -91,7 +91,7 @@ pub(crate) struct AsClient<'a> { pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>, pub(crate) nat_status: &'a mut NatStatus, pub(crate) confidence: &'a mut usize, - pub(crate) ongoing_outbound: &'a mut HashMap, + pub(crate) ongoing_outbound: &'a mut HashMap, pub(crate) last_probe: &'a mut Option, pub(crate) schedule_probe: &'a mut Delay, pub(crate) listen_addresses: &'a ListenAddresses, @@ -118,7 +118,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { let probe_id = self .ongoing_outbound .remove(&request_id) - .expect("RequestId exists."); + .expect("OutboundRequestId exists."); let event = match response.result.clone() { Ok(address) => OutboundProbeEvent::Response { diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 0cbe83f1245..df6e30c318a 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -26,7 +26,7 @@ use instant::Instant; use libp2p_core::{multiaddr::Protocol, Multiaddr}; use libp2p_identity::PeerId; use libp2p_request_response::{ - self as request_response, InboundFailure, RequestId, ResponseChannel, + self as request_response, InboundFailure, InboundRequestId, ResponseChannel, }; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, @@ -85,7 +85,7 @@ pub(crate) struct AsServer<'a> { PeerId, ( ProbeId, - RequestId, + InboundRequestId, Vec, ResponseChannel, ), diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 1911d1a6b2d..743f4cc1b51 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -61,7 +61,7 @@ async fn test_auto_probe() { match client.next_behaviour_event().await { Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { assert!(peer.is_none()); - assert_eq!(error, OutboundProbeError::NoAddresses); + assert!(matches!(error, OutboundProbeError::NoAddresses)); } other => panic!("Unexpected behaviour event: {other:?}."), } @@ -181,10 +181,10 @@ async fn test_confidence() { peer, error, } if !test_public => { - assert_eq!( + assert!(matches!( error, OutboundProbeError::Response(ResponseError::DialError) - ); + )); (peer.unwrap(), probe_id) } other => panic!("Unexpected Outbound Event: {other:?}"), @@ -261,7 +261,7 @@ async fn test_throttle_server_period() { match client.next_behaviour_event().await { Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { assert!(peer.is_none()); - assert_eq!(error, OutboundProbeError::NoServer); + assert!(matches!(error, OutboundProbeError::NoServer)); } other => panic!("Unexpected behaviour event: {other:?}."), } diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index 1bb5f624793..fa08bbf3471 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -168,7 +168,10 @@ async fn test_dial_error() { }) => { assert_eq!(probe_id, request_probe_id); assert_eq!(peer, client_id); - assert_eq!(error, InboundProbeError::Response(ResponseError::DialError)); + assert!(matches!( + error, + InboundProbeError::Response(ResponseError::DialError) + )); } other => panic!("Unexpected behaviour event: {other:?}."), } @@ -252,10 +255,10 @@ async fn test_throttle_peer_max() { }) => { assert_eq!(client_id, peer); assert_ne!(first_probe_id, probe_id); - assert_eq!( + assert!(matches!( error, InboundProbeError::Response(ResponseError::DialRefused) - ) + )); } other => panic!("Unexpected behaviour event: {other:?}."), }; diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 6e34a9072f4..670b0e9d299 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -37,10 +37,10 @@ use crate::{protocol::Response, RunDuration, RunParams}; /// Connection identifier. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub struct RunId(request_response::RequestId); +pub struct RunId(request_response::OutboundRequestId); -impl From for RunId { - fn from(value: request_response::RequestId) -> Self { +impl From for RunId { + fn from(value: request_response::OutboundRequestId) -> Self { Self(value) } } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 8459dc21c7e..ac2eba05829 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use libp2p_core::{Endpoint, Multiaddr, PeerRecord}; use libp2p_identity::{Keypair, PeerId, SigningError}; -use libp2p_request_response::{ProtocolSupport, RequestId}; +use libp2p_request_response::{OutboundRequestId, ProtocolSupport}; use libp2p_swarm::{ ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, @@ -41,8 +41,8 @@ pub struct Behaviour { keypair: Keypair, - waiting_for_register: HashMap, - waiting_for_discovery: HashMap)>, + waiting_for_register: HashMap, + waiting_for_discovery: HashMap)>, /// Hold addresses of all peers that we have discovered so far. /// @@ -337,7 +337,7 @@ impl NetworkBehaviour for Behaviour { } impl Behaviour { - fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option { + fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option { if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) { return Some(Event::RegisterFailed { rendezvous_node, @@ -357,7 +357,11 @@ impl Behaviour { None } - fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option { + fn handle_response( + &mut self, + request_id: &OutboundRequestId, + response: Message, + ) -> Option { match response { RegisterResponse(Ok(ttl)) => { if let Some((rendezvous_node, namespace)) = diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index a328c3141ce..5c894bcd60f 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -32,6 +32,7 @@ json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"] cbor = ["dep:serde", "dep:cbor4ii", "libp2p-swarm/macros"] [dev-dependencies] +anyhow = "1.0.75" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10.0" libp2p-noise = { workspace = true } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 8a9d0c749ee..b754653907b 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -24,7 +24,7 @@ pub use protocol::ProtocolSupport; use crate::codec::Codec; use crate::handler::protocol::Protocol; -use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; +use crate::{InboundRequestId, OutboundRequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::channel::mpsc; use futures::{channel::oneshot, prelude::*}; @@ -67,13 +67,13 @@ where requested_outbound: VecDeque>, /// A channel for receiving inbound requests. inbound_receiver: mpsc::Receiver<( - RequestId, + InboundRequestId, TCodec::Request, oneshot::Sender, )>, /// The [`mpsc::Sender`] for the above receiver. Cloned for each inbound request. inbound_sender: mpsc::Sender<( - RequestId, + InboundRequestId, TCodec::Request, oneshot::Sender, )>, @@ -83,6 +83,12 @@ where worker_streams: futures_bounded::FuturesMap, io::Error>>, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum RequestId { + Inbound(InboundRequestId), + Outbound(OutboundRequestId), +} + impl Handler where TCodec: Codec + Send + Clone + 'static, @@ -112,6 +118,11 @@ where } } + /// Returns the next inbound request ID. + fn next_inbound_request_id(&mut self) -> InboundRequestId { + InboundRequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)) + } + fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { @@ -123,7 +134,7 @@ where >, ) { let mut codec = self.codec.clone(); - let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); + let request_id = self.next_inbound_request_id(); let mut sender = self.inbound_sender.clone(); let recv = async move { @@ -153,7 +164,7 @@ where if self .worker_streams - .try_push(request_id, recv.boxed()) + .try_push(RequestId::Inbound(request_id), recv.boxed()) .is_err() { log::warn!("Dropping inbound stream because we are at capacity") @@ -193,7 +204,7 @@ where if self .worker_streams - .try_push(request_id, send.boxed()) + .try_push(RequestId::Outbound(request_id), send.boxed()) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") @@ -254,31 +265,34 @@ where { /// A request has been received. Request { - request_id: RequestId, + request_id: InboundRequestId, request: TCodec::Request, sender: oneshot::Sender, }, /// A response has been received. Response { - request_id: RequestId, + request_id: OutboundRequestId, response: TCodec::Response, }, /// A response to an inbound request has been sent. - ResponseSent(RequestId), + ResponseSent(InboundRequestId), /// A response to an inbound request was omitted as a result /// of dropping the response `sender` of an inbound `Request`. - ResponseOmission(RequestId), + ResponseOmission(InboundRequestId), /// An outbound request timed out while sending the request /// or waiting for the response. - OutboundTimeout(RequestId), + OutboundTimeout(OutboundRequestId), /// An outbound request failed to negotiate a mutually supported protocol. - OutboundUnsupportedProtocols(RequestId), + OutboundUnsupportedProtocols(OutboundRequestId), OutboundStreamFailed { - request_id: RequestId, + request_id: OutboundRequestId, error: io::Error, }, + /// An inbound request timed out while waiting for the request + /// or sending the response. + InboundTimeout(InboundRequestId), InboundStreamFailed { - request_id: RequestId, + request_id: InboundRequestId, error: io::Error, }, } @@ -322,6 +336,10 @@ impl fmt::Debug for Event { .field("request_id", &request_id) .field("error", &error) .finish(), + Event::InboundTimeout(request_id) => f + .debug_tuple("Event::InboundTimeout") + .field(request_id) + .finish(), Event::InboundStreamFailed { request_id, error } => f .debug_struct("Event::InboundStreamFailed") .field("request_id", &request_id) @@ -332,7 +350,7 @@ impl fmt::Debug for Event { } pub struct OutboundMessage { - pub(crate) request_id: RequestId, + pub(crate) request_id: OutboundRequestId, pub(crate) request: TCodec::Request, pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, } @@ -381,22 +399,40 @@ where cx: &mut Context<'_>, ) -> Poll, (), Self::ToBehaviour, Self::Error>> { - loop { - match self.worker_streams.poll_unpin(cx) { - Poll::Ready((_, Ok(Ok(event)))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) - } - Poll::Ready((id, Ok(Err(e)))) => { - log::debug!("Stream for request {id} failed: {e}"); - } - Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { - log::debug!("Stream for request {id} timed out"); - } - Poll::Pending => break, + match self.worker_streams.poll_unpin(cx) { + Poll::Ready((_, Ok(Ok(event)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + Poll::Ready((RequestId::Inbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundStreamFailed { + request_id: id, + error: e, + }, + )); + } + Poll::Ready((RequestId::Outbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundStreamFailed { + request_id: id, + error: e, + }, + )); + } + Poll::Ready((RequestId::Inbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundTimeout(id), + )); + } + Poll::Ready((RequestId::Outbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundTimeout(id), + )); } + Poll::Pending => {} } - // Drain pending events. + // Drain pending events that were produced by `worker_streams`. if let Some(event) = self.pending_events.pop_front() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 1d810274a3a..e181d452a67 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -102,7 +102,7 @@ pub enum Message { /// A request message. Request { /// The ID of this request. - request_id: RequestId, + request_id: InboundRequestId, /// The request message. request: TRequest, /// The channel waiting for the response. @@ -117,7 +117,7 @@ pub enum Message { /// The ID of the request that produced this response. /// /// See [`Behaviour::send_request`]. - request_id: RequestId, + request_id: OutboundRequestId, /// The response message. response: TResponse, }, @@ -138,7 +138,7 @@ pub enum Event { /// The peer to whom the request was sent. peer: PeerId, /// The (local) ID of the failed request. - request_id: RequestId, + request_id: OutboundRequestId, /// The error that occurred. error: OutboundFailure, }, @@ -147,7 +147,7 @@ pub enum Event { /// The peer from whom the request was received. peer: PeerId, /// The ID of the failed inbound request. - request_id: RequestId, + request_id: InboundRequestId, /// The error that occurred. error: InboundFailure, }, @@ -159,7 +159,7 @@ pub enum Event { /// The peer to whom the response was sent. peer: PeerId, /// The ID of the inbound request whose response was sent. - request_id: RequestId, + request_id: InboundRequestId, }, } @@ -270,17 +270,27 @@ impl ResponseChannel { } } -/// The ID of an inbound or outbound request. +/// The ID of an inbound request. /// -/// Note: [`RequestId`]'s uniqueness is only guaranteed between two -/// inbound and likewise between two outbound requests. There is no -/// uniqueness guarantee in a set of both inbound and outbound -/// [`RequestId`]s nor in a set of inbound or outbound requests -/// originating from different [`Behaviour`]'s. +/// Note: [`InboundRequestId`]'s uniqueness is only guaranteed between +/// inbound requests of the same originating [`Behaviour`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct RequestId(u64); +pub struct InboundRequestId(u64); -impl fmt::Display for RequestId { +impl fmt::Display for InboundRequestId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// The ID of an outbound request. +/// +/// Note: [`OutboundRequestId`]'s uniqueness is only guaranteed between +/// outbound requests of the same originating [`Behaviour`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct OutboundRequestId(u64); + +impl fmt::Display for OutboundRequestId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) } @@ -333,9 +343,9 @@ where /// The supported outbound protocols. outbound_protocols: SmallVec<[TCodec::Protocol; 2]>, /// The next (local) request ID. - next_request_id: RequestId, + next_outbound_request_id: OutboundRequestId, /// The next (inbound) request ID. - next_inbound_id: Arc, + next_inbound_request_id: Arc, /// The protocol configuration. config: Config, /// The protocol codec for reading and writing requests and responses. @@ -389,8 +399,8 @@ where Behaviour { inbound_protocols, outbound_protocols, - next_request_id: RequestId(1), - next_inbound_id: Arc::new(AtomicU64::new(1)), + next_outbound_request_id: OutboundRequestId(1), + next_inbound_request_id: Arc::new(AtomicU64::new(1)), config: cfg, codec, pending_events: VecDeque::new(), @@ -412,8 +422,8 @@ where /// > address discovery, or known addresses of peers must be /// > managed via [`Behaviour::add_address`] and /// > [`Behaviour::remove_address`]. - pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { - let request_id = self.next_request_id(); + pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { + let request_id = self.next_outbound_request_id(); let request = OutboundMessage { request_id, request, @@ -485,14 +495,14 @@ where /// Checks whether an outbound request to the peer with the provided /// [`PeerId`] initiated by [`Behaviour::send_request`] is still /// pending, i.e. waiting for a response. - pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &OutboundRequestId) -> bool { // Check if request is already sent on established connection. let est_conn = self .connected .get(peer) .map(|cs| { cs.iter() - .any(|c| c.pending_inbound_responses.contains(request_id)) + .any(|c| c.pending_outbound_responses.contains(request_id)) }) .unwrap_or(false); // Check if request is still pending to be sent. @@ -508,20 +518,20 @@ where /// Checks whether an inbound request from the peer with the provided /// [`PeerId`] is still pending, i.e. waiting for a response by the local /// node through [`Behaviour::send_response`]. - pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &InboundRequestId) -> bool { self.connected .get(peer) .map(|cs| { cs.iter() - .any(|c| c.pending_outbound_responses.contains(request_id)) + .any(|c| c.pending_inbound_responses.contains(request_id)) }) .unwrap_or(false) } - /// Returns the next request ID. - fn next_request_id(&mut self) -> RequestId { - let request_id = self.next_request_id; - self.next_request_id.0 += 1; + /// Returns the next outbound request ID. + fn next_outbound_request_id(&mut self) -> OutboundRequestId { + let request_id = self.next_outbound_request_id; + self.next_outbound_request_id.0 += 1; request_id } @@ -539,7 +549,7 @@ where } let ix = (request.request_id.0 as usize) % connections.len(); let conn = &mut connections[ix]; - conn.pending_inbound_responses.insert(request.request_id); + conn.pending_outbound_responses.insert(request.request_id); self.pending_events.push_back(ToSwarm::NotifyHandler { peer_id: *peer, handler: NotifyHandler::One(conn.id), @@ -554,13 +564,13 @@ where /// Remove pending outbound response for the given peer and connection. /// /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. + /// alive and the [`OutboundRequestId`] was previously present and is now removed. /// Returns `false` otherwise. fn remove_pending_outbound_response( &mut self, peer: &PeerId, connection: ConnectionId, - request: RequestId, + request: OutboundRequestId, ) -> bool { self.get_connection_mut(peer, connection) .map(|c| c.pending_outbound_responses.remove(&request)) @@ -570,16 +580,16 @@ where /// Remove pending inbound response for the given peer and connection. /// /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. + /// alive and the [`InboundRequestId`] was previously present and is now removed. /// Returns `false` otherwise. fn remove_pending_inbound_response( &mut self, peer: &PeerId, connection: ConnectionId, - request: &RequestId, + request: InboundRequestId, ) -> bool { self.get_connection_mut(peer, connection) - .map(|c| c.pending_inbound_responses.remove(request)) + .map(|c| c.pending_inbound_responses.remove(&request)) .unwrap_or(false) } @@ -645,7 +655,7 @@ where self.connected.remove(&peer_id); } - for request_id in connection.pending_outbound_responses { + for request_id in connection.pending_inbound_responses { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer: peer_id, @@ -654,7 +664,7 @@ where })); } - for request_id in connection.pending_inbound_responses { + for request_id in connection.pending_outbound_responses { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer: peer_id, @@ -698,7 +708,7 @@ where if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) { for request in pending_requests { connection - .pending_inbound_responses + .pending_outbound_responses .insert(request.request_id); handler.on_behaviour_event(request); } @@ -726,7 +736,7 @@ where self.inbound_protocols.clone(), self.codec.clone(), self.config.request_timeout, - self.next_inbound_id.clone(), + self.next_inbound_request_id.clone(), self.config.max_concurrent_streams, ); @@ -769,7 +779,7 @@ where self.inbound_protocols.clone(), self.codec.clone(), self.config.request_timeout, - self.next_inbound_id.clone(), + self.next_inbound_request_id.clone(), self.config.max_concurrent_streams, ); @@ -814,7 +824,7 @@ where request_id, response, } => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before receiving response.", @@ -831,35 +841,26 @@ where request_id, request, sender, - } => { - let channel = ResponseChannel { sender }; - let message = Message::Request { - request_id, - request, - channel, - }; - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); + } => match self.get_connection_mut(&peer, connection) { + Some(connection) => { + let inserted = connection.pending_inbound_responses.insert(request_id); + debug_assert!(inserted, "Expect id of new request to be unknown."); - match self.get_connection_mut(&peer, connection) { - Some(connection) => { - let inserted = connection.pending_outbound_responses.insert(request_id); - debug_assert!(inserted, "Expect id of new request to be unknown."); - } - // Connection closed after `Event::Request` has been emitted. - None => { - self.pending_events.push_back(ToSwarm::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); - } + let channel = ResponseChannel { sender }; + let message = Message::Request { + request_id, + request, + channel, + }; + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); } - } + None => { + log::debug!("Connection ({connection}) closed after `Event::Request` ({request_id}) has been emitted."); + } + }, handler::Event::ResponseSent(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before response is sent." @@ -872,7 +873,7 @@ where })); } handler::Event::ResponseOmission(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before response is omitted.", @@ -886,7 +887,7 @@ where })); } handler::Event::OutboundTimeout(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before request times out." @@ -900,7 +901,7 @@ where })); } handler::Event::OutboundUnsupportedProtocols(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before failing to connect.", @@ -924,16 +925,35 @@ where error: OutboundFailure::Io(error), })) } + handler::Event::InboundTimeout(request_id) => { + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); + + if removed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Timeout, + })); + } else { + // This happens when timeout is emitted before `read_request` finishes. + log::debug!("Inbound request timeout for an unknown request_id ({request_id})"); + } + } handler::Event::InboundStreamFailed { request_id, error } => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); - debug_assert!(removed, "Expect request_id to be pending upon failure"); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { - peer, - request_id, - error: InboundFailure::Io(error), - })) + if removed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Io(error), + })); + } else { + // This happens when `read_request` fails. + log::debug!("Inbound failure is reported for an unknown request_id ({request_id}): {error}"); + } } } } @@ -966,10 +986,10 @@ struct Connection { /// Pending outbound responses where corresponding inbound requests have /// been received on this connection and emitted via `poll` but have not yet /// been answered. - pending_outbound_responses: HashSet, + pending_outbound_responses: HashSet, /// Pending inbound responses for previously sent requests on this /// connection. - pending_inbound_responses: HashSet, + pending_inbound_responses: HashSet, } impl Connection { diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs new file mode 100644 index 00000000000..cf651d395f5 --- /dev/null +++ b/protocols/request-response/tests/error_reporting.rs @@ -0,0 +1,555 @@ +use anyhow::{bail, Result}; +use async_std::task::sleep; +use async_trait::async_trait; +use futures::prelude::*; +use libp2p_identity::PeerId; +use libp2p_request_response as request_response; +use libp2p_request_response::ProtocolSupport; +use libp2p_swarm::{StreamProtocol, Swarm}; +use libp2p_swarm_test::SwarmExt; +use request_response::{ + Codec, InboundFailure, InboundRequestId, OutboundFailure, OutboundRequestId, ResponseChannel, +}; +use std::pin::pin; +use std::time::Duration; +use std::{io, iter}; + +#[async_std::test] +async fn report_outbound_failure_on_read_response() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::FailOnReadResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::FailOnReadResponse) + .unwrap(); + + let (peer, req_id_done) = wait_response_sent(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; + }; + + // Expects OutboundFailure::Io failure with `FailOnReadResponse` error + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnReadResponse" + ); + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_outbound_failure_on_write_request() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (_peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects no events because `Event::Request` is produced after `read_request`. + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. + let server_task = wait_no_events(&mut swarm1); + + // Expects OutboundFailure::Io failure with `FailOnWriteRequest` error. + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnWriteRequest); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteRequest" + ); + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_outbound_timeout_on_read_response() { + let _ = env_logger::try_init(); + + // `swarm1` needs to have a bigger timeout to avoid racing + let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(200)); + let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(100)); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::TimeoutOnReadResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::TimeoutOnReadResponse) + .unwrap(); + + let (peer, req_id_done) = wait_response_sent(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; + }; + + // Expects OutboundFailure::Timeout + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::TimeoutOnReadResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + assert!(matches!(error, OutboundFailure::Timeout)); + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_inbound_failure_on_read_request() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (_peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects no events because `Event::Request` is produced after `read_request`. + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. + let server_task = wait_no_events(&mut swarm1); + + // Expects io::ErrorKind::UnexpectedEof + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadRequest); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + match error { + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + }; + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_inbound_failure_on_write_response() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects OutboundFailure::Io failure with `FailOnWriteResponse` error + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::FailOnWriteResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::FailOnWriteResponse) + .unwrap(); + + let (peer, req_id_done, error) = wait_inbound_failure(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + InboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteResponse" + ); + }; + + // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnWriteResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + match error { + OutboundFailure::ConnectionClosed => { + // ConnectionClosed is allowed here because we mainly test the behavior + // of `server_task`. + } + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + }; + + // Keep alive the task, so only `server_task` can finish + wait_no_events(&mut swarm2).await; + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_inbound_timeout_on_write_response() { + let _ = env_logger::try_init(); + + // `swarm2` needs to have a bigger timeout to avoid racing + let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(100)); + let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(200)); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects InboundFailure::Timeout + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::TimeoutOnWriteResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::TimeoutOnWriteResponse) + .unwrap(); + + let (peer, req_id_done, error) = wait_inbound_failure(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + assert!(matches!(error, InboundFailure::Timeout)); + }; + + // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::TimeoutOnWriteResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + match error { + OutboundFailure::ConnectionClosed => { + // ConnectionClosed is allowed here because we mainly test the behavior + // of `server_task`. + } + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + } + + // Keep alive the task, so only `server_task` can finish + wait_no_events(&mut swarm2).await; + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[derive(Clone, Default)] +struct TestCodec; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Action { + FailOnReadRequest, + FailOnReadResponse, + TimeoutOnReadResponse, + FailOnWriteRequest, + FailOnWriteResponse, + TimeoutOnWriteResponse, +} + +impl From for u8 { + fn from(value: Action) -> Self { + match value { + Action::FailOnReadRequest => 0, + Action::FailOnReadResponse => 1, + Action::TimeoutOnReadResponse => 2, + Action::FailOnWriteRequest => 3, + Action::FailOnWriteResponse => 4, + Action::TimeoutOnWriteResponse => 5, + } + } +} + +impl TryFrom for Action { + type Error = io::Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Action::FailOnReadRequest), + 1 => Ok(Action::FailOnReadResponse), + 2 => Ok(Action::TimeoutOnReadResponse), + 3 => Ok(Action::FailOnWriteRequest), + 4 => Ok(Action::FailOnWriteResponse), + 5 => Ok(Action::TimeoutOnWriteResponse), + _ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")), + } + } +} + +#[async_trait] +impl Codec for TestCodec { + type Protocol = StreamProtocol; + type Request = Action; + type Response = Action; + + async fn read_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + + if buf.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { + Action::FailOnReadRequest => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnReadRequest")) + } + action => Ok(action), + } + } + + async fn read_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + + if buf.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { + Action::FailOnReadResponse => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnReadResponse")) + } + Action::TimeoutOnReadResponse => loop { + sleep(Duration::MAX).await; + }, + action => Ok(action), + } + } + + async fn write_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + match req { + Action::FailOnWriteRequest => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteRequest")) + } + action => { + let bytes = [action.into()]; + io.write_all(&bytes).await?; + Ok(()) + } + } + } + + async fn write_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + match res { + Action::FailOnWriteResponse => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteResponse")) + } + Action::TimeoutOnWriteResponse => loop { + sleep(Duration::MAX).await; + }, + action => { + let bytes = [action.into()]; + io.write_all(&bytes).await?; + Ok(()) + } + } + } +} + +fn new_swarm_with_timeout( + timeout: Duration, +) -> (PeerId, Swarm>) { + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default().with_request_timeout(timeout); + + let swarm = + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + let peed_id = *swarm.local_peer_id(); + + (peed_id, swarm) +} + +fn new_swarm() -> (PeerId, Swarm>) { + new_swarm_with_timeout(Duration::from_millis(100)) +} + +async fn wait_no_events(swarm: &mut Swarm>) { + loop { + if let Ok(ev) = swarm.select_next_some().await.try_into_behaviour_event() { + panic!("Unexpected event: {ev:?}") + } + } +} + +async fn wait_request( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId, Action, ResponseChannel)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request_id, + request, + channel, + }, + }) => { + return Ok((peer, request_id, request, channel)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_response_sent( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::ResponseSent { + peer, request_id, .. + }) => { + return Ok((peer, request_id)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_inbound_failure( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId, InboundFailure)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::InboundFailure { + peer, + request_id, + error, + }) => { + return Ok((peer, request_id, error)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_outbound_failure( + swarm: &mut Swarm>, +) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + return Ok((peer, request_id, error)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index e0424488f48..37f21264d49 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -28,7 +28,7 @@ use libp2p_swarm::{StreamProtocol, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use rand::{self, Rng}; use serde::{Deserialize, Serialize}; -use std::iter; +use std::{io, iter}; #[async_std::test] #[cfg(feature = "cbor")] @@ -288,7 +288,10 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() { e => panic!("unexpected event from peer 2: {e:?}"), }; - assert_eq!(error, request_response::OutboundFailure::ConnectionClosed); + assert!(matches!( + error, + request_response::OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof, + )); } // Simple Ping-Pong Protocol From c6603a18e42247c1f490bc9e2ff6f8eb7ec40f23 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 26 Oct 2023 21:35:17 +1100 Subject: [PATCH 19/19] Add further changelog entry --- protocols/request-response/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 113d9c8f406..138401c2f50 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -7,7 +7,8 @@ See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914). - Report IO failures on inbound and outbound streams. See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914). - +- Introduce dedicated types for `InboundRequestId` and `OutboundRequestId`. + See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914). - Keep peer addresses in `HashSet` instead of `SmallVec` to prevent adding duplicate addresses. See [PR 4700](https://github.com/libp2p/rust-libp2p/pull/4700).