feat(kad): Limit number of active outbound streams #3287

Merged
3 changes: 3 additions & 0 deletions protocols/kad/CHANGELOG.md
@@ -6,9 +6,12 @@

- Remove lifetime from `RecordStore` and use GATs instead. See [PR 3239].

+- Limit number of active outbound streams to 32. See [PR 3287].
+
- Bump MSRV to 1.65.0.

[PR 3239]: https://github.com/libp2p/rust-libp2p/pull/3239
+[PR 3287]: https://github.com/libp2p/rust-libp2p/pull/3287

# 0.42.1

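At a high level, the change replaces eager substream opening with a bounded FIFO: requests are buffered in `requested_streams` and only promoted to outbound substream upgrades from `poll` while fewer than `MAX_NUM_SUBSTREAMS` (32) outbound streams are active or still negotiating. A minimal sketch of that pattern, with illustrative names rather than the actual handler types:

```rust
use std::collections::VecDeque;

const MAX_NUM_SUBSTREAMS: usize = 32;

/// Hypothetical stand-in for the handler's bookkeeping
/// (illustrative names, not the real libp2p types).
struct BoundedOutbound<R> {
    /// Requests waiting for a free slot (`requested_streams` in the PR).
    queue: VecDeque<R>,
    /// Streams that finished negotiating (`outbound_substreams.len()`).
    active: usize,
    /// Upgrades requested but not yet resolved
    /// (`num_requested_outbound_streams`).
    requested: usize,
}

impl<R> BoundedOutbound<R> {
    fn new() -> Self {
        BoundedOutbound {
            queue: VecDeque::new(),
            active: 0,
            requested: 0,
        }
    }

    /// Buffer a request; nothing is sent to the remote yet.
    fn enqueue(&mut self, request: R) {
        self.queue.push_back(request);
    }

    /// Called from `poll`: promote the next request only while under the cap.
    fn next_upgrade(&mut self) -> Option<R> {
        if self.active + self.requested < MAX_NUM_SUBSTREAMS {
            let request = self.queue.pop_front()?;
            self.requested += 1;
            return Some(request);
        }
        None
    }
}

fn main() {
    let mut outbound = BoundedOutbound::new();
    for i in 0..40 {
        outbound.enqueue(i);
    }
    // Only 32 upgrades are issued; the remaining 8 stay queued.
    let issued = std::iter::from_fn(|| outbound.next_upgrade()).count();
    assert_eq!(issued, 32);
    assert_eq!(outbound.queue.len(), 8);
}
```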
80 changes: 44 additions & 36 deletions protocols/kad/src/handler.rs
@@ -35,12 +35,13 @@ use libp2p_swarm::{
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::trace;
+use std::collections::VecDeque;
use std::task::Waker;
use std::{
error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration,
};

-const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32;
+const MAX_NUM_SUBSTREAMS: usize = 32;

/// A prototype from which [`KademliaHandler`]s can be constructed.
pub struct KademliaHandlerProto<T> {
@@ -92,6 +93,14 @@ pub struct KademliaHandler<TUserData> {
/// List of active outbound substreams with the state they are in.
outbound_substreams: SelectAll<OutboundSubstreamState<TUserData>>,

+/// Number of outbound streams being upgraded right now.
+num_requested_outbound_streams: usize,
+
+/// List of outbound substreams that are waiting to become active next.
+/// Contains the request we want to send, and the user data if we expect an answer.
+requested_streams:
+VecDeque<SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>>,
+
/// List of active inbound substreams with the state they are in.
inbound_substreams: SelectAll<InboundSubstreamState<TUserData>>,

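The two new fields split outbound accounting in two: `outbound_substreams` holds streams that already negotiated, while `num_requested_outbound_streams` counts upgrades still in flight; the sum of the two is what the 32-stream cap is checked against. A sketch of the slot lifecycle implied by the diff (a hypothetical helper, not part of the PR; the real handler spreads these transitions across `on_fully_negotiated_outbound`, the dial-upgrade error path, and `poll`):

```rust
/// Hypothetical model of the counter's lifecycle (not handler code).
struct Slots {
    active: usize,    // plays the role of outbound_substreams.len()
    requested: usize, // plays the role of num_requested_outbound_streams
}

impl Slots {
    const MAX: usize = 32;

    /// `poll` emits an `OutboundSubstreamRequest` only when this succeeds.
    fn try_request(&mut self) -> bool {
        if self.active + self.requested < Self::MAX {
            self.requested += 1;
            return true;
        }
        false
    }

    /// Upgrade succeeded: the stream becomes an active substream.
    fn negotiated(&mut self) {
        self.requested -= 1;
        self.active += 1;
    }

    /// Upgrade failed: the slot is released so queued requests can drain.
    fn failed(&mut self) {
        self.requested -= 1;
    }

    /// An active substream completed and left the `SelectAll`.
    fn closed(&mut self) {
        self.active -= 1;
    }
}

fn main() {
    let mut slots = Slots { active: 0, requested: 0 };
    assert!(slots.try_request());
    slots.negotiated();
    slots.closed();
    assert!(slots.try_request());
    slots.failed(); // a failed upgrade frees its slot again
    assert_eq!(slots.active + slots.requested, 0);
}
```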
@@ -138,9 +147,6 @@ pub struct KademliaHandlerConfig {

/// State of an active outbound substream.
enum OutboundSubstreamState<TUserData> {
-/// We haven't started opening the outgoing substream yet.
-/// Contains the request we want to send, and the user data if we expect an answer.
-PendingOpen(SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>),
/// Waiting to send a message to the remote.
PendingSend(
KadOutStreamSink<NegotiatedSubstream>,
@@ -523,6 +529,8 @@ where
next_connec_unique_id: UniqueConnecId(0),
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
+num_requested_outbound_streams: 0,
+requested_streams: Default::default(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
}
@@ -542,6 +550,7 @@
.push(OutboundSubstreamState::PendingSend(
protocol, msg, user_data,
));
+self.num_requested_outbound_streams -= 1;
if let ProtocolStatus::Unconfirmed = self.protocol_status {
// Upon the first successfully negotiated substream, we know that the
// remote is configured with the same protocol name and we want
@@ -571,7 +580,7 @@ where
self.protocol_status = ProtocolStatus::Confirmed;
}

-if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS {
+if self.inbound_substreams.len() == MAX_NUM_SUBSTREAMS {
if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
matches!(
s,
@@ -623,6 +632,7 @@ where
self.outbound_substreams
.push(OutboundSubstreamState::ReportError(error.into(), user_data));
}
+self.num_requested_outbound_streams -= 1;
}
}
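The decrement here mirrors the one in the success path above: a requested slot must be released whether negotiation succeeds or fails, otherwise repeated failures would pin `num_requested_outbound_streams` at the cap and starve the queue. A small self-contained check of that invariant (a hypothetical simulation, not handler code), modelling a remote whose substream upgrades always fail:

```rust
const MAX_NUM_SUBSTREAMS: usize = 32;

fn main() {
    let mut requested: usize = 0; // with the decrement (this PR)
    let mut leaked: usize = 0;    // same counter without it

    for _ in 0..1000 {
        if requested < MAX_NUM_SUBSTREAMS {
            requested += 1; // OutboundSubstreamRequest emitted
        }
        if leaked < MAX_NUM_SUBSTREAMS {
            leaked += 1;
        }

        // Every upgrade fails in this simulation.
        requested -= 1; // error path releases the slot
                        // `leaked`: no decrement on the error path
    }

    assert_eq!(requested, 0); // capacity fully recovered
    // Without the decrement, the counter pins at the cap and no
    // further upgrade could ever be requested on this connection.
    assert_eq!(leaked, MAX_NUM_SUBSTREAMS);
}
```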

@@ -666,23 +676,21 @@
}
KademliaHandlerIn::FindNodeReq { key, user_data } => {
let msg = KadRequestMsg::FindNode { key };
-self.outbound_substreams
-.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-self.config.protocol_config.clone(),
-(msg, Some(user_data)),
-)));
+self.requested_streams.push_back(SubstreamProtocol::new(
+self.config.protocol_config.clone(),
+(msg, Some(user_data)),
+));
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key };
-self.outbound_substreams
-.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-self.config.protocol_config.clone(),
-(msg, Some(user_data)),
-)));
+self.requested_streams.push_back(SubstreamProtocol::new(
+self.config.protocol_config.clone(),
+(msg, Some(user_data)),
+));
}
KademliaHandlerIn::GetProvidersRes {
closer_peers,
@@ -697,27 +705,24 @@
),
KademliaHandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider { key, provider };
-self.outbound_substreams
-.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-self.config.protocol_config.clone(),
-(msg, None),
-)));
+self.requested_streams.push_back(SubstreamProtocol::new(
+self.config.protocol_config.clone(),
+(msg, None),
+));
}
KademliaHandlerIn::GetRecord { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
-self.outbound_substreams
-.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-self.config.protocol_config.clone(),
-(msg, Some(user_data)),
-)));
+self.requested_streams.push_back(SubstreamProtocol::new(
+self.config.protocol_config.clone(),
+(msg, Some(user_data)),
+));
}
KademliaHandlerIn::PutRecord { record, user_data } => {
let msg = KadRequestMsg::PutValue { record };
-self.outbound_substreams
-.push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-self.config.protocol_config.clone(),
-(msg, Some(user_data)),
-)));
+self.requested_streams.push_back(SubstreamProtocol::new(
+self.config.protocol_config.clone(),
+(msg, Some(user_data)),
+));
}
KademliaHandlerIn::GetRecordRes {
record,
@@ -774,6 +779,15 @@ where
return Poll::Ready(event);
}

+let num_in_progress_outbound_substreams =
+self.outbound_substreams.len() + self.num_requested_outbound_streams;
+if MAX_NUM_SUBSTREAMS.saturating_sub(num_in_progress_outbound_substreams) > 0 {
+if let Some(protocol) = self.requested_streams.pop_front() {
+self.num_requested_outbound_streams += 1;
+return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol });
+}
+}
+
if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
// We destroyed all substreams in this function.
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
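The gate above is the only place upgrades are issued: `MAX_NUM_SUBSTREAMS.saturating_sub(n) > 0` is just an underflow-safe way of writing `n < MAX_NUM_SUBSTREAMS`, with `n` counting both active and still-negotiating outbound streams. A quick standalone equivalence check (not handler code):

```rust
const MAX_NUM_SUBSTREAMS: usize = 32;

/// The form used in the diff.
fn has_capacity_saturating(in_progress: usize) -> bool {
    MAX_NUM_SUBSTREAMS.saturating_sub(in_progress) > 0
}

/// The equivalent plain comparison.
fn has_capacity_lt(in_progress: usize) -> bool {
    in_progress < MAX_NUM_SUBSTREAMS
}

fn main() {
    for n in 0..=64 {
        assert_eq!(has_capacity_saturating(n), has_capacity_lt(n));
    }
    assert!(has_capacity_saturating(31));  // one slot left
    assert!(!has_capacity_saturating(32)); // cap reached: requests stay queued
}
```

When the cap is reached the request simply stays in `requested_streams`; the next `poll` after a stream completes or an upgrade fails picks it up.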
Expand Down Expand Up @@ -852,12 +866,6 @@ where

loop {
match std::mem::replace(this, OutboundSubstreamState::Poisoned) {
-OutboundSubstreamState::PendingOpen(protocol) => {
-*this = OutboundSubstreamState::Done;
-return Poll::Ready(Some(ConnectionHandlerEvent::OutboundSubstreamRequest {
-protocol,
-}));
-}
OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => {
match substream.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) {
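With requests now queued at the handler level, the per-stream state machine no longer needs the removed `PendingOpen` variant: a state only enters the outbound `SelectAll` once its substream upgrade has been requested. Roughly the remaining shape (a sketch; `PendingSend`, `ReportError`, `Done` and `Poisoned` are visible in this diff, but the payload types are placeholders and any further variants are assumptions):

```rust
use std::io;

/// Sketch of the slimmer outbound state machine after this PR.
#[allow(dead_code)]
enum OutboundSubstreamState<Stream, Msg, UserData> {
    /// Waiting to send a message on the already-negotiated substream.
    PendingSend(Stream, Msg, Option<UserData>),
    /// An error happened; report it to the behaviour and finish.
    ReportError(io::Error, UserData),
    /// The substream finished its work.
    Done,
    /// Transient state while the value is moved out in `poll_next`
    /// (via `std::mem::replace`, as in the loop above).
    Poisoned,
}

fn main() {}
```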