fix(request-response): Report failures #4701

Merged
65 changes: 57 additions & 8 deletions protocols/request-response/src/handler.rs
@@ -80,7 +80,14 @@ where

inbound_request_id: Arc<AtomicU64>,

- worker_streams: futures_bounded::FuturesMap<RequestId, Result<Event<TCodec>, io::Error>>,
+ worker_streams:
+     futures_bounded::FuturesMap<(RequestId, Direction), Result<Event<TCodec>, io::Error>>,
}

+ #[derive(Clone, Copy, PartialEq, Eq, Hash)]
+ enum Direction {
+     Inbound,
+     Outbound,
+ }

impl<TCodec> Handler<TCodec>
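
The (RequestId, Direction) key introduced above is what lets a finished worker future be mapped back to the right failure event. A minimal, self-contained sketch of that classification, using a simplified stand-in for the handler's generic Event type (names here are illustrative, not the crate's API):

    use std::io;

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    enum Direction {
        Inbound,
        Outbound,
    }

    // Simplified stand-in for the handler's `Event<TCodec>`.
    #[derive(Debug)]
    enum Failure {
        InboundStreamFailed { request_id: u64, error: io::Error },
        OutboundStreamFailed { request_id: u64, error: io::Error },
    }

    // Map a finished worker keyed by (request_id, direction) to a failure event.
    fn classify(request_id: u64, direction: Direction, error: io::Error) -> Failure {
        match direction {
            Direction::Inbound => Failure::InboundStreamFailed { request_id, error },
            Direction::Outbound => Failure::OutboundStreamFailed { request_id, error },
        }
    }

    fn main() {
        let error = io::Error::new(io::ErrorKind::UnexpectedEof, "stream closed early");
        println!("{:?}", classify(1, Direction::Inbound, error));
    }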
@@ -153,9 +160,12 @@ where

if self
.worker_streams
- .try_push(request_id, recv.boxed())
- .is_err()
+ .try_push((request_id, Direction::Inbound), recv.boxed())
+ .is_ok()
{
+ self.pending_events
+     .push_back(Event::IncomingRequest { request_id });
+ } else {
log::warn!("Dropping inbound stream because we are at capacity")
}
}
@@ -193,7 +203,7 @@ where

if self
.worker_streams
- .try_push(request_id, send.boxed())
+ .try_push((request_id, Direction::Outbound), send.boxed())
.is_err()
{
log::warn!("Dropping outbound stream because we are at capacity")
@@ -252,6 +262,8 @@ pub enum Event<TCodec>
where
TCodec: Codec,
{
+ /// A request is going to be received.
+ IncomingRequest { request_id: RequestId },
/// A request has been received.
Request {
request_id: RequestId,
@@ -286,6 +298,10 @@ where
impl<TCodec: Codec> fmt::Debug for Event<TCodec> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
+ Event::IncomingRequest { request_id } => f
+     .debug_struct("Event::IncomingRequest")
+     .field("request_id", request_id)
+     .finish(),
Event::Request {
request_id,
request: _,
@@ -381,22 +397,55 @@ where
cx: &mut Context<'_>,
) -> Poll<ConnectionHandlerEvent<Protocol<TCodec::Protocol>, (), Self::ToBehaviour, Self::Error>>
{
+ // Drain pending events that were produced before poll,
+ // e.g. the `Event::IncomingRequest` produced by `on_fully_negotiated_inbound`.
+ //
+ // NOTE: This is needed because if `read_request` fails before reaching a
+ // `.await` point, the incoming request will never be registered and the
+ // `debug_assert` in the `InboundStreamFailed` handling will panic.
+ if let Some(event) = self.pending_events.pop_front() {
+     return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
+ } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
+     self.pending_events.shrink_to_fit();
+ }

loop {
match self.worker_streams.poll_unpin(cx) {
Poll::Ready((_, Ok(Ok(event)))) => {
- return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
+ return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
}
- Poll::Ready((id, Ok(Err(e)))) => {
+ Poll::Ready(((id, direction), Ok(Err(e)))) => {
log::debug!("Stream for request {id} failed: {e}");

+ let event = match direction {
+     Direction::Inbound => Event::InboundStreamFailed {
+         request_id: id,
+         error: e,
+     },
+     Direction::Outbound => Event::OutboundStreamFailed {
+         request_id: id,
+         error: e,
+     },
+ };
+
+ // TODO: How should we handle errors produced after a `ConnectionClosed` event?
+ // `ConnectionClosed` will generate its own error, but only one of the two
+ // should be forwarded to the upper layer.
+ return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
}
- Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => {
+ Poll::Ready(((id, direction), Err(futures_bounded::Timeout { .. }))) => {
log::debug!("Stream for request {id} timed out");

+ if direction == Direction::Outbound {
+     let event = Event::OutboundTimeout(id);
+     return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
+ }
}
Poll::Pending => break,
}
}

- // Drain pending events.
+ // Drain pending events that were produced by `worker_streams`.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
} else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
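
The comment block added to `poll` above hinges on ordering: the `IncomingRequest` notification queued by `on_fully_negotiated_inbound` must reach the behaviour before any failure produced by the same worker. A standalone sketch of that ordering, with a VecDeque standing in for `pending_events` and a closure standing in for a worker that fails before its first await point (all names illustrative):

    use std::collections::VecDeque;

    #[derive(Debug)]
    enum Event {
        IncomingRequest { request_id: u64 },
        InboundStreamFailed { request_id: u64 },
    }

    fn main() {
        let mut pending_events: VecDeque<Event> = VecDeque::new();

        // on_fully_negotiated_inbound: announce the request as soon as the worker is accepted.
        let request_id = 1;
        pending_events.push_back(Event::IncomingRequest { request_id });

        // A worker that fails before reaching any await point.
        let worker = |id: u64| -> Result<(), Event> {
            Err(Event::InboundStreamFailed { request_id: id })
        };

        // poll: drain queued events first, then poll workers, so the behaviour
        // always sees IncomingRequest before the matching failure.
        if let Some(event) = pending_events.pop_front() {
            println!("notify behaviour: {event:?}");
        }
        if let Err(failure) = worker(request_id) {
            println!("notify behaviour: {failure:?}");
        }
    }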
54 changes: 23 additions & 31 deletions protocols/request-response/src/lib.rs
@@ -492,7 +492,7 @@ where
.get(peer)
.map(|cs| {
cs.iter()
- .any(|c| c.pending_inbound_responses.contains(request_id))
+ .any(|c| c.pending_outbound_responses.contains(request_id))
})
.unwrap_or(false);
// Check if request is still pending to be sent.
@@ -513,7 +513,7 @@
.get(peer)
.map(|cs| {
cs.iter()
- .any(|c| c.pending_outbound_responses.contains(request_id))
+ .any(|c| c.pending_inbound_responses.contains(request_id))
})
.unwrap_or(false)
}
@@ -539,7 +539,7 @@
}
let ix = (request.request_id.0 as usize) % connections.len();
let conn = &mut connections[ix];
- conn.pending_inbound_responses.insert(request.request_id);
+ conn.pending_outbound_responses.insert(request.request_id);
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer,
handler: NotifyHandler::One(conn.id),
@@ -576,10 +576,10 @@
&mut self,
peer: &PeerId,
connection: ConnectionId,
- request: &RequestId,
+ request: RequestId,
) -> bool {
self.get_connection_mut(peer, connection)
- .map(|c| c.pending_inbound_responses.remove(request))
+ .map(|c| c.pending_inbound_responses.remove(&request))
.unwrap_or(false)
}

@@ -645,7 +645,7 @@ where
self.connected.remove(&peer_id);
}

- for request_id in connection.pending_outbound_responses {
+ for request_id in connection.pending_inbound_responses {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
peer: peer_id,
@@ -654,7 +654,7 @@ where
}));
}

- for request_id in connection.pending_inbound_responses {
+ for request_id in connection.pending_outbound_responses {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer: peer_id,
@@ -698,7 +698,7 @@ where
if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
for request in pending_requests {
connection
- .pending_inbound_responses
+ .pending_outbound_responses
.insert(request.request_id);
handler.on_behaviour_event(request);
}
@@ -814,7 +814,7 @@ where
request_id,
response,
} => {
- let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
+ let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(
removed,
"Expect request_id to be pending before receiving response.",
@@ -827,6 +827,15 @@ where
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));
}
+ handler::Event::IncomingRequest { request_id } => {
+     // This event was emitted before `Handler::poll`, and it is handled before the
+     // corresponding worker stream (in `worker_streams`) gets polled. That means at
+     // this point no error should have been emitted, because any error will be generated by the worker itself.
+     if let Some(connection) = self.get_connection_mut(&peer, connection) {
+         let inserted = connection.pending_inbound_responses.insert(request_id);
+         debug_assert!(inserted, "Expect id of new request to be unknown.");
+     }
+ }
handler::Event::Request {
request_id,
request,
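
The new IncomingRequest arm above relies on an invariant: a request id is inserted into `pending_inbound_responses` exactly once, before any later event for that id tries to remove it. A tiny sketch of that bookkeeping with a bare HashSet (field name follows the diff; the surrounding type is simplified):

    use std::collections::HashSet;

    // Simplified per-connection bookkeeping, mirroring the field name in lib.rs.
    #[derive(Default)]
    struct Connection {
        pending_inbound_responses: HashSet<u64>,
    }

    fn main() {
        let mut conn = Connection::default();

        // handler::Event::IncomingRequest: register the id before the worker makes progress.
        let request_id = 7;
        let inserted = conn.pending_inbound_responses.insert(request_id);
        debug_assert!(inserted, "Expect id of new request to be unknown.");

        // Later, on ResponseSent, ResponseOmission or InboundStreamFailed,
        // the id must still be pending and is removed exactly once.
        let removed = conn.pending_inbound_responses.remove(&request_id);
        debug_assert!(removed, "Expect request_id to be pending.");
    }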
@@ -840,26 +849,9 @@ where
};
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));

- match self.get_connection_mut(&peer, connection) {
-     Some(connection) => {
-         let inserted = connection.pending_outbound_responses.insert(request_id);
-         debug_assert!(inserted, "Expect id of new request to be unknown.");
-     }
-     // Connection closed after `Event::Request` has been emitted.
-     None => {
-         self.pending_events.push_back(ToSwarm::GenerateEvent(
-             Event::InboundFailure {
-                 peer,
-                 request_id,
-                 error: InboundFailure::ConnectionClosed,
-             },
-         ));
-     }
- }
}
handler::Event::ResponseSent(request_id) => {
- let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
+ let removed = self.remove_pending_inbound_response(&peer, connection, request_id);
debug_assert!(
removed,
"Expect request_id to be pending before response is sent."
@@ -872,7 +864,7 @@ where
}));
}
handler::Event::ResponseOmission(request_id) => {
- let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
+ let removed = self.remove_pending_inbound_response(&peer, connection, request_id);
debug_assert!(
removed,
"Expect request_id to be pending before response is omitted.",
@@ -886,7 +878,7 @@ where
}));
}
handler::Event::OutboundTimeout(request_id) => {
- let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
+ let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(
removed,
"Expect request_id to be pending before request times out."
@@ -900,7 +892,7 @@ where
}));
}
handler::Event::OutboundUnsupportedProtocols(request_id) => {
- let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
+ let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(
removed,
"Expect request_id to be pending before failing to connect.",
@@ -925,7 +917,7 @@ where
}))
}
handler::Event::InboundStreamFailed { request_id, error } => {
- let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
+ let removed = self.remove_pending_inbound_response(&peer, connection, request_id);
debug_assert!(removed, "Expect request_id to be pending upon failure");

self.pending_events
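
Taken together, the lib.rs changes settle on one convention: requests this node sends are tracked in `pending_outbound_responses`, requests it receives in `pending_inbound_responses`, and whatever is left in either set when a connection closes is reported as an OutboundFailure or InboundFailure respectively. A compact sketch of that lifecycle with standalone types (not the crate's real structs):

    use std::collections::HashSet;

    #[derive(Default)]
    struct Connection {
        // Requests we sent and are still awaiting a response for.
        pending_outbound_responses: HashSet<u64>,
        // Requests we received and still owe a response to.
        pending_inbound_responses: HashSet<u64>,
    }

    #[derive(Debug)]
    enum Failure {
        Outbound { request_id: u64 },
        Inbound { request_id: u64 },
    }

    // On connection close, everything still pending becomes a failure event.
    fn on_connection_closed(conn: Connection) -> Vec<Failure> {
        let mut failures = Vec::new();
        for request_id in conn.pending_inbound_responses {
            failures.push(Failure::Inbound { request_id });
        }
        for request_id in conn.pending_outbound_responses {
            failures.push(Failure::Outbound { request_id });
        }
        failures
    }

    fn main() {
        let mut conn = Connection::default();
        conn.pending_outbound_responses.insert(1); // we sent request 1
        conn.pending_inbound_responses.insert(2); // we received request 2

        // The connection closes before either request completes.
        for failure in on_connection_closed(conn) {
            println!("report: {failure:?}");
        }
    }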