diff --git a/Cargo.lock b/Cargo.lock index 6d4198e00dd..4af6722bdbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2994,6 +2994,7 @@ dependencies = [ name = "libp2p-request-response" version = "0.26.0" dependencies = [ + "anyhow", "async-std", "async-trait", "cbor4ii", diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index d6adea0ccd6..ffef85d75ae 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -8,7 +8,7 @@ use libp2p::{ identity, kad, multiaddr::Protocol, noise, - request_response::{self, ProtocolSupport, RequestId, ResponseChannel}, + request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel}, swarm::{NetworkBehaviour, Swarm, SwarmEvent}, tcp, yamux, PeerId, }; @@ -175,7 +175,7 @@ pub(crate) struct EventLoop { pending_start_providing: HashMap>, pending_get_providers: HashMap>>, pending_request_file: - HashMap, Box>>>, + HashMap, Box>>>, } impl EventLoop { diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 5494ed336bf..f0184fd2fba 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -32,7 +32,7 @@ use instant::Instant; use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_request_response::{ - self as request_response, ProtocolSupport, RequestId, ResponseChannel, + self as request_response, InboundRequestId, OutboundRequestId, ProtocolSupport, ResponseChannel, }; use libp2p_swarm::{ behaviour::{ @@ -187,14 +187,14 @@ pub struct Behaviour { PeerId, ( ProbeId, - RequestId, + InboundRequestId, Vec, ResponseChannel, ), >, // Ongoing outbound probes and mapped to the inner request id. - ongoing_outbound: HashMap, + ongoing_outbound: HashMap, // Connected peers with the observed address of each connection. // If the endpoint of a connection is relayed or not global (in case of Config::only_global_ips), diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index 5c6194491e4..a8b5a753a80 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -29,7 +29,7 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; -use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; +use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId}; use libp2p_swarm::{ConnectionId, ListenAddresses, PollParameters, ToSwarm}; use rand::{seq::SliceRandom, thread_rng}; use std::{ @@ -91,7 +91,7 @@ pub(crate) struct AsClient<'a> { pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>, pub(crate) nat_status: &'a mut NatStatus, pub(crate) confidence: &'a mut usize, - pub(crate) ongoing_outbound: &'a mut HashMap, + pub(crate) ongoing_outbound: &'a mut HashMap, pub(crate) last_probe: &'a mut Option, pub(crate) schedule_probe: &'a mut Delay, pub(crate) listen_addresses: &'a ListenAddresses, @@ -118,7 +118,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { let probe_id = self .ongoing_outbound .remove(&request_id) - .expect("RequestId exists."); + .expect("OutboundRequestId exists."); let event = match response.result.clone() { Ok(address) => OutboundProbeEvent::Response { diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 0cbe83f1245..df6e30c318a 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -26,7 +26,7 @@ use instant::Instant; use libp2p_core::{multiaddr::Protocol, Multiaddr}; use libp2p_identity::PeerId; use libp2p_request_response::{ - self as request_response, InboundFailure, RequestId, ResponseChannel, + self as request_response, InboundFailure, InboundRequestId, ResponseChannel, }; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, @@ -85,7 +85,7 @@ pub(crate) struct AsServer<'a> { PeerId, ( ProbeId, - RequestId, + InboundRequestId, Vec, ResponseChannel, ), diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 1911d1a6b2d..743f4cc1b51 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -61,7 +61,7 @@ async fn test_auto_probe() { match client.next_behaviour_event().await { Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { assert!(peer.is_none()); - assert_eq!(error, OutboundProbeError::NoAddresses); + assert!(matches!(error, OutboundProbeError::NoAddresses)); } other => panic!("Unexpected behaviour event: {other:?}."), } @@ -181,10 +181,10 @@ async fn test_confidence() { peer, error, } if !test_public => { - assert_eq!( + assert!(matches!( error, OutboundProbeError::Response(ResponseError::DialError) - ); + )); (peer.unwrap(), probe_id) } other => panic!("Unexpected Outbound Event: {other:?}"), @@ -261,7 +261,7 @@ async fn test_throttle_server_period() { match client.next_behaviour_event().await { Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { assert!(peer.is_none()); - assert_eq!(error, OutboundProbeError::NoServer); + assert!(matches!(error, OutboundProbeError::NoServer)); } other => panic!("Unexpected behaviour event: {other:?}."), } diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index 1bb5f624793..fa08bbf3471 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -168,7 +168,10 @@ async fn test_dial_error() { }) => { assert_eq!(probe_id, request_probe_id); assert_eq!(peer, client_id); - assert_eq!(error, InboundProbeError::Response(ResponseError::DialError)); + assert!(matches!( + error, + InboundProbeError::Response(ResponseError::DialError) + )); } other => panic!("Unexpected behaviour event: {other:?}."), } @@ -252,10 +255,10 @@ async fn test_throttle_peer_max() { }) => { assert_eq!(client_id, peer); assert_ne!(first_probe_id, probe_id); - assert_eq!( + assert!(matches!( error, InboundProbeError::Response(ResponseError::DialRefused) - ) + )); } other => panic!("Unexpected behaviour event: {other:?}."), }; diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 6e34a9072f4..670b0e9d299 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -37,10 +37,10 @@ use crate::{protocol::Response, RunDuration, RunParams}; /// Connection identifier. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub struct RunId(request_response::RequestId); +pub struct RunId(request_response::OutboundRequestId); -impl From for RunId { - fn from(value: request_response::RequestId) -> Self { +impl From for RunId { + fn from(value: request_response::OutboundRequestId) -> Self { Self(value) } } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 8459dc21c7e..ac2eba05829 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use libp2p_core::{Endpoint, Multiaddr, PeerRecord}; use libp2p_identity::{Keypair, PeerId, SigningError}; -use libp2p_request_response::{ProtocolSupport, RequestId}; +use libp2p_request_response::{OutboundRequestId, ProtocolSupport}; use libp2p_swarm::{ ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, @@ -41,8 +41,8 @@ pub struct Behaviour { keypair: Keypair, - waiting_for_register: HashMap, - waiting_for_discovery: HashMap)>, + waiting_for_register: HashMap, + waiting_for_discovery: HashMap)>, /// Hold addresses of all peers that we have discovered so far. /// @@ -337,7 +337,7 @@ impl NetworkBehaviour for Behaviour { } impl Behaviour { - fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option { + fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option { if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) { return Some(Event::RegisterFailed { rendezvous_node, @@ -357,7 +357,11 @@ impl Behaviour { None } - fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option { + fn handle_response( + &mut self, + request_id: &OutboundRequestId, + response: Message, + ) -> Option { match response { RegisterResponse(Ok(ttl)) => { if let Some((rendezvous_node, namespace)) = diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index a328c3141ce..5c894bcd60f 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -32,6 +32,7 @@ json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"] cbor = ["dep:serde", "dep:cbor4ii", "libp2p-swarm/macros"] [dev-dependencies] +anyhow = "1.0.75" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10.0" libp2p-noise = { workspace = true } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 8a9d0c749ee..b754653907b 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -24,7 +24,7 @@ pub use protocol::ProtocolSupport; use crate::codec::Codec; use crate::handler::protocol::Protocol; -use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; +use crate::{InboundRequestId, OutboundRequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::channel::mpsc; use futures::{channel::oneshot, prelude::*}; @@ -67,13 +67,13 @@ where requested_outbound: VecDeque>, /// A channel for receiving inbound requests. inbound_receiver: mpsc::Receiver<( - RequestId, + InboundRequestId, TCodec::Request, oneshot::Sender, )>, /// The [`mpsc::Sender`] for the above receiver. Cloned for each inbound request. inbound_sender: mpsc::Sender<( - RequestId, + InboundRequestId, TCodec::Request, oneshot::Sender, )>, @@ -83,6 +83,12 @@ where worker_streams: futures_bounded::FuturesMap, io::Error>>, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum RequestId { + Inbound(InboundRequestId), + Outbound(OutboundRequestId), +} + impl Handler where TCodec: Codec + Send + Clone + 'static, @@ -112,6 +118,11 @@ where } } + /// Returns the next inbound request ID. + fn next_inbound_request_id(&mut self) -> InboundRequestId { + InboundRequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)) + } + fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { @@ -123,7 +134,7 @@ where >, ) { let mut codec = self.codec.clone(); - let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); + let request_id = self.next_inbound_request_id(); let mut sender = self.inbound_sender.clone(); let recv = async move { @@ -153,7 +164,7 @@ where if self .worker_streams - .try_push(request_id, recv.boxed()) + .try_push(RequestId::Inbound(request_id), recv.boxed()) .is_err() { log::warn!("Dropping inbound stream because we are at capacity") @@ -193,7 +204,7 @@ where if self .worker_streams - .try_push(request_id, send.boxed()) + .try_push(RequestId::Outbound(request_id), send.boxed()) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") @@ -254,31 +265,34 @@ where { /// A request has been received. Request { - request_id: RequestId, + request_id: InboundRequestId, request: TCodec::Request, sender: oneshot::Sender, }, /// A response has been received. Response { - request_id: RequestId, + request_id: OutboundRequestId, response: TCodec::Response, }, /// A response to an inbound request has been sent. - ResponseSent(RequestId), + ResponseSent(InboundRequestId), /// A response to an inbound request was omitted as a result /// of dropping the response `sender` of an inbound `Request`. - ResponseOmission(RequestId), + ResponseOmission(InboundRequestId), /// An outbound request timed out while sending the request /// or waiting for the response. - OutboundTimeout(RequestId), + OutboundTimeout(OutboundRequestId), /// An outbound request failed to negotiate a mutually supported protocol. - OutboundUnsupportedProtocols(RequestId), + OutboundUnsupportedProtocols(OutboundRequestId), OutboundStreamFailed { - request_id: RequestId, + request_id: OutboundRequestId, error: io::Error, }, + /// An inbound request timed out while waiting for the request + /// or sending the response. + InboundTimeout(InboundRequestId), InboundStreamFailed { - request_id: RequestId, + request_id: InboundRequestId, error: io::Error, }, } @@ -322,6 +336,10 @@ impl fmt::Debug for Event { .field("request_id", &request_id) .field("error", &error) .finish(), + Event::InboundTimeout(request_id) => f + .debug_tuple("Event::InboundTimeout") + .field(request_id) + .finish(), Event::InboundStreamFailed { request_id, error } => f .debug_struct("Event::InboundStreamFailed") .field("request_id", &request_id) @@ -332,7 +350,7 @@ impl fmt::Debug for Event { } pub struct OutboundMessage { - pub(crate) request_id: RequestId, + pub(crate) request_id: OutboundRequestId, pub(crate) request: TCodec::Request, pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, } @@ -381,22 +399,40 @@ where cx: &mut Context<'_>, ) -> Poll, (), Self::ToBehaviour, Self::Error>> { - loop { - match self.worker_streams.poll_unpin(cx) { - Poll::Ready((_, Ok(Ok(event)))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) - } - Poll::Ready((id, Ok(Err(e)))) => { - log::debug!("Stream for request {id} failed: {e}"); - } - Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { - log::debug!("Stream for request {id} timed out"); - } - Poll::Pending => break, + match self.worker_streams.poll_unpin(cx) { + Poll::Ready((_, Ok(Ok(event)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + Poll::Ready((RequestId::Inbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundStreamFailed { + request_id: id, + error: e, + }, + )); + } + Poll::Ready((RequestId::Outbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundStreamFailed { + request_id: id, + error: e, + }, + )); + } + Poll::Ready((RequestId::Inbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundTimeout(id), + )); + } + Poll::Ready((RequestId::Outbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundTimeout(id), + )); } + Poll::Pending => {} } - // Drain pending events. + // Drain pending events that were produced by `worker_streams`. if let Some(event) = self.pending_events.pop_front() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 1d810274a3a..e181d452a67 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -102,7 +102,7 @@ pub enum Message { /// A request message. Request { /// The ID of this request. - request_id: RequestId, + request_id: InboundRequestId, /// The request message. request: TRequest, /// The channel waiting for the response. @@ -117,7 +117,7 @@ pub enum Message { /// The ID of the request that produced this response. /// /// See [`Behaviour::send_request`]. - request_id: RequestId, + request_id: OutboundRequestId, /// The response message. response: TResponse, }, @@ -138,7 +138,7 @@ pub enum Event { /// The peer to whom the request was sent. peer: PeerId, /// The (local) ID of the failed request. - request_id: RequestId, + request_id: OutboundRequestId, /// The error that occurred. error: OutboundFailure, }, @@ -147,7 +147,7 @@ pub enum Event { /// The peer from whom the request was received. peer: PeerId, /// The ID of the failed inbound request. - request_id: RequestId, + request_id: InboundRequestId, /// The error that occurred. error: InboundFailure, }, @@ -159,7 +159,7 @@ pub enum Event { /// The peer to whom the response was sent. peer: PeerId, /// The ID of the inbound request whose response was sent. - request_id: RequestId, + request_id: InboundRequestId, }, } @@ -270,17 +270,27 @@ impl ResponseChannel { } } -/// The ID of an inbound or outbound request. +/// The ID of an inbound request. /// -/// Note: [`RequestId`]'s uniqueness is only guaranteed between two -/// inbound and likewise between two outbound requests. There is no -/// uniqueness guarantee in a set of both inbound and outbound -/// [`RequestId`]s nor in a set of inbound or outbound requests -/// originating from different [`Behaviour`]'s. +/// Note: [`InboundRequestId`]'s uniqueness is only guaranteed between +/// inbound requests of the same originating [`Behaviour`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct RequestId(u64); +pub struct InboundRequestId(u64); -impl fmt::Display for RequestId { +impl fmt::Display for InboundRequestId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// The ID of an outbound request. +/// +/// Note: [`OutboundRequestId`]'s uniqueness is only guaranteed between +/// outbound requests of the same originating [`Behaviour`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct OutboundRequestId(u64); + +impl fmt::Display for OutboundRequestId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) } @@ -333,9 +343,9 @@ where /// The supported outbound protocols. outbound_protocols: SmallVec<[TCodec::Protocol; 2]>, /// The next (local) request ID. - next_request_id: RequestId, + next_outbound_request_id: OutboundRequestId, /// The next (inbound) request ID. - next_inbound_id: Arc, + next_inbound_request_id: Arc, /// The protocol configuration. config: Config, /// The protocol codec for reading and writing requests and responses. @@ -389,8 +399,8 @@ where Behaviour { inbound_protocols, outbound_protocols, - next_request_id: RequestId(1), - next_inbound_id: Arc::new(AtomicU64::new(1)), + next_outbound_request_id: OutboundRequestId(1), + next_inbound_request_id: Arc::new(AtomicU64::new(1)), config: cfg, codec, pending_events: VecDeque::new(), @@ -412,8 +422,8 @@ where /// > address discovery, or known addresses of peers must be /// > managed via [`Behaviour::add_address`] and /// > [`Behaviour::remove_address`]. - pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { - let request_id = self.next_request_id(); + pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { + let request_id = self.next_outbound_request_id(); let request = OutboundMessage { request_id, request, @@ -485,14 +495,14 @@ where /// Checks whether an outbound request to the peer with the provided /// [`PeerId`] initiated by [`Behaviour::send_request`] is still /// pending, i.e. waiting for a response. - pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &OutboundRequestId) -> bool { // Check if request is already sent on established connection. let est_conn = self .connected .get(peer) .map(|cs| { cs.iter() - .any(|c| c.pending_inbound_responses.contains(request_id)) + .any(|c| c.pending_outbound_responses.contains(request_id)) }) .unwrap_or(false); // Check if request is still pending to be sent. @@ -508,20 +518,20 @@ where /// Checks whether an inbound request from the peer with the provided /// [`PeerId`] is still pending, i.e. waiting for a response by the local /// node through [`Behaviour::send_response`]. - pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &InboundRequestId) -> bool { self.connected .get(peer) .map(|cs| { cs.iter() - .any(|c| c.pending_outbound_responses.contains(request_id)) + .any(|c| c.pending_inbound_responses.contains(request_id)) }) .unwrap_or(false) } - /// Returns the next request ID. - fn next_request_id(&mut self) -> RequestId { - let request_id = self.next_request_id; - self.next_request_id.0 += 1; + /// Returns the next outbound request ID. + fn next_outbound_request_id(&mut self) -> OutboundRequestId { + let request_id = self.next_outbound_request_id; + self.next_outbound_request_id.0 += 1; request_id } @@ -539,7 +549,7 @@ where } let ix = (request.request_id.0 as usize) % connections.len(); let conn = &mut connections[ix]; - conn.pending_inbound_responses.insert(request.request_id); + conn.pending_outbound_responses.insert(request.request_id); self.pending_events.push_back(ToSwarm::NotifyHandler { peer_id: *peer, handler: NotifyHandler::One(conn.id), @@ -554,13 +564,13 @@ where /// Remove pending outbound response for the given peer and connection. /// /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. + /// alive and the [`OutboundRequestId`] was previously present and is now removed. /// Returns `false` otherwise. fn remove_pending_outbound_response( &mut self, peer: &PeerId, connection: ConnectionId, - request: RequestId, + request: OutboundRequestId, ) -> bool { self.get_connection_mut(peer, connection) .map(|c| c.pending_outbound_responses.remove(&request)) @@ -570,16 +580,16 @@ where /// Remove pending inbound response for the given peer and connection. /// /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. + /// alive and the [`InboundRequestId`] was previously present and is now removed. /// Returns `false` otherwise. fn remove_pending_inbound_response( &mut self, peer: &PeerId, connection: ConnectionId, - request: &RequestId, + request: InboundRequestId, ) -> bool { self.get_connection_mut(peer, connection) - .map(|c| c.pending_inbound_responses.remove(request)) + .map(|c| c.pending_inbound_responses.remove(&request)) .unwrap_or(false) } @@ -645,7 +655,7 @@ where self.connected.remove(&peer_id); } - for request_id in connection.pending_outbound_responses { + for request_id in connection.pending_inbound_responses { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer: peer_id, @@ -654,7 +664,7 @@ where })); } - for request_id in connection.pending_inbound_responses { + for request_id in connection.pending_outbound_responses { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer: peer_id, @@ -698,7 +708,7 @@ where if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) { for request in pending_requests { connection - .pending_inbound_responses + .pending_outbound_responses .insert(request.request_id); handler.on_behaviour_event(request); } @@ -726,7 +736,7 @@ where self.inbound_protocols.clone(), self.codec.clone(), self.config.request_timeout, - self.next_inbound_id.clone(), + self.next_inbound_request_id.clone(), self.config.max_concurrent_streams, ); @@ -769,7 +779,7 @@ where self.inbound_protocols.clone(), self.codec.clone(), self.config.request_timeout, - self.next_inbound_id.clone(), + self.next_inbound_request_id.clone(), self.config.max_concurrent_streams, ); @@ -814,7 +824,7 @@ where request_id, response, } => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before receiving response.", @@ -831,35 +841,26 @@ where request_id, request, sender, - } => { - let channel = ResponseChannel { sender }; - let message = Message::Request { - request_id, - request, - channel, - }; - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); + } => match self.get_connection_mut(&peer, connection) { + Some(connection) => { + let inserted = connection.pending_inbound_responses.insert(request_id); + debug_assert!(inserted, "Expect id of new request to be unknown."); - match self.get_connection_mut(&peer, connection) { - Some(connection) => { - let inserted = connection.pending_outbound_responses.insert(request_id); - debug_assert!(inserted, "Expect id of new request to be unknown."); - } - // Connection closed after `Event::Request` has been emitted. - None => { - self.pending_events.push_back(ToSwarm::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); - } + let channel = ResponseChannel { sender }; + let message = Message::Request { + request_id, + request, + channel, + }; + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); } - } + None => { + log::debug!("Connection ({connection}) closed after `Event::Request` ({request_id}) has been emitted."); + } + }, handler::Event::ResponseSent(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before response is sent." @@ -872,7 +873,7 @@ where })); } handler::Event::ResponseOmission(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before response is omitted.", @@ -886,7 +887,7 @@ where })); } handler::Event::OutboundTimeout(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before request times out." @@ -900,7 +901,7 @@ where })); } handler::Event::OutboundUnsupportedProtocols(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before failing to connect.", @@ -924,16 +925,35 @@ where error: OutboundFailure::Io(error), })) } + handler::Event::InboundTimeout(request_id) => { + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); + + if removed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Timeout, + })); + } else { + // This happens when timeout is emitted before `read_request` finishes. + log::debug!("Inbound request timeout for an unknown request_id ({request_id})"); + } + } handler::Event::InboundStreamFailed { request_id, error } => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); - debug_assert!(removed, "Expect request_id to be pending upon failure"); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { - peer, - request_id, - error: InboundFailure::Io(error), - })) + if removed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Io(error), + })); + } else { + // This happens when `read_request` fails. + log::debug!("Inbound failure is reported for an unknown request_id ({request_id}): {error}"); + } } } } @@ -966,10 +986,10 @@ struct Connection { /// Pending outbound responses where corresponding inbound requests have /// been received on this connection and emitted via `poll` but have not yet /// been answered. - pending_outbound_responses: HashSet, + pending_outbound_responses: HashSet, /// Pending inbound responses for previously sent requests on this /// connection. - pending_inbound_responses: HashSet, + pending_inbound_responses: HashSet, } impl Connection { diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs new file mode 100644 index 00000000000..cf651d395f5 --- /dev/null +++ b/protocols/request-response/tests/error_reporting.rs @@ -0,0 +1,555 @@ +use anyhow::{bail, Result}; +use async_std::task::sleep; +use async_trait::async_trait; +use futures::prelude::*; +use libp2p_identity::PeerId; +use libp2p_request_response as request_response; +use libp2p_request_response::ProtocolSupport; +use libp2p_swarm::{StreamProtocol, Swarm}; +use libp2p_swarm_test::SwarmExt; +use request_response::{ + Codec, InboundFailure, InboundRequestId, OutboundFailure, OutboundRequestId, ResponseChannel, +}; +use std::pin::pin; +use std::time::Duration; +use std::{io, iter}; + +#[async_std::test] +async fn report_outbound_failure_on_read_response() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::FailOnReadResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::FailOnReadResponse) + .unwrap(); + + let (peer, req_id_done) = wait_response_sent(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; + }; + + // Expects OutboundFailure::Io failure with `FailOnReadResponse` error + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnReadResponse" + ); + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_outbound_failure_on_write_request() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (_peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects no events because `Event::Request` is produced after `read_request`. + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. + let server_task = wait_no_events(&mut swarm1); + + // Expects OutboundFailure::Io failure with `FailOnWriteRequest` error. + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnWriteRequest); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteRequest" + ); + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_outbound_timeout_on_read_response() { + let _ = env_logger::try_init(); + + // `swarm1` needs to have a bigger timeout to avoid racing + let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(200)); + let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(100)); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::TimeoutOnReadResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::TimeoutOnReadResponse) + .unwrap(); + + let (peer, req_id_done) = wait_response_sent(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; + }; + + // Expects OutboundFailure::Timeout + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::TimeoutOnReadResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + assert!(matches!(error, OutboundFailure::Timeout)); + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_inbound_failure_on_read_request() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (_peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects no events because `Event::Request` is produced after `read_request`. + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. + let server_task = wait_no_events(&mut swarm1); + + // Expects io::ErrorKind::UnexpectedEof + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadRequest); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + match error { + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + }; + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_inbound_failure_on_write_response() { + let _ = env_logger::try_init(); + + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects OutboundFailure::Io failure with `FailOnWriteResponse` error + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::FailOnWriteResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::FailOnWriteResponse) + .unwrap(); + + let (peer, req_id_done, error) = wait_inbound_failure(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + InboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteResponse" + ); + }; + + // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnWriteResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + match error { + OutboundFailure::ConnectionClosed => { + // ConnectionClosed is allowed here because we mainly test the behavior + // of `server_task`. + } + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + }; + + // Keep alive the task, so only `server_task` can finish + wait_no_events(&mut swarm2).await; + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[async_std::test] +async fn report_inbound_timeout_on_write_response() { + let _ = env_logger::try_init(); + + // `swarm2` needs to have a bigger timeout to avoid racing + let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(100)); + let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(200)); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects InboundFailure::Timeout + let server_task = async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::TimeoutOnWriteResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::TimeoutOnWriteResponse) + .unwrap(); + + let (peer, req_id_done, error) = wait_inbound_failure(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + assert!(matches!(error, InboundFailure::Timeout)); + }; + + // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof + let client_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::TimeoutOnWriteResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + match error { + OutboundFailure::ConnectionClosed => { + // ConnectionClosed is allowed here because we mainly test the behavior + // of `server_task`. + } + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + } + + // Keep alive the task, so only `server_task` can finish + wait_no_events(&mut swarm2).await; + }; + + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; +} + +#[derive(Clone, Default)] +struct TestCodec; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Action { + FailOnReadRequest, + FailOnReadResponse, + TimeoutOnReadResponse, + FailOnWriteRequest, + FailOnWriteResponse, + TimeoutOnWriteResponse, +} + +impl From for u8 { + fn from(value: Action) -> Self { + match value { + Action::FailOnReadRequest => 0, + Action::FailOnReadResponse => 1, + Action::TimeoutOnReadResponse => 2, + Action::FailOnWriteRequest => 3, + Action::FailOnWriteResponse => 4, + Action::TimeoutOnWriteResponse => 5, + } + } +} + +impl TryFrom for Action { + type Error = io::Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Action::FailOnReadRequest), + 1 => Ok(Action::FailOnReadResponse), + 2 => Ok(Action::TimeoutOnReadResponse), + 3 => Ok(Action::FailOnWriteRequest), + 4 => Ok(Action::FailOnWriteResponse), + 5 => Ok(Action::TimeoutOnWriteResponse), + _ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")), + } + } +} + +#[async_trait] +impl Codec for TestCodec { + type Protocol = StreamProtocol; + type Request = Action; + type Response = Action; + + async fn read_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + + if buf.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { + Action::FailOnReadRequest => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnReadRequest")) + } + action => Ok(action), + } + } + + async fn read_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + + if buf.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { + Action::FailOnReadResponse => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnReadResponse")) + } + Action::TimeoutOnReadResponse => loop { + sleep(Duration::MAX).await; + }, + action => Ok(action), + } + } + + async fn write_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + match req { + Action::FailOnWriteRequest => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteRequest")) + } + action => { + let bytes = [action.into()]; + io.write_all(&bytes).await?; + Ok(()) + } + } + } + + async fn write_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + match res { + Action::FailOnWriteResponse => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteResponse")) + } + Action::TimeoutOnWriteResponse => loop { + sleep(Duration::MAX).await; + }, + action => { + let bytes = [action.into()]; + io.write_all(&bytes).await?; + Ok(()) + } + } + } +} + +fn new_swarm_with_timeout( + timeout: Duration, +) -> (PeerId, Swarm>) { + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default().with_request_timeout(timeout); + + let swarm = + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + let peed_id = *swarm.local_peer_id(); + + (peed_id, swarm) +} + +fn new_swarm() -> (PeerId, Swarm>) { + new_swarm_with_timeout(Duration::from_millis(100)) +} + +async fn wait_no_events(swarm: &mut Swarm>) { + loop { + if let Ok(ev) = swarm.select_next_some().await.try_into_behaviour_event() { + panic!("Unexpected event: {ev:?}") + } + } +} + +async fn wait_request( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId, Action, ResponseChannel)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request_id, + request, + channel, + }, + }) => { + return Ok((peer, request_id, request, channel)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_response_sent( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::ResponseSent { + peer, request_id, .. + }) => { + return Ok((peer, request_id)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_inbound_failure( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId, InboundFailure)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::InboundFailure { + peer, + request_id, + error, + }) => { + return Ok((peer, request_id, error)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_outbound_failure( + swarm: &mut Swarm>, +) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + return Ok((peer, request_id, error)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index e0424488f48..37f21264d49 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -28,7 +28,7 @@ use libp2p_swarm::{StreamProtocol, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use rand::{self, Rng}; use serde::{Deserialize, Serialize}; -use std::iter; +use std::{io, iter}; #[async_std::test] #[cfg(feature = "cbor")] @@ -288,7 +288,10 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() { e => panic!("unexpected event from peer 2: {e:?}"), }; - assert_eq!(error, request_response::OutboundFailure::ConnectionClosed); + assert!(matches!( + error, + request_response::OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof, + )); } // Simple Ping-Pong Protocol