From 67fef8b4dec86b3c5eac501eb62c30391fd7ff7d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 5 May 2022 19:14:03 +0200 Subject: [PATCH 1/7] protocols/kad/: Split into outbound and inbound substreams --- protocols/kad/src/handler.rs | 395 +++++++++++++++++++++++------------ 1 file changed, 265 insertions(+), 130 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 30d22c9067d..3ecbf2227a6 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -84,8 +84,11 @@ pub struct KademliaHandler { /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, - /// List of active substreams with the state they are in. - substreams: Vec>, + /// List of active outbound substreams with the state they are in. + outbound_substreams: Vec>, + + /// List of active inbound substreams with the state they are in. + inbound_substreams: Vec, /// Until when to keep the connection alive. keep_alive: KeepAlive, @@ -125,8 +128,8 @@ pub struct KademliaHandlerConfig { pub idle_timeout: Duration, } -/// State of an active substream, opened either by us or by the remote. -enum SubstreamState { +/// State of an active outbound substream. +enum OutboundSubstreamState { /// We haven't started opening the outgoing substream yet. /// Contains the request we want to send, and the user data if we expect an answer. OutPendingOpen(KadRequestMsg, Option), @@ -145,6 +148,10 @@ enum SubstreamState { OutReportError(KademliaHandlerQueryErr, TUserData), /// The substream is being closed. OutClosing(KadOutStreamSink), +} + +/// State of an active inbound substream. +enum InboundSubstreamState { /// Waiting for a request from the remote. InWaitingMessage(UniqueConnecId, KadInStreamSink), /// Waiting for the user to send a `KademliaHandlerIn` event containing the response. @@ -161,29 +168,38 @@ enum SubstreamState { InClosing(KadInStreamSink), } -impl SubstreamState { +impl OutboundSubstreamState { /// Tries to close the substream. /// /// If the substream is not ready to be closed, returns it back. fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self { - SubstreamState::OutPendingOpen(_, _) | SubstreamState::OutReportError(_, _) => { - Poll::Ready(()) - } - SubstreamState::OutPendingSend(ref mut stream, _, _) - | SubstreamState::OutPendingFlush(ref mut stream, _) - | SubstreamState::OutWaitingAnswer(ref mut stream, _) - | SubstreamState::OutClosing(ref mut stream) => { + OutboundSubstreamState::OutPendingOpen(_, _) + | OutboundSubstreamState::OutReportError(_, _) => Poll::Ready(()), + OutboundSubstreamState::OutPendingSend(ref mut stream, _, _) + | OutboundSubstreamState::OutPendingFlush(ref mut stream, _) + | OutboundSubstreamState::OutWaitingAnswer(ref mut stream, _) + | OutboundSubstreamState::OutClosing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, } } - SubstreamState::InWaitingMessage(_, ref mut stream) - | SubstreamState::InWaitingUser(_, ref mut stream) - | SubstreamState::InPendingSend(_, ref mut stream, _) - | SubstreamState::InPendingFlush(_, ref mut stream) - | SubstreamState::InClosing(ref mut stream) => { + } + } +} + +impl InboundSubstreamState { + /// Tries to close the substream. + /// + /// If the substream is not ready to be closed, returns it back. + fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { + match self { + InboundSubstreamState::InWaitingMessage(_, ref mut stream) + | InboundSubstreamState::InWaitingUser(_, ref mut stream) + | InboundSubstreamState::InPendingSend(_, ref mut stream, _) + | InboundSubstreamState::InPendingFlush(_, ref mut stream) + | InboundSubstreamState::InClosing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, @@ -459,7 +475,8 @@ impl KademliaHandler { config, endpoint, next_connec_unique_id: UniqueConnecId(0), - substreams: Vec::new(), + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), keep_alive, protocol_status: ProtocolStatus::Unconfirmed, } @@ -493,8 +510,10 @@ where protocol: >::Output, (msg, user_data): Self::OutboundOpenInfo, ) { - self.substreams - .push(SubstreamState::OutPendingSend(protocol, msg, user_data)); + self.outbound_substreams + .push(OutboundSubstreamState::OutPendingSend( + protocol, msg, user_data, + )); if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want @@ -518,8 +537,12 @@ where debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; - self.substreams - .push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); + // TODO: Limit number + self.inbound_substreams + .push(InboundSubstreamState::InWaitingMessage( + connec_unique_id, + protocol, + )); if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want @@ -531,68 +554,83 @@ where fn inject_event(&mut self, message: KademliaHandlerIn) { match message { KademliaHandlerIn::Reset(request_id) => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::InWaitingUser(conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { // TODO: we don't properly close down the substream let waker = futures::task::noop_waker(); let mut cx = Context::from_waker(&waker); - let _ = self.substreams.remove(pos).try_close(&mut cx); + let _ = self.inbound_substreams.remove(pos).try_close(&mut cx); } } KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); } KademliaHandlerIn::FindNodeRes { closer_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::InWaitingUser(ref conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::InWaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; let msg = KadResponseMsg::FindNode { closer_peers }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::InPendingSend( + conn_id, substream, msg, + )); } } KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetProvidersRes { closer_peers, provider_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::InWaitingUser(ref conn_id, _) + if conn_id == &request_id.connec_unique_id => + { + true + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::InWaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; @@ -600,40 +638,47 @@ where closer_peers, provider_peers, }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::InPendingSend( + conn_id, substream, msg, + )); } } KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, None)); + self.outbound_substreams + .push(OutboundSubstreamState::OutPendingOpen(msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, closer_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::InWaitingUser(ref conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::InWaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; @@ -641,8 +686,10 @@ where record, closer_peers, }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::InPendingSend( + conn_id, substream, msg, + )); } } KademliaHandlerIn::PutRecordRes { @@ -650,24 +697,31 @@ where request_id, value, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::InWaitingUser(ref conn_id, _) + if conn_id == &request_id.connec_unique_id => + { + true + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::InWaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; let msg = KadResponseMsg::PutValue { key, value }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::InPendingSend( + conn_id, substream, msg, + )); } } } @@ -681,8 +735,11 @@ where // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying if let Some(user_data) = user_data { - self.substreams - .push(SubstreamState::OutReportError(error.into(), user_data)); + self.outbound_substreams + .push(OutboundSubstreamState::OutReportError( + error.into(), + user_data, + )); } } @@ -701,7 +758,7 @@ where Self::Error, >, > { - if self.substreams.is_empty() { + if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { return Poll::Pending; } @@ -714,25 +771,26 @@ where )); } - // We remove each element from `substreams` one by one and add them back. - for n in (0..self.substreams.len()).rev() { - let mut substream = self.substreams.swap_remove(n); + // We remove each element from `outbound_substreams` one by one and add them back. + for n in (0..self.outbound_substreams.len()).rev() { + let mut substream = self.outbound_substreams.swap_remove(n); loop { - match advance_substream(substream, self.config.protocol_config.clone(), cx) { + match advance_outbound_substream(substream, self.config.protocol_config.clone(), cx) + { (Some(new_state), Some(event), _) => { - self.substreams.push(new_state); + self.outbound_substreams.push(new_state); return Poll::Ready(event); } (None, Some(event), _) => { - if self.substreams.is_empty() { + if self.outbound_substreams.is_empty() { self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } return Poll::Ready(event); } (Some(new_state), None, false) => { - self.substreams.push(new_state); + self.outbound_substreams.push(new_state); break; } (Some(new_state), None, true) => { @@ -746,7 +804,39 @@ where } } - if self.substreams.is_empty() { + // We remove each element from `inbound_substreams` one by one and add them back. + for n in (0..self.inbound_substreams.len()).rev() { + let mut substream = self.inbound_substreams.swap_remove(n); + + loop { + match advance_inbound_substream(substream, cx) { + (Some(new_state), Some(event), _) => { + self.inbound_substreams.push(new_state); + return Poll::Ready(event); + } + (None, Some(event), _) => { + if self.inbound_substreams.is_empty() { + self.keep_alive = + KeepAlive::Until(Instant::now() + self.config.idle_timeout); + } + return Poll::Ready(event); + } + (Some(new_state), None, false) => { + self.inbound_substreams.push(new_state); + break; + } + (Some(new_state), None, true) => { + substream = new_state; + continue; + } + (None, None, _) => { + break; + } + } + } + } + + if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { // We destroyed all substreams in this function. self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } else { @@ -767,16 +857,16 @@ impl Default for KademliaHandlerConfig { } } -/// Advances one substream. +/// Advances one outbound substream. /// /// Returns the new state for that substream, an event to generate, and whether the substream /// should be polled again. -fn advance_substream( - state: SubstreamState, +fn advance_outbound_substream( + state: OutboundSubstreamState, upgrade: KademliaProtocolConfig, cx: &mut Context<'_>, ) -> ( - Option>, + Option>, Option< ConnectionHandlerEvent< KademliaProtocolConfig, @@ -788,17 +878,19 @@ fn advance_substream( bool, ) { match state { - SubstreamState::OutPendingOpen(msg, user_data) => { + OutboundSubstreamState::OutPendingOpen(msg, user_data) => { let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(upgrade, (msg, user_data)), }; (None, Some(ev), false) } - SubstreamState::OutPendingSend(mut substream, msg, user_data) => { + OutboundSubstreamState::OutPendingSend(mut substream, msg, user_data) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(SubstreamState::OutPendingFlush(substream, user_data)), + Some(OutboundSubstreamState::OutPendingFlush( + substream, user_data, + )), None, true, ), @@ -814,7 +906,9 @@ fn advance_substream( } }, Poll::Pending => ( - Some(SubstreamState::OutPendingSend(substream, msg, user_data)), + Some(OutboundSubstreamState::OutPendingSend( + substream, msg, user_data, + )), None, false, ), @@ -830,21 +924,29 @@ fn advance_substream( } } } - SubstreamState::OutPendingFlush(mut substream, user_data) => { + OutboundSubstreamState::OutPendingFlush(mut substream, user_data) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { if let Some(user_data) = user_data { ( - Some(SubstreamState::OutWaitingAnswer(substream, user_data)), + Some(OutboundSubstreamState::OutWaitingAnswer( + substream, user_data, + )), None, true, ) } else { - (Some(SubstreamState::OutClosing(substream)), None, true) + ( + Some(OutboundSubstreamState::OutClosing(substream)), + None, + true, + ) } } Poll::Pending => ( - Some(SubstreamState::OutPendingFlush(substream, user_data)), + Some(OutboundSubstreamState::OutPendingFlush( + substream, user_data, + )), None, false, ), @@ -860,10 +962,10 @@ fn advance_substream( } } } - SubstreamState::OutWaitingAnswer(mut substream, user_data) => { + OutboundSubstreamState::OutWaitingAnswer(mut substream, user_data) => { match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { - let new_state = SubstreamState::OutClosing(substream); + let new_state = OutboundSubstreamState::OutClosing(substream); let event = process_kad_response(msg, user_data); ( Some(new_state), @@ -872,7 +974,9 @@ fn advance_substream( ) } Poll::Pending => ( - Some(SubstreamState::OutWaitingAnswer(substream, user_data)), + Some(OutboundSubstreamState::OutWaitingAnswer( + substream, user_data, + )), None, false, ), @@ -892,31 +996,62 @@ fn advance_substream( } } } - SubstreamState::OutReportError(error, user_data) => { + OutboundSubstreamState::OutReportError(error, user_data) => { let event = KademliaHandlerEvent::QueryError { error, user_data }; (None, Some(ConnectionHandlerEvent::Custom(event)), false) } - SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) - { - Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false), - Poll::Ready(Err(_)) => (None, None, false), - }, - SubstreamState::InWaitingMessage(id, mut substream) => { + OutboundSubstreamState::OutClosing(mut stream) => { + match Sink::poll_close(Pin::new(&mut stream), cx) { + Poll::Ready(Ok(())) => (None, None, false), + Poll::Pending => ( + Some(OutboundSubstreamState::OutClosing(stream)), + None, + false, + ), + Poll::Ready(Err(_)) => (None, None, false), + } + } + } +} +/// Advances one inbound substream. +/// +/// Returns the new state for that substream, an event to generate, and whether the substream +/// should be polled again. +fn advance_inbound_substream( + state: InboundSubstreamState, + cx: &mut Context<'_>, +) -> ( + Option, + Option< + ConnectionHandlerEvent< + KademliaProtocolConfig, + (KadRequestMsg, Option), + KademliaHandlerEvent, + io::Error, + >, + >, + bool, +) { + match state { + InboundSubstreamState::InWaitingMessage(id, mut substream) => { match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { if let Ok(ev) = process_kad_request(msg, id) { ( - Some(SubstreamState::InWaitingUser(id, substream)), + Some(InboundSubstreamState::InWaitingUser(id, substream)), Some(ConnectionHandlerEvent::Custom(ev)), false, ) } else { - (Some(SubstreamState::InClosing(substream)), None, true) + ( + Some(InboundSubstreamState::InClosing(substream)), + None, + true, + ) } } Poll::Pending => ( - Some(SubstreamState::InWaitingMessage(id, substream)), + Some(InboundSubstreamState::InWaitingMessage(id, substream)), None, false, ), @@ -930,48 +1065,48 @@ fn advance_substream( } } } - SubstreamState::InWaitingUser(id, substream) => ( - Some(SubstreamState::InWaitingUser(id, substream)), + InboundSubstreamState::InWaitingUser(id, substream) => ( + Some(InboundSubstreamState::InWaitingUser(id, substream)), None, false, ), - SubstreamState::InPendingSend(id, mut substream, msg) => { + InboundSubstreamState::InPendingSend(id, mut substream, msg) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(SubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::InPendingFlush(id, substream)), None, true, ), Err(_) => (None, None, false), }, Poll::Pending => ( - Some(SubstreamState::InPendingSend(id, substream, msg)), + Some(InboundSubstreamState::InPendingSend(id, substream, msg)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InPendingFlush(id, mut substream) => { + InboundSubstreamState::InPendingFlush(id, mut substream) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => ( - Some(SubstreamState::InWaitingMessage(id, substream)), + Some(InboundSubstreamState::InWaitingMessage(id, substream)), None, true, ), Poll::Pending => ( - Some(SubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::InPendingFlush(id, substream)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InClosing(mut stream) => { + InboundSubstreamState::InClosing(mut stream) => { match Sink::poll_close(Pin::new(&mut stream), cx) { Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false), + Poll::Pending => (Some(InboundSubstreamState::InClosing(stream)), None, false), Poll::Ready(Err(_)) => (None, None, false), } } From 9f152b38a8493ccef9fefc80ea6ff7b9ab53807b Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 5 May 2022 19:25:58 +0200 Subject: [PATCH 2/7] protocols/kad: Limit # of inbound substreams to 32 A remote node may still send more than 32 requests in parallel by using more than one connection or by sending more than one request per stream. --- protocols/kad/src/handler.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 3ecbf2227a6..388c2b756f4 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -39,6 +39,8 @@ use std::{ error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration, }; +const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32; + /// A prototype from which [`KademliaHandler`]s can be constructed. pub struct KademliaHandlerProto { config: KademliaHandlerConfig, @@ -534,6 +536,10 @@ where EitherOutput::Second(p) => void::unreachable(p), }; + if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS { + panic!("New inbound substream exceeds inbound substream limit. Dropping."); + } + debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; From cfa733cb91d568d406cf2be5ce3d3e96e76c8449 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 10 May 2022 11:05:54 +0200 Subject: [PATCH 3/7] protocols/kad: Favor new substreams over old ones waiting for reuse When a new inbound substream comes in and the limit of total inbound substreams is hit, try to find an old inbound substream waiting to be reused. In such case, replace the old with the new. In case no such old substream exists, drop the new one. --- protocols/kad/src/handler.rs | 130 +++++++++++++++++++++++------------ 1 file changed, 85 insertions(+), 45 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 388c2b756f4..ce9b598acf9 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -155,7 +155,12 @@ enum OutboundSubstreamState { /// State of an active inbound substream. enum InboundSubstreamState { /// Waiting for a request from the remote. - InWaitingMessage(UniqueConnecId, KadInStreamSink), + InWaitingMessage { + /// Whether it is the first message to be awaited on this stream. + first: bool, + connection_id: UniqueConnecId, + substream: KadInStreamSink, + }, /// Waiting for the user to send a `KademliaHandlerIn` event containing the response. InWaitingUser(UniqueConnecId, KadInStreamSink), /// Waiting to send an answer back to the remote. @@ -197,7 +202,10 @@ impl InboundSubstreamState { /// If the substream is not ready to be closed, returns it back. fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self { - InboundSubstreamState::InWaitingMessage(_, ref mut stream) + InboundSubstreamState::InWaitingMessage { + substream: ref mut stream, + .. + } | InboundSubstreamState::InWaitingUser(_, ref mut stream) | InboundSubstreamState::InPendingSend(_, ref mut stream, _) | InboundSubstreamState::InPendingFlush(_, ref mut stream) @@ -536,25 +544,44 @@ where EitherOutput::Second(p) => void::unreachable(p), }; + if let ProtocolStatus::Unconfirmed = self.protocol_status { + // Upon the first successfully negotiated substream, we know that the + // remote is configured with the same protocol name and we want + // the behaviour to add this peer to the routing table, if possible. + self.protocol_status = ProtocolStatus::Confirmed; + } + if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS { - panic!("New inbound substream exceeds inbound substream limit. Dropping."); + if let Some(position) = self.inbound_substreams.iter().position(|s| { + matches!( + s, + // An inbound substream waiting to be reused. + InboundSubstreamState::InWaitingMessage { first: false, .. } + ) + }) { + self.inbound_substreams.remove(position); + log::warn!( + "New inbound substream exceeds inbound substream limit. \ + Removed older substream waiting to be reused." + ) + } else { + log::warn!( + "New inbound substream exceeds inbound substream limit. \ + No older substream waiting to be reused. Dropping new substream." + ); + return; + } } debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; - // TODO: Limit number self.inbound_substreams - .push(InboundSubstreamState::InWaitingMessage( - connec_unique_id, - protocol, - )); - if let ProtocolStatus::Unconfirmed = self.protocol_status { - // Upon the first successfully negotiated substream, we know that the - // remote is configured with the same protocol name and we want - // the behaviour to add this peer to the routing table, if possible. - self.protocol_status = ProtocolStatus::Confirmed; - } + .push(InboundSubstreamState::InWaitingMessage { + first: true, + connection_id: connec_unique_id, + substream: protocol, + }); } fn inject_event(&mut self, message: KademliaHandlerIn) { @@ -1039,38 +1066,47 @@ fn advance_inbound_substream( bool, ) { match state { - InboundSubstreamState::InWaitingMessage(id, mut substream) => { - match Stream::poll_next(Pin::new(&mut substream), cx) { - Poll::Ready(Some(Ok(msg))) => { - if let Ok(ev) = process_kad_request(msg, id) { - ( - Some(InboundSubstreamState::InWaitingUser(id, substream)), - Some(ConnectionHandlerEvent::Custom(ev)), - false, - ) - } else { - ( - Some(InboundSubstreamState::InClosing(substream)), - None, - true, - ) - } - } - Poll::Pending => ( - Some(InboundSubstreamState::InWaitingMessage(id, substream)), - None, - false, - ), - Poll::Ready(None) => { - trace!("Inbound substream: EOF"); - (None, None, false) - } - Poll::Ready(Some(Err(e))) => { - trace!("Inbound substream error: {:?}", e); - (None, None, false) + InboundSubstreamState::InWaitingMessage { + first, + connection_id, + mut substream, + } => match Stream::poll_next(Pin::new(&mut substream), cx) { + Poll::Ready(Some(Ok(msg))) => { + if let Ok(ev) = process_kad_request(msg, connection_id) { + ( + Some(InboundSubstreamState::InWaitingUser( + connection_id, + substream, + )), + Some(ConnectionHandlerEvent::Custom(ev)), + false, + ) + } else { + ( + Some(InboundSubstreamState::InClosing(substream)), + None, + true, + ) } } - } + Poll::Pending => ( + Some(InboundSubstreamState::InWaitingMessage { + first, + connection_id, + substream, + }), + None, + false, + ), + Poll::Ready(None) => { + trace!("Inbound substream: EOF"); + (None, None, false) + } + Poll::Ready(Some(Err(e))) => { + trace!("Inbound substream error: {:?}", e); + (None, None, false) + } + }, InboundSubstreamState::InWaitingUser(id, substream) => ( Some(InboundSubstreamState::InWaitingUser(id, substream)), None, @@ -1097,7 +1133,11 @@ fn advance_inbound_substream( InboundSubstreamState::InPendingFlush(id, mut substream) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => ( - Some(InboundSubstreamState::InWaitingMessage(id, substream)), + Some(InboundSubstreamState::InWaitingMessage { + first: false, + connection_id: id, + substream, + }), None, true, ), From a6db3bb6d513a233e3f9ee4dd7c724373e3c5a66 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 May 2022 18:00:01 +0200 Subject: [PATCH 4/7] protocols/kad/src/handler: Drop Out* and In* prefixes --- protocols/kad/src/handler.rs | 180 ++++++++++++++--------------------- 1 file changed, 73 insertions(+), 107 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index ce9b598acf9..1682f5beec6 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -134,45 +134,45 @@ pub struct KademliaHandlerConfig { enum OutboundSubstreamState { /// We haven't started opening the outgoing substream yet. /// Contains the request we want to send, and the user data if we expect an answer. - OutPendingOpen(KadRequestMsg, Option), + PendingOpen(KadRequestMsg, Option), /// Waiting to send a message to the remote. - OutPendingSend( + PendingSend( KadOutStreamSink, KadRequestMsg, Option, ), /// Waiting to flush the substream so that the data arrives to the remote. - OutPendingFlush(KadOutStreamSink, Option), + PendingFlush(KadOutStreamSink, Option), /// Waiting for an answer back from the remote. // TODO: add timeout - OutWaitingAnswer(KadOutStreamSink, TUserData), + WaitingAnswer(KadOutStreamSink, TUserData), /// An error happened on the substream and we should report the error to the user. - OutReportError(KademliaHandlerQueryErr, TUserData), + ReportError(KademliaHandlerQueryErr, TUserData), /// The substream is being closed. - OutClosing(KadOutStreamSink), + Closing(KadOutStreamSink), } /// State of an active inbound substream. enum InboundSubstreamState { /// Waiting for a request from the remote. - InWaitingMessage { + WaitingMessage { /// Whether it is the first message to be awaited on this stream. first: bool, connection_id: UniqueConnecId, substream: KadInStreamSink, }, - /// Waiting for the user to send a `KademliaHandlerIn` event containing the response. - InWaitingUser(UniqueConnecId, KadInStreamSink), + /// Waiting for the user to send a [`KademliaHandlerIn`] event containing the response. + WaitingUser(UniqueConnecId, KadInStreamSink), /// Waiting to send an answer back to the remote. - InPendingSend( + PendingSend( UniqueConnecId, KadInStreamSink, KadResponseMsg, ), /// Waiting to flush an answer back to the remote. - InPendingFlush(UniqueConnecId, KadInStreamSink), + PendingFlush(UniqueConnecId, KadInStreamSink), /// The substream is being closed. - InClosing(KadInStreamSink), + Closing(KadInStreamSink), } impl OutboundSubstreamState { @@ -181,12 +181,12 @@ impl OutboundSubstreamState { /// If the substream is not ready to be closed, returns it back. fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self { - OutboundSubstreamState::OutPendingOpen(_, _) - | OutboundSubstreamState::OutReportError(_, _) => Poll::Ready(()), - OutboundSubstreamState::OutPendingSend(ref mut stream, _, _) - | OutboundSubstreamState::OutPendingFlush(ref mut stream, _) - | OutboundSubstreamState::OutWaitingAnswer(ref mut stream, _) - | OutboundSubstreamState::OutClosing(ref mut stream) => { + OutboundSubstreamState::PendingOpen(_, _) + | OutboundSubstreamState::ReportError(_, _) => Poll::Ready(()), + OutboundSubstreamState::PendingSend(ref mut stream, _, _) + | OutboundSubstreamState::PendingFlush(ref mut stream, _) + | OutboundSubstreamState::WaitingAnswer(ref mut stream, _) + | OutboundSubstreamState::Closing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, @@ -202,14 +202,14 @@ impl InboundSubstreamState { /// If the substream is not ready to be closed, returns it back. fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self { - InboundSubstreamState::InWaitingMessage { + InboundSubstreamState::WaitingMessage { substream: ref mut stream, .. } - | InboundSubstreamState::InWaitingUser(_, ref mut stream) - | InboundSubstreamState::InPendingSend(_, ref mut stream, _) - | InboundSubstreamState::InPendingFlush(_, ref mut stream) - | InboundSubstreamState::InClosing(ref mut stream) => { + | InboundSubstreamState::WaitingUser(_, ref mut stream) + | InboundSubstreamState::PendingSend(_, ref mut stream, _) + | InboundSubstreamState::PendingFlush(_, ref mut stream) + | InboundSubstreamState::Closing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, @@ -521,7 +521,7 @@ where (msg, user_data): Self::OutboundOpenInfo, ) { self.outbound_substreams - .push(OutboundSubstreamState::OutPendingSend( + .push(OutboundSubstreamState::PendingSend( protocol, msg, user_data, )); if let ProtocolStatus::Unconfirmed = self.protocol_status { @@ -556,7 +556,7 @@ where matches!( s, // An inbound substream waiting to be reused. - InboundSubstreamState::InWaitingMessage { first: false, .. } + InboundSubstreamState::WaitingMessage { first: false, .. } ) }) { self.inbound_substreams.remove(position); @@ -577,7 +577,7 @@ where let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; self.inbound_substreams - .push(InboundSubstreamState::InWaitingMessage { + .push(InboundSubstreamState::WaitingMessage { first: true, connection_id: connec_unique_id, substream: protocol, @@ -591,7 +591,7 @@ where .inbound_substreams .iter() .position(|state| match state { - InboundSubstreamState::InWaitingUser(conn_id, _) => { + InboundSubstreamState::WaitingUser(conn_id, _) => { conn_id == &request_id.connec_unique_id } _ => false, @@ -606,7 +606,7 @@ where KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; self.outbound_substreams - .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::FindNodeRes { closer_peers, @@ -616,7 +616,7 @@ where .inbound_substreams .iter() .position(|state| match state { - InboundSubstreamState::InWaitingUser(ref conn_id, _) => { + InboundSubstreamState::WaitingUser(ref conn_id, _) => { conn_id == &request_id.connec_unique_id } _ => false, @@ -624,7 +624,7 @@ where if let Some(pos) = pos { let (conn_id, substream) = match self.inbound_substreams.remove(pos) { - InboundSubstreamState::InWaitingUser(conn_id, substream) => { + InboundSubstreamState::WaitingUser(conn_id, substream) => { (conn_id, substream) } _ => unreachable!(), @@ -632,15 +632,13 @@ where let msg = KadResponseMsg::FindNode { closer_peers }; self.inbound_substreams - .push(InboundSubstreamState::InPendingSend( - conn_id, substream, msg, - )); + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; self.outbound_substreams - .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetProvidersRes { closer_peers, @@ -651,7 +649,7 @@ where .inbound_substreams .iter() .position(|state| match state { - InboundSubstreamState::InWaitingUser(ref conn_id, _) + InboundSubstreamState::WaitingUser(ref conn_id, _) if conn_id == &request_id.connec_unique_id => { true @@ -661,7 +659,7 @@ where if let Some(pos) = pos { let (conn_id, substream) = match self.inbound_substreams.remove(pos) { - InboundSubstreamState::InWaitingUser(conn_id, substream) => { + InboundSubstreamState::WaitingUser(conn_id, substream) => { (conn_id, substream) } _ => unreachable!(), @@ -672,25 +670,23 @@ where provider_peers, }; self.inbound_substreams - .push(InboundSubstreamState::InPendingSend( - conn_id, substream, msg, - )); + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; self.outbound_substreams - .push(OutboundSubstreamState::OutPendingOpen(msg, None)); + .push(OutboundSubstreamState::PendingOpen(msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; self.outbound_substreams - .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; self.outbound_substreams - .push(OutboundSubstreamState::OutPendingOpen(msg, Some(user_data))); + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, @@ -701,7 +697,7 @@ where .inbound_substreams .iter() .position(|state| match state { - InboundSubstreamState::InWaitingUser(ref conn_id, _) => { + InboundSubstreamState::WaitingUser(ref conn_id, _) => { conn_id == &request_id.connec_unique_id } _ => false, @@ -709,7 +705,7 @@ where if let Some(pos) = pos { let (conn_id, substream) = match self.inbound_substreams.remove(pos) { - InboundSubstreamState::InWaitingUser(conn_id, substream) => { + InboundSubstreamState::WaitingUser(conn_id, substream) => { (conn_id, substream) } _ => unreachable!(), @@ -720,9 +716,7 @@ where closer_peers, }; self.inbound_substreams - .push(InboundSubstreamState::InPendingSend( - conn_id, substream, msg, - )); + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::PutRecordRes { @@ -734,7 +728,7 @@ where .inbound_substreams .iter() .position(|state| match state { - InboundSubstreamState::InWaitingUser(ref conn_id, _) + InboundSubstreamState::WaitingUser(ref conn_id, _) if conn_id == &request_id.connec_unique_id => { true @@ -744,7 +738,7 @@ where if let Some(pos) = pos { let (conn_id, substream) = match self.inbound_substreams.remove(pos) { - InboundSubstreamState::InWaitingUser(conn_id, substream) => { + InboundSubstreamState::WaitingUser(conn_id, substream) => { (conn_id, substream) } _ => unreachable!(), @@ -752,9 +746,7 @@ where let msg = KadResponseMsg::PutValue { key, value }; self.inbound_substreams - .push(InboundSubstreamState::InPendingSend( - conn_id, substream, msg, - )); + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } } @@ -769,10 +761,7 @@ where // continue trying if let Some(user_data) = user_data { self.outbound_substreams - .push(OutboundSubstreamState::OutReportError( - error.into(), - user_data, - )); + .push(OutboundSubstreamState::ReportError(error.into(), user_data)); } } @@ -911,19 +900,17 @@ fn advance_outbound_substream( bool, ) { match state { - OutboundSubstreamState::OutPendingOpen(msg, user_data) => { + OutboundSubstreamState::PendingOpen(msg, user_data) => { let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(upgrade, (msg, user_data)), }; (None, Some(ev), false) } - OutboundSubstreamState::OutPendingSend(mut substream, msg, user_data) => { + OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(OutboundSubstreamState::OutPendingFlush( - substream, user_data, - )), + Some(OutboundSubstreamState::PendingFlush(substream, user_data)), None, true, ), @@ -939,7 +926,7 @@ fn advance_outbound_substream( } }, Poll::Pending => ( - Some(OutboundSubstreamState::OutPendingSend( + Some(OutboundSubstreamState::PendingSend( substream, msg, user_data, )), None, @@ -957,29 +944,21 @@ fn advance_outbound_substream( } } } - OutboundSubstreamState::OutPendingFlush(mut substream, user_data) => { + OutboundSubstreamState::PendingFlush(mut substream, user_data) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { if let Some(user_data) = user_data { ( - Some(OutboundSubstreamState::OutWaitingAnswer( - substream, user_data, - )), + Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)), None, true, ) } else { - ( - Some(OutboundSubstreamState::OutClosing(substream)), - None, - true, - ) + (Some(OutboundSubstreamState::Closing(substream)), None, true) } } Poll::Pending => ( - Some(OutboundSubstreamState::OutPendingFlush( - substream, user_data, - )), + Some(OutboundSubstreamState::PendingFlush(substream, user_data)), None, false, ), @@ -995,10 +974,10 @@ fn advance_outbound_substream( } } } - OutboundSubstreamState::OutWaitingAnswer(mut substream, user_data) => { + OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => { match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { - let new_state = OutboundSubstreamState::OutClosing(substream); + let new_state = OutboundSubstreamState::Closing(substream); let event = process_kad_response(msg, user_data); ( Some(new_state), @@ -1007,9 +986,7 @@ fn advance_outbound_substream( ) } Poll::Pending => ( - Some(OutboundSubstreamState::OutWaitingAnswer( - substream, user_data, - )), + Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)), None, false, ), @@ -1029,18 +1006,14 @@ fn advance_outbound_substream( } } } - OutboundSubstreamState::OutReportError(error, user_data) => { + OutboundSubstreamState::ReportError(error, user_data) => { let event = KademliaHandlerEvent::QueryError { error, user_data }; (None, Some(ConnectionHandlerEvent::Custom(event)), false) } - OutboundSubstreamState::OutClosing(mut stream) => { + OutboundSubstreamState::Closing(mut stream) => { match Sink::poll_close(Pin::new(&mut stream), cx) { Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => ( - Some(OutboundSubstreamState::OutClosing(stream)), - None, - false, - ), + Poll::Pending => (Some(OutboundSubstreamState::Closing(stream)), None, false), Poll::Ready(Err(_)) => (None, None, false), } } @@ -1066,7 +1039,7 @@ fn advance_inbound_substream( bool, ) { match state { - InboundSubstreamState::InWaitingMessage { + InboundSubstreamState::WaitingMessage { first, connection_id, mut substream, @@ -1074,23 +1047,16 @@ fn advance_inbound_substream( Poll::Ready(Some(Ok(msg))) => { if let Ok(ev) = process_kad_request(msg, connection_id) { ( - Some(InboundSubstreamState::InWaitingUser( - connection_id, - substream, - )), + Some(InboundSubstreamState::WaitingUser(connection_id, substream)), Some(ConnectionHandlerEvent::Custom(ev)), false, ) } else { - ( - Some(InboundSubstreamState::InClosing(substream)), - None, - true, - ) + (Some(InboundSubstreamState::Closing(substream)), None, true) } } Poll::Pending => ( - Some(InboundSubstreamState::InWaitingMessage { + Some(InboundSubstreamState::WaitingMessage { first, connection_id, substream, @@ -1107,33 +1073,33 @@ fn advance_inbound_substream( (None, None, false) } }, - InboundSubstreamState::InWaitingUser(id, substream) => ( - Some(InboundSubstreamState::InWaitingUser(id, substream)), + InboundSubstreamState::WaitingUser(id, substream) => ( + Some(InboundSubstreamState::WaitingUser(id, substream)), None, false, ), - InboundSubstreamState::InPendingSend(id, mut substream, msg) => { + InboundSubstreamState::PendingSend(id, mut substream, msg) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(InboundSubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::PendingFlush(id, substream)), None, true, ), Err(_) => (None, None, false), }, Poll::Pending => ( - Some(InboundSubstreamState::InPendingSend(id, substream, msg)), + Some(InboundSubstreamState::PendingSend(id, substream, msg)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - InboundSubstreamState::InPendingFlush(id, mut substream) => { + InboundSubstreamState::PendingFlush(id, mut substream) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => ( - Some(InboundSubstreamState::InWaitingMessage { + Some(InboundSubstreamState::WaitingMessage { first: false, connection_id: id, substream, @@ -1142,17 +1108,17 @@ fn advance_inbound_substream( true, ), Poll::Pending => ( - Some(InboundSubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::PendingFlush(id, substream)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - InboundSubstreamState::InClosing(mut stream) => { + InboundSubstreamState::Closing(mut stream) => { match Sink::poll_close(Pin::new(&mut stream), cx) { Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => (Some(InboundSubstreamState::InClosing(stream)), None, false), + Poll::Pending => (Some(InboundSubstreamState::Closing(stream)), None, false), Poll::Ready(Err(_)) => (None, None, false), } } From f05306239252dfaf13a50f0047a83bfda2735e45 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 Jun 2022 22:16:15 +0200 Subject: [PATCH 5/7] protocols/kad: Print remote peer ID when exceeding limit --- protocols/kad/src/handler.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 1682f5beec6..bcadb57f44c 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -59,8 +59,8 @@ impl KademliaHandlerProto { impl IntoConnectionHandler for KademliaHandlerProto { type Handler = KademliaHandler; - fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - KademliaHandler::new(self.config, endpoint.clone()) + fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id) } fn inbound_protocol(&self) -> ::InboundProtocol { @@ -99,6 +99,9 @@ pub struct KademliaHandler { /// is associated with. endpoint: ConnectedPoint, + /// The [`PeerId`] of the remote. + remote_peer_id: PeerId, + /// The current state of protocol confirmation. protocol_status: ProtocolStatus, } @@ -478,12 +481,17 @@ struct UniqueConnecId(u64); impl KademliaHandler { /// Create a [`KademliaHandler`] using the given configuration. - pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self { + pub fn new( + config: KademliaHandlerConfig, + endpoint: ConnectedPoint, + remote_peer_id: PeerId, + ) -> Self { let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); KademliaHandler { config, endpoint, + remote_peer_id, next_connec_unique_id: UniqueConnecId(0), inbound_substreams: Default::default(), outbound_substreams: Default::default(), @@ -561,13 +569,15 @@ where }) { self.inbound_substreams.remove(position); log::warn!( - "New inbound substream exceeds inbound substream limit. \ - Removed older substream waiting to be reused." + "New inbound substream to {:?} exceeds inbound substream limit. \ + Removed older substream waiting to be reused.", + self.remote_peer_id, ) } else { log::warn!( - "New inbound substream exceeds inbound substream limit. \ - No older substream waiting to be reused. Dropping new substream." + "New inbound substream to {:?} exceeds inbound substream limit. \ + No older substream waiting to be reused. Dropping new substream.", + self.remote_peer_id, ); return; } From 95f4178e82c57c2b5064bda5c51f8f0e508031ac Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 22:08:01 +0200 Subject: [PATCH 6/7] protocols/kad: Bump version and add changelog entry --- protocols/kad/CHANGELOG.md | 4 ++++ protocols/kad/Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 7140fd1238f..a2397a0b3a2 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.37.1 - unreleased + +- Limit # of inbound substreams to 32. + # 0.37.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index a7410670e62..7480632b4fe 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = "1.56.1" description = "Kademlia protocol for libp2p" -version = "0.37.0" +version = "0.37.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From c177484cad4274b6ffb5551a4ddf6742bfd702be Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 22:16:07 +0200 Subject: [PATCH 7/7] Update protocols/kad/CHANGELOG.md --- protocols/kad/CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index a2397a0b3a2..77e1cd31be3 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,6 +1,8 @@ # 0.37.1 - unreleased -- Limit # of inbound substreams to 32. +- Limit # of inbound streams to 32. [See PR 2699]. + +[PR 2699]: https://github.com/libp2p/rust-libp2p/pull/2699 # 0.37.0