Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove duplicated connection limits checks #6156

Merged
merged 6 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,15 +338,15 @@ impl<E: EthSpec> PeerManager<E> {
{
// This should be updated with the peer dialing. In fact created once the peer is
// dialed
let peer_id = enr.peer_id();
if let Some(min_ttl) = min_ttl {
self.network_globals
.peers
.write()
.update_min_ttl(&enr.peer_id(), min_ttl);
.update_min_ttl(&peer_id, min_ttl);
}
let peer_id = enr.peer_id();
if self.dial_peer(enr) {
debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id);
debug!(self.log, "Added discovered ENR peer to dial queue"; "peer_id" => %peer_id);
jxs marked this conversation as resolved.
Show resolved Hide resolved
to_dial_peers += 1;
}
}
Expand Down Expand Up @@ -447,18 +447,6 @@ impl<E: EthSpec> PeerManager<E> {
self.network_globals.peers.read().is_connected(peer_id)
}

/// Reports whether the peer limit is reached in which case we stop allowing new incoming
/// connections.
pub fn peer_limit_reached(&self, count_dialing: bool) -> bool {
if count_dialing {
// This is an incoming connection so limit by the standard max peers
self.network_globals.connected_or_dialing_peers() >= self.max_peers()
} else {
// We dialed this peer, allow up to max_outbound_dialing_peers
self.network_globals.connected_peers() >= self.max_outbound_dialing_peers()
}
}

/// Updates `PeerInfo` with `identify` information.
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
Expand Down
100 changes: 47 additions & 53 deletions beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use slog::{debug, error, trace};
use types::EthSpec;

use crate::discovery::enr_ext::EnrExt;
use crate::rpc::GoodbyeReason;
use crate::types::SyncState;
use crate::{metrics, ClearDialError};

Expand Down Expand Up @@ -94,26 +93,20 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
}

if let Some(enr) = self.peers_to_dial.pop() {
let peer_id = enr.peer_id();
self.inject_peer_connection(&peer_id, ConnectingType::Dialing, Some(enr.clone()));

let quic_multiaddrs = if self.quic_enabled {
let quic_multiaddrs = enr.multiaddr_quic();
if !quic_multiaddrs.is_empty() {
debug!(self.log, "Dialing QUIC supported peer"; "peer_id"=> %peer_id, "quic_multiaddrs" => ?quic_multiaddrs);
}
quic_multiaddrs
} else {
Vec::new()
};
self.inject_peer_connection(&enr.peer_id(), ConnectingType::Dialing, Some(enr.clone()));

// Prioritize Quic connections over Tcp ones.
let multiaddrs = quic_multiaddrs
.into_iter()
.chain(enr.multiaddr_tcp())
.collect();
let multiaddrs = [
self.quic_enabled
.then_some(enr.multiaddr_quic())
.unwrap_or_default(),
enr.multiaddr_tcp(),
]
.concat();

debug!(self.log, "Dialing peer"; "peer_id"=> %enr.peer_id(), "multiaddrs" => ?multiaddrs);
return Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id)
opts: DialOpts::peer_id(enr.peer_id())
.condition(PeerCondition::Disconnected)
.addresses(multiaddrs)
.build(),
Expand All @@ -130,14 +123,7 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
endpoint,
other_established,
..
}) => {
// NOTE: We still need to handle the [`ConnectionEstablished`] because the
// [`NetworkBehaviour::handle_established_inbound_connection`] and
// [`NetworkBehaviour::handle_established_outbound_connection`] are fallible. This
// means another behaviour can kill the connection early, and we can't assume a
// peer as connected until this event is received.
self.on_connection_established(peer_id, endpoint, other_established)
}
}) => self.on_connection_established(peer_id, endpoint, other_established),
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
endpoint,
Expand Down Expand Up @@ -206,6 +192,21 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
"Connection to peer rejected: peer has a bad score",
));
}

// Check the connection limits
if self.network_globals.connected_or_dialing_peers() >= self.max_peers()
&& self
.network_globals
.peers
.read()
.peer_info(&peer_id)
.map_or(true, |peer| !peer.has_future_duty())
{
return Err(ConnectionDenied::new(
"Connection to peer rejected: too many connections",
));
}

Ok(ConnectionHandler)
}

Expand All @@ -218,13 +219,26 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
_port_use: PortUse,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
trace!(self.log, "Outbound connection"; "peer_id" => %peer_id, "multiaddr" => %addr);
match self.ban_status(&peer_id) {
Some(cause) => {
error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id);
Err(ConnectionDenied::new(cause))
}
None => Ok(ConnectionHandler),
if let Some(cause) = self.ban_status(&peer_id) {
error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id);
return Err(ConnectionDenied::new(cause));
}

// Check the connection limits
if self.network_globals.connected_peers() >= self.max_outbound_dialing_peers()
&& self
.network_globals
.peers
.read()
.peer_info(&peer_id)
.map_or(true, |peer| !peer.has_future_duty())
{
return Err(ConnectionDenied::new(
"Connection to peer rejected: too many connections",
));
}

Ok(ConnectionHandler)
}
}

Expand All @@ -233,7 +247,7 @@ impl<E: EthSpec> PeerManager<E> {
&mut self,
peer_id: PeerId,
endpoint: &ConnectedPoint,
other_established: usize,
_other_established: usize,
) {
debug!(self.log, "Connection established"; "peer_id" => %peer_id,
"multiaddr" => %endpoint.get_remote_address(),
Expand All @@ -247,26 +261,6 @@ impl<E: EthSpec> PeerManager<E> {
self.update_peer_count_metrics();
}

// Count dialing peers in the limit if the peer dialed us.
let count_dialing = endpoint.is_listener();
// Check the connection limits
if self.peer_limit_reached(count_dialing)
jxs marked this conversation as resolved.
Show resolved Hide resolved
&& self
.network_globals
.peers
.read()
.peer_info(&peer_id)
.map_or(true, |peer| !peer.has_future_duty())
{
// Gracefully disconnect the peer.
self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
return;
}

if other_established == 0 {
self.events.push(PeerManagerEvent::MetaData(peer_id));
}

// NOTE: We don't register peers that we are disconnecting immediately. The network service
// does not need to know about these peers.
match endpoint {
Expand Down
39 changes: 0 additions & 39 deletions beacon_node/lighthouse_network/src/service/behaviour.rs

This file was deleted.

48 changes: 40 additions & 8 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use self::behaviour::Behaviour;
use self::gossip_cache::GossipCache;
use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad};
use crate::discovery::{
Expand All @@ -14,8 +13,6 @@ use crate::rpc::{
self, GoodbyeReason, HandlerErr, NetworkParams, Protocol, RPCError, RPCMessage, RPCReceived,
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
};
use crate::service::behaviour::BehaviourEvent;
pub use crate::service::behaviour::Gossipsub;
use crate::types::{
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
Expand All @@ -33,7 +30,8 @@ use gossipsub::{
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use libp2p::upnp::tokio::Behaviour as Upnp;
use libp2p::{identify, PeerId, SwarmBuilder};
use slog::{crit, debug, info, o, trace, warn};
use std::num::{NonZeroU8, NonZeroUsize};
Expand All @@ -47,10 +45,9 @@ use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
};
use types::{ChainSpec, ForkName};
use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER};
use utils::{build_transport, strip_peer_id, Context as ServiceContext};

pub mod api_types;
mod behaviour;
mod gossip_cache;
pub mod gossipsub_scoring_parameters;
pub mod utils;
Expand Down Expand Up @@ -109,6 +106,41 @@ pub enum NetworkEvent<E: EthSpec> {
ZeroListeners,
}

pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
pub type SubscriptionFilter =
gossipsub::MaxCountSubscriptionFilter<gossipsub::WhitelistSubscriptionFilter>;

#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour<E>
where
E: EthSpec,
{
// NOTE: The order of the following list of behaviours has meaning,
// `NetworkBehaviour::handle_{pending, established}_{inbound, outbound}` methods
// are called sequentially for each behaviour and they are fallible,
// therefore we want `connection_limits` and `peer_manager` running first,
// which are the behaviours that may reject a connection, so that
// when the subsequent behaviours are called they are certain the connection won't be rejected.

//
/// Keep track of active and pending connections to enforce hard limits.
pub connection_limits: libp2p::connection_limits::Behaviour,
/// The peer manager that keeps track of peer's reputation and status.
pub peer_manager: PeerManager<E>,
/// The Eth2 RPC specified in the wire-0 protocol.
pub eth2_rpc: RPC<RequestId, E>,
/// Discv5 Discovery protocol.
pub discovery: Discovery<E>,
/// Keep regular connection to peers and disconnect if absent.
// NOTE: The id protocol is used for initial interop. This will be removed by mainnet.
/// Provides IP addresses and peer information.
pub identify: identify::Behaviour,
/// Libp2p UPnP port mapping.
pub upnp: Toggle<Upnp>,
/// The routing pub-sub mechanism for eth2.
pub gossipsub: Gossipsub,
}

/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
Expand Down Expand Up @@ -396,7 +428,7 @@ impl<E: EthSpec> Network<E> {
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS))
.ceil() as u32,
))
.with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER));
.with_max_established_per_peer(Some(1));

libp2p::connection_limits::Behaviour::new(limits)
};
Expand Down Expand Up @@ -1198,7 +1230,7 @@ impl<E: EthSpec> Network<E> {
self.discovery_mut().remove_cached_enr(&enr.peer_id());
let peer_id = enr.peer_id();
if self.peer_manager_mut().dial_peer(enr) {
debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %peer_id);
debug!(self.log, "Added cached ENR peer to dial queue"; "peer_id" => %peer_id);
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/lighthouse_network/src/service/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ use types::{
};

pub const NETWORK_KEY_FILENAME: &str = "key";
/// The maximum simultaneous libp2p connections per peer.
pub const MAX_CONNECTIONS_PER_PEER: u32 = 1;
/// The filename to store our local metadata.
pub const METADATA_FILENAME: &str = "metadata";

Expand Down
Loading