From 44b27925849c084b41118edc7d432c7f7e4ce779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 23 Jul 2024 22:59:54 +0100 Subject: [PATCH 1/4] move main Behaviour to mod.rs for better readability and remove connection limits checks after connection has been established, as those checks have already been done by connection limits Behaviour. --- .../src/peer_manager/mod.rs | 12 ------ .../src/peer_manager/network_behaviour.rs | 22 +--------- .../src/service/behaviour.rs | 39 ----------------- .../lighthouse_network/src/service/mod.rs | 42 ++++++++++++++++--- 4 files changed, 38 insertions(+), 77 deletions(-) delete mode 100644 beacon_node/lighthouse_network/src/service/behaviour.rs diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 0d9a7c60dd2..a6a53aca58b 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -438,18 +438,6 @@ impl PeerManager { self.network_globals.peers.read().is_connected(peer_id) } - /// Reports whether the peer limit is reached in which case we stop allowing new incoming - /// connections. - pub fn peer_limit_reached(&self, count_dialing: bool) -> bool { - if count_dialing { - // This is an incoming connection so limit by the standard max peers - self.network_globals.connected_or_dialing_peers() >= self.max_peers() - } else { - // We dialed this peer, allow up to max_outbound_dialing_peers - self.network_globals.connected_peers() >= self.max_outbound_dialing_peers() - } - } - /// Updates `PeerInfo` with `identify` information. 
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 3858a2a5392..568a104caa7 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -129,14 +129,7 @@ impl NetworkBehaviour for PeerManager { endpoint, other_established, .. - }) => { - // NOTE: We still need to handle the [`ConnectionEstablished`] because the - // [`NetworkBehaviour::handle_established_inbound_connection`] and - // [`NetworkBehaviour::handle_established_outbound_connection`] are fallible. This - // means another behaviour can kill the connection early, and we can't assume a - // peer as connected until this event is received. - self.on_connection_established(peer_id, endpoint, other_established) - } + }) => self.on_connection_established(peer_id, endpoint, other_established), FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, endpoint, @@ -251,19 +244,6 @@ impl PeerManager { // Count dialing peers in the limit if the peer dialed us. let count_dialing = endpoint.is_listener(); - // Check the connection limits - if self.peer_limit_reached(count_dialing) - && self - .network_globals - .peers - .read() - .peer_info(&peer_id) - .map_or(true, |peer| !peer.has_future_duty()) - { - // Gracefully disconnect the peer. - self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers); - return; - } // NOTE: We don't register peers that we are disconnecting immediately. The network service // does not need to know about these peers. 
diff --git a/beacon_node/lighthouse_network/src/service/behaviour.rs b/beacon_node/lighthouse_network/src/service/behaviour.rs deleted file mode 100644 index ab2e43630bb..00000000000 --- a/beacon_node/lighthouse_network/src/service/behaviour.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::discovery::Discovery; -use crate::peer_manager::PeerManager; -use crate::rpc::RPC; -use crate::types::SnappyTransform; - -use libp2p::identify; -use libp2p::swarm::behaviour::toggle::Toggle; -use libp2p::swarm::NetworkBehaviour; -use libp2p::upnp::tokio::Behaviour as Upnp; -use types::EthSpec; - -use super::api_types::RequestId; - -pub type SubscriptionFilter = - gossipsub::MaxCountSubscriptionFilter; -pub type Gossipsub = gossipsub::Behaviour; - -#[derive(NetworkBehaviour)] -pub(crate) struct Behaviour -where - E: EthSpec, -{ - /// Keep track of active and pending connections to enforce hard limits. - pub connection_limits: libp2p::connection_limits::Behaviour, - /// The peer manager that keeps track of peer's reputation and status. - pub peer_manager: PeerManager, - /// The Eth2 RPC specified in the wire-0 protocol. - pub eth2_rpc: RPC, - /// Discv5 Discovery protocol. - pub discovery: Discovery, - /// Keep regular connection to peers and disconnect if absent. - // NOTE: The id protocol is used for initial interop. This will be removed by mainnet. - /// Provides IP addresses and peer information. - pub identify: identify::Behaviour, - /// Libp2p UPnP port mapping. - pub upnp: Toggle, - /// The routing pub-sub mechanism for eth2. 
- pub gossipsub: Gossipsub, -} diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 3d502e6fdce..c3770818559 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1,4 +1,3 @@ -use self::behaviour::Behaviour; use self::gossip_cache::GossipCache; use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad}; use crate::discovery::{ @@ -11,8 +10,6 @@ use crate::peer_manager::{ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS}; use crate::rpc::methods::MetadataRequest; use crate::rpc::*; -use crate::service::behaviour::BehaviourEvent; -pub use crate::service::behaviour::Gossipsub; use crate::types::{ attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, @@ -30,7 +27,8 @@ use gossipsub::{ use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings}; use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol}; use libp2p::swarm::behaviour::toggle::Toggle; -use libp2p::swarm::{Swarm, SwarmEvent}; +use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::upnp::tokio::Behaviour as Upnp; use libp2p::{identify, PeerId, SwarmBuilder}; use slog::{crit, debug, info, o, trace, warn}; use std::path::PathBuf; @@ -46,7 +44,6 @@ use types::{ use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER}; pub mod api_types; -mod behaviour; mod gossip_cache; pub mod gossipsub_scoring_parameters; pub mod utils; @@ -105,6 +102,41 @@ pub enum NetworkEvent { ZeroListeners, } +pub type Gossipsub = gossipsub::Behaviour; +pub type SubscriptionFilter = + gossipsub::MaxCountSubscriptionFilter; + +#[derive(NetworkBehaviour)] +pub(crate) struct Behaviour +where + E: EthSpec, +{ + // NOTE: The order of the 
following list of behaviours has meaning, + // `NetworkBehaviour::handle_{pending, established}_{inbound, outbound}` methods + // are called sequentially for each behaviour and they are fallible, + // therefore we want `connection_limits` and `peer_manager` running first, + // which are the behaviours that may reject a connection, so that + // when the subsequent behaviours are called they are certain the connection won't be rejected. + + // + /// Keep track of active and pending connections to enforce hard limits. + pub connection_limits: libp2p::connection_limits::Behaviour, + /// The peer manager that keeps track of peer's reputation and status. + pub peer_manager: PeerManager, + /// The Eth2 RPC specified in the wire-0 protocol. + pub eth2_rpc: RPC, + /// Discv5 Discovery protocol. + pub discovery: Discovery, + /// Keep regular connection to peers and disconnect if absent. + // NOTE: The id protocol is used for initial interop. This will be removed by mainnet. + /// Provides IP addresses and peer information. + pub identify: identify::Behaviour, + /// Libp2p UPnP port mapping. + pub upnp: Toggle, + /// The routing pub-sub mechanism for eth2. + pub gossipsub: Gossipsub, +} + /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. 
From 6d7bea479a7327d7b6950af4841bdf9de8bc17f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 23 Jul 2024 23:03:41 +0100 Subject: [PATCH 2/4] improve logging wording wrt dial logic when we call dial_peer we are not yet dialing but just adding the peer to the dial queue --- .../src/peer_manager/mod.rs | 7 ++--- .../src/peer_manager/network_behaviour.rs | 28 ++++++++----------- .../lighthouse_network/src/service/mod.rs | 2 +- 3 files changed, 15 insertions(+), 22 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index a6a53aca58b..354863165f9 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -320,7 +320,6 @@ impl PeerManager { /// returned here. /// /// This function decides whether or not to dial these peers. - #[allow(clippy::mutable_key_type)] pub fn peers_discovered(&mut self, results: HashMap>) { let mut to_dial_peers = 0; let connected_or_dialing = self.network_globals.connected_or_dialing_peers(); @@ -340,15 +339,15 @@ impl PeerManager { { // This should be updated with the peer dialing. 
In fact created once the peer is // dialed + let peer_id = enr.peer_id(); if let Some(min_ttl) = min_ttl { self.network_globals .peers .write() - .update_min_ttl(&enr.peer_id(), min_ttl); + .update_min_ttl(&peer_id, min_ttl); } - let peer_id = enr.peer_id(); if self.dial_peer(enr) { - debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id); + debug!(self.log, "Added discovered ENR peer to dial queue"; "peer_id" => %peer_id); to_dial_peers += 1; } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 568a104caa7..d663e48bced 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -93,26 +93,20 @@ impl NetworkBehaviour for PeerManager { } if let Some(enr) = self.peers_to_dial.pop() { - let peer_id = enr.peer_id(); - self.inject_peer_connection(&peer_id, ConnectingType::Dialing, Some(enr.clone())); - - let quic_multiaddrs = if self.quic_enabled { - let quic_multiaddrs = enr.multiaddr_quic(); - if !quic_multiaddrs.is_empty() { - debug!(self.log, "Dialing QUIC supported peer"; "peer_id"=> %peer_id, "quic_multiaddrs" => ?quic_multiaddrs); - } - quic_multiaddrs - } else { - Vec::new() - }; + self.inject_peer_connection(&enr.peer_id(), ConnectingType::Dialing, Some(enr.clone())); // Prioritize Quic connections over Tcp ones. 
- let multiaddrs = quic_multiaddrs - .into_iter() - .chain(enr.multiaddr_tcp()) - .collect(); + let multiaddrs = [ + self.quic_enabled + .then_some(enr.multiaddr_quic()) + .unwrap_or_default(), + enr.multiaddr_tcp(), + ] + .concat(); + + debug!(self.log, "Dialing peer"; "peer_id"=> %enr.peer_id(), "multiaddrs" => ?multiaddrs); return Poll::Ready(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id) + opts: DialOpts::peer_id(enr.peer_id()) .condition(PeerCondition::Disconnected) .addresses(multiaddrs) .build(), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index c3770818559..c45ec359496 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1268,7 +1268,7 @@ impl Network { self.discovery_mut().remove_cached_enr(&enr.peer_id()); let peer_id = enr.peer_id(); if self.peer_manager_mut().dial_peer(enr) { - debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %peer_id); + debug!(self.log, "Added cached ENR peer to dial queue"; "peer_id" => %peer_id); } } } From 1df81da9c13ad45eb55e62efef09a358538d05e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 23 Jul 2024 23:10:07 +0100 Subject: [PATCH 3/4] do not use a constant for MAX_CONNECTIONS_PER_PEER we only use it at one place, and the function call is explicit. 
--- beacon_node/lighthouse_network/src/service/mod.rs | 4 ++-- beacon_node/lighthouse_network/src/service/utils.rs | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index c45ec359496..3cc77c025ea 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -41,7 +41,7 @@ use types::ForkName; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; -use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER}; +use utils::{build_transport, strip_peer_id, Context as ServiceContext}; pub mod api_types; mod gossip_cache; @@ -409,7 +409,7 @@ impl Network { (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS)) .ceil() as u32, )) - .with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER)); + .with_max_established_per_peer(Some(1)); libp2p::connection_limits::Behaviour::new(limits) }; diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 80187efc103..413aab3299c 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -22,8 +22,6 @@ use std::time::Duration; use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, SubnetId, SyncSubnetId}; pub const NETWORK_KEY_FILENAME: &str = "key"; -/// The maximum simultaneous libp2p connections per peer. -pub const MAX_CONNECTIONS_PER_PEER: u32 = 1; /// The filename to store our local metadata. 
pub const METADATA_FILENAME: &str = "metadata"; From bda9447cda67c70ef1363960d7bbbff7f00855ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 25 Jul 2024 23:32:53 +0100 Subject: [PATCH 4/4] address review and re-instate connection limits checks, but do it before the connection has been established. --- .../src/peer_manager/network_behaviour.rs | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index d663e48bced..6e999d07c41 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -14,7 +14,6 @@ use slog::{debug, error, trace}; use types::EthSpec; use crate::discovery::enr_ext::EnrExt; -use crate::rpc::GoodbyeReason; use crate::types::SyncState; use crate::{metrics, ClearDialError}; @@ -192,6 +191,21 @@ impl NetworkBehaviour for PeerManager { "Connection to peer rejected: peer has a bad score", )); } + + // Check the connection limits + if self.network_globals.connected_or_dialing_peers() >= self.max_peers() + && self + .network_globals + .peers + .read() + .peer_info(&peer_id) + .map_or(true, |peer| !peer.has_future_duty()) + { + return Err(ConnectionDenied::new( + "Connection to peer rejected: too many connections", + )); + } + Ok(ConnectionHandler) } @@ -203,13 +217,26 @@ impl NetworkBehaviour for PeerManager { _role_override: libp2p::core::Endpoint, ) -> Result, libp2p::swarm::ConnectionDenied> { trace!(self.log, "Outbound connection"; "peer_id" => %peer_id, "multiaddr" => %addr); - match self.ban_status(&peer_id) { - Some(cause) => { - error!(self.log, "Connected a banned peer. 
Rejecting connection"; "peer_id" => %peer_id); - Err(ConnectionDenied::new(cause)) - } - None => Ok(ConnectionHandler), + if let Some(cause) = self.ban_status(&peer_id) { + error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id); + return Err(ConnectionDenied::new(cause)); } + + // Check the connection limits + if self.network_globals.connected_peers() >= self.max_outbound_dialing_peers() + && self + .network_globals + .peers + .read() + .peer_info(&peer_id) + .map_or(true, |peer| !peer.has_future_duty()) + { + return Err(ConnectionDenied::new( + "Connection to peer rejected: too many connections", + )); + } + + Ok(ConnectionHandler) } } @@ -236,9 +263,6 @@ impl PeerManager { self.update_peer_count_metrics(); } - // Count dialing peers in the limit if the peer dialed us. - let count_dialing = endpoint.is_listener(); - // NOTE: We don't register peers that we are disconnecting immediately. The network service // does not need to know about these peers. match endpoint {