[kad] Update local routing table on new connections after protocol confirmation. #1821

Merged · 3 commits · Nov 5, 2020
9 changes: 9 additions & 0 deletions protocols/kad/CHANGELOG.md
@@ -1,5 +1,14 @@
# 0.25.0 [unreleased]

- Upon newly established connections, delay routing table
updates until after the configured protocol name has
been confirmed by the connection handler, i.e. until
after at least one substream has been successfully
negotiated. In configurations with different protocol names,
this avoids undesirable nodes being included, even if only
temporarily, in the local routing table.
[PR 1821](https://github.com/libp2p/rust-libp2p/pull/1821).

- Update dependencies.

# 0.24.0 [2020-10-16]
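For context on the "configurations with different protocol names" mentioned in the changelog entry: applications can run a separate Kademlia overlay under their own protocol name, in which case peers speaking a different name should not end up in the routing table. A minimal sketch of such a configuration, assuming the `KademliaConfig::set_protocol_name` setter and the `MemoryStore` record store from this crate (the protocol name below is made up for illustration):

```rust
use libp2p_core::PeerId;
use libp2p_kad::{record::store::MemoryStore, Kademlia, KademliaConfig};
use std::borrow::Cow;

// Build a Kademlia behaviour for a private overlay with its own protocol name.
// Peers using a different name (e.g. the default "/ipfs/kad/1.0.0") will fail
// substream negotiation and, with this change, are not added to the local
// routing table at all.
fn private_overlay_kad(local_peer_id: PeerId) -> Kademlia<MemoryStore> {
    let mut config = KademliaConfig::default();
    config.set_protocol_name(Cow::Borrowed(&b"/my-app/kad/1.0.0"[..]));
    let store = MemoryStore::new(local_peer_id.clone());
    Kademlia::with_config(local_peer_id, store, config)
}
```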
44 changes: 28 additions & 16 deletions protocols/kad/src/behaviour.rs
@@ -24,7 +24,13 @@ mod test;

use crate::K_VALUE;
use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::handler::{
KademliaHandlerProto,
KademliaHandlerConfig,
KademliaRequestId,
KademliaHandlerEvent,
KademliaHandlerIn
};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
@@ -38,7 +44,6 @@ use libp2p_swarm::{
NetworkBehaviourAction,
NotifyHandler,
PollParameters,
ProtocolsHandler
};
use log::{info, debug, warn};
use smallvec::SmallVec;
@@ -1428,11 +1433,11 @@ where
for<'a> TStore: RecordStore<'a>,
TStore: Send + 'static,
{
type ProtocolsHandler = KademliaHandler<QueryId>;
type ProtocolsHandler = KademliaHandlerProto<QueryId>;
type OutEvent = KademliaEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
KademliaHandler::new(KademliaHandlerConfig {
KademliaHandlerProto::new(KademliaHandlerConfig {
protocol_config: self.protocol_config.clone(),
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
@@ -1462,17 +1467,11 @@ where
peer_addrs
}

fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, endpoint: &ConnectedPoint) {
// The remote's address can only be put into the routing table,
// and thus shared with other nodes, if the local node is the dialer,
// since the remote address on an inbound connection is specific to
// that connection (e.g. typically the TCP port numbers).
let address = match endpoint {
ConnectedPoint::Dialer { address } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None,
};

self.connection_updated(peer.clone(), address, NodeStatus::Connected);
fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
// When a connection is established, we don't know yet whether the
// remote supports the configured protocol name. Only once a connection
// handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we
// update the local routing table.
}

fn inject_connected(&mut self, peer: &PeerId) {
@@ -1606,6 +1605,19 @@ where
event: KademliaHandlerEvent<QueryId>
) {
match event {
KademliaHandlerEvent::ProtocolConfirmed { endpoint } => {
debug_assert!(self.connected_peers.contains(&source));
// The remote's address can only be put into the routing table,
// and thus shared with other nodes, if the local node is the dialer,
// since the remote address on an inbound connection may be specific
// to that connection (e.g. typically the TCP port numbers).
let address = match endpoint {
ConnectedPoint::Dialer { address } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None,
};
self.connection_updated(source, address, NodeStatus::Connected);
}

KademliaHandlerEvent::FindNodeReq { key, request_id } => {
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
@@ -1804,7 +1816,7 @@ where

fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
<KademliaHandler<QueryId> as ProtocolsHandler>::InEvent,
KademliaHandlerIn<QueryId>,
Self::OutEvent,
>,
> {
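On the consumer side, routing table insertions still surface as `KademliaEvent::RoutingUpdated`; the difference after this change is that, for a newly connected peer, the event is only emitted once the handler has confirmed the protocol name on at least one substream. A hedged sketch of observing this from application code (only the `peer` field of the variant is used here):

```rust
use libp2p_kad::KademliaEvent;

// Sketch: react to routing table updates emitted by the `Kademlia` behaviour,
// e.g. from within a swarm event loop.
fn on_kademlia_event(event: KademliaEvent) {
    match event {
        KademliaEvent::RoutingUpdated { peer, .. } => {
            // The peer spoke the configured protocol name and was added to
            // (or updated in) the local routing table.
            println!("routing table updated for {}", peer);
        }
        _ => {}
    }
}
```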
19 changes: 18 additions & 1 deletion protocols/kad/src/behaviour/test.rs
@@ -1113,10 +1113,27 @@ fn network_behaviour_inject_address_change() {
MemoryStore::new(local_peer_id),
);

let endpoint = ConnectedPoint::Dialer { address: old_address.clone() };

// Mimic a connection being established.
kademlia.inject_connection_established(
&remote_peer_id,
&connection_id,
&ConnectedPoint::Dialer { address: old_address.clone() },
&endpoint,
);
kademlia.inject_connected(&remote_peer_id);

// At this point the remote is not yet known to support the
// configured protocol name, so the peer is not yet in the
// local routing table and hence no addresses are known.
assert!(kademlia.addresses_of_peer(&remote_peer_id).is_empty());

// Mimic the connection handler confirming the protocol for
// the test connection, so that the peer is added to the routing table.
kademlia.inject_event(
remote_peer_id.clone(),
connection_id.clone(),
KademliaHandlerEvent::ProtocolConfirmed { endpoint }
);

assert_eq!(
104 changes: 90 additions & 14 deletions protocols/kad/src/handler.rs
@@ -25,22 +25,54 @@ use crate::protocol::{
use crate::record::{self, Record};
use futures::prelude::*;
use libp2p_swarm::{
NegotiatedSubstream,
IntoProtocolsHandler,
KeepAlive,
NegotiatedSubstream,
SubstreamProtocol,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::{
ConnectedPoint,
PeerId,
either::EitherOutput,
upgrade::{self, InboundUpgrade, OutboundUpgrade}
};
use log::trace;
use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;

/// Protocol handler that handles Kademlia communications with the remote.
/// A prototype from which [`KademliaHandler`]s can be constructed.
pub struct KademliaHandlerProto<T> {
config: KademliaHandlerConfig,
_type: PhantomData<T>,
}

impl<T> KademliaHandlerProto<T> {
pub fn new(config: KademliaHandlerConfig) -> Self {
KademliaHandlerProto { config, _type: PhantomData }
}
}

impl<T: Clone + Send + 'static> IntoProtocolsHandler for KademliaHandlerProto<T> {
type Handler = KademliaHandler<T>;

fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
KademliaHandler::new(self.config, endpoint.clone())
}

fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
if self.config.allow_listening {
upgrade::EitherUpgrade::A(self.config.protocol_config.clone())
} else {
upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)
}
}
}

/// Protocol handler that manages substreams for the Kademlia protocol
/// on a single connection with a peer.
///
/// The handler will automatically open a Kademlia substream with the remote for each request we
/// make.
@@ -58,6 +90,27 @@ pub struct KademliaHandler<TUserData> {

/// Until when to keep the connection alive.
keep_alive: KeepAlive,

/// The connected endpoint of the connection that the handler
/// is associated with.
endpoint: ConnectedPoint,

/// The current state of protocol confirmation.
protocol_status: ProtocolStatus,
}

/// The states of protocol confirmation that a connection
/// handler transitions through.
enum ProtocolStatus {
/// It is as yet unknown whether the remote supports the
/// configured protocol name.
Unconfirmed,
/// The configured protocol name has been confirmed by the remote
/// but has not yet been reported to the `Kademlia` behaviour.
Confirmed,
/// The configured protocol has been confirmed by the remote
/// and the confirmation reported to the `Kademlia` behaviour.
Reported,
}

/// Configuration of a [`KademliaHandler`].
@@ -135,6 +188,15 @@ impl<TUserData> SubstreamState<TUserData> {
/// Event produced by the Kademlia handler.
#[derive(Debug)]
pub enum KademliaHandlerEvent<TUserData> {
/// The configured protocol name has been confirmed by the peer through
/// a successfully negotiated substream.
///
/// This event is only emitted once by a handler upon the first
/// successfully negotiated inbound or outbound substream and
/// indicates that the connected peer participates in the Kademlia
/// overlay network identified by the configured protocol name.
ProtocolConfirmed { endpoint: ConnectedPoint },

/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
/// returned is not specified, but should be around 20.
FindNodeReq {
@@ -379,24 +441,20 @@ struct UniqueConnecId(u64);

impl<TUserData> KademliaHandler<TUserData> {
/// Create a [`KademliaHandler`] using the given configuration.
pub fn new(config: KademliaHandlerConfig) -> Self {
pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self {
let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);

KademliaHandler {
config,
endpoint,
next_connec_unique_id: UniqueConnecId(0),
substreams: Vec::new(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
}
}
}

impl<TUserData> Default for KademliaHandler<TUserData> {
fn default() -> Self {
KademliaHandler::new(Default::default())
}
}

impl<TUserData> ProtocolsHandler for KademliaHandler<TUserData>
where
TUserData: Clone + Send + 'static,
@@ -423,8 +481,13 @@ where
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
(msg, user_data): Self::OutboundOpenInfo,
) {
self.substreams
.push(SubstreamState::OutPendingSend(protocol, msg, user_data));
self.substreams.push(SubstreamState::OutPendingSend(protocol, msg, user_data));
if let ProtocolStatus::Unconfirmed = self.protocol_status {
// Upon the first successfully negotiated substream, we know that the
// remote is configured with the same protocol name and we want
// the behaviour to add this peer to the routing table, if possible.
self.protocol_status = ProtocolStatus::Confirmed;
}
}

fn inject_fully_negotiated_inbound(
@@ -442,8 +505,13 @@ where
debug_assert!(self.config.allow_listening);
let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
self.substreams
.push(SubstreamState::InWaitingMessage(connec_unique_id, protocol));
self.substreams.push(SubstreamState::InWaitingMessage(connec_unique_id, protocol));
if let ProtocolStatus::Unconfirmed = self.protocol_status {
// Upon the first successfully negotiated substream, we know that the
// remote is configured with the same protocol name and we want
// the behaviour to add this peer to the routing table, if possible.
self.protocol_status = ProtocolStatus::Confirmed;
}
}

fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
@@ -618,6 +686,14 @@ where
return Poll::Pending;
}

if let ProtocolStatus::Confirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Reported;
return Poll::Ready(ProtocolsHandlerEvent::Custom(
KademliaHandlerEvent::ProtocolConfirmed {
endpoint: self.endpoint.clone()
}))
}

// We remove each element from `substreams` one by one and add them back.
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
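The core of the handler change is the small `ProtocolStatus` state machine: the first successfully negotiated substream (inbound or outbound) moves it from `Unconfirmed` to `Confirmed`, and `poll` reports the confirmation to the behaviour exactly once, moving it to `Reported`. A condensed, standalone sketch of that logic, with names mirroring the diff (illustrative only, not part of the crate's public API):

```rust
/// Standalone sketch of the confirmation state machine used by the handler.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ProtocolStatus {
    Unconfirmed,
    Confirmed,
    Reported,
}

struct Confirmation {
    status: ProtocolStatus,
}

impl Confirmation {
    /// Mirrors `inject_fully_negotiated_{inbound,outbound}`: the first
    /// successfully negotiated substream confirms the protocol name.
    fn on_substream_negotiated(&mut self) {
        if let ProtocolStatus::Unconfirmed = self.status {
            self.status = ProtocolStatus::Confirmed;
        }
    }

    /// Mirrors the check at the top of `poll`: the confirmation is reported
    /// to the behaviour at most once per connection.
    fn take_report(&mut self) -> bool {
        if let ProtocolStatus::Confirmed = self.status {
            self.status = ProtocolStatus::Reported;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut c = Confirmation { status: ProtocolStatus::Unconfirmed };
    assert!(!c.take_report());      // nothing negotiated yet
    c.on_substream_negotiated();
    assert!(c.take_report());       // reported once...
    c.on_substream_negotiated();
    assert!(!c.take_report());      // ...and never again
}
```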