diff --git a/Cargo.lock b/Cargo.lock index 25701fe059d..7dd74c9aab5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3431,7 +3431,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.30.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "atomic", "bytes 0.5.6", @@ -3448,7 +3448,7 @@ dependencies = [ "libp2p-tcp", "libp2p-websocket", "libp2p-yamux", - "parity-multiaddr 0.9.6 (git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25)", + "parity-multiaddr 0.9.6 (git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25)", "parking_lot", "pin-project 1.0.2", "smallvec", @@ -3492,7 +3492,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "asn1_der", "bs58 0.4.0", @@ -3506,8 +3506,8 @@ dependencies = [ "libsecp256k1", "log", "multihash 0.13.2", - "multistream-select 0.8.5 (git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25)", - "parity-multiaddr 0.9.6 (git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25)", + "multistream-select 0.8.5 (git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25)", + "parity-multiaddr 0.9.6 (git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25)", "parking_lot", "pin-project 1.0.2", "prost", @@ -3526,7 +3526,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "quote", "syn", @@ -3535,7 +3535,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "futures 0.3.8", "libp2p-core 0.25.0", @@ -3545,7 +3545,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "base64 0.13.0", "byteorder", @@ -3569,7 +3569,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "futures 0.3.8", "libp2p-core 0.25.0", @@ -3584,7 +3584,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "bytes 0.5.6", "futures 0.3.8", @@ -3601,7 +3601,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.27.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "bytes 0.5.6", "curve25519-dalek", @@ -3622,7 +3622,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "either", "futures 0.3.8", @@ -3637,7 +3637,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "futures 0.3.8", "futures-timer", @@ -3652,7 +3652,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.26.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "async-tls", "either", @@ -3671,7 +3671,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.28.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "futures 0.3.8", "libp2p-core 0.25.0", @@ -4136,7 +4136,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.8.5" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "bytes 0.5.6", "futures 0.3.8", @@ -4481,7 +4481,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.6" -source = "git+https://github.com/sigp/rust-libp2p?rev=e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25#e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" dependencies = [ "arrayref", "bs58 0.4.0", diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index a0807b257fb..81ca15040f5 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -42,7 +42,7 @@ regex = "1.3.9" [dependencies.libp2p] #version = "0.23.0" git = "https://github.com/sigp/rust-libp2p" -rev = "e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25" +rev = "830e6fabb7ee51281a98f5e092f056668adbef25" default-features = false features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"] diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 5f24734c3df..b29aa6755a2 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -5,7 +5,10 @@ use crate::peer_manager::{ }; use crate::rpc::*; use crate::service::METADATA_FILENAME; -use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery}; +use crate::types::{ + subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, MessageData, + SubnetDiscovery, +}; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; @@ -100,8 +103,6 @@ pub enum BehaviourEvent { /// The message itself. message: PubsubMessage, }, - /// Subscribed to peer for given topic - PeerSubscribed(PeerId, TopicHash), /// Inform the network to send a Status to this peer. StatusPeer(PeerId), } @@ -665,9 +666,15 @@ impl Behaviour { } } GossipsubEvent::Subscribed { peer_id, topic } => { - self.add_event(BehaviourEvent::PeerSubscribed(peer_id, topic)); + if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) { + self.peer_manager.add_subscription(&peer_id, subnet_id); + } + } + GossipsubEvent::Unsubscribed { peer_id, topic } => { + if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) { + self.peer_manager.remove_subscription(&peer_id, subnet_id); + } } - GossipsubEvent::Unsubscribed { .. } => {} } } @@ -1102,6 +1109,9 @@ impl NetworkBehaviour for Behaviour { fn inject_disconnected(&mut self, peer_id: &PeerId) { // If the application/behaviour layers thinks this peer has connected inform it of the disconnect. + // Remove all subnet subscriptions from peerdb for the disconnected peer. + self.peer_manager().remove_all_subscriptions(&peer_id); + if self .network_globals .peers diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 60ef003ffd9..c22c7d6cdc7 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -767,6 +767,13 @@ impl Discovery { match query_result.1 { Ok(r) if r.is_empty() => { debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for); + queries.iter().for_each(|query| { + self.add_subnet_query( + query.subnet_id, + query.min_ttl, + query.retries + 1, + ); + }) } Ok(r) => { debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => ?subnets_searched_for); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index ae318e9154c..e6dd60f4e8c 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -285,6 +285,27 @@ impl PeerManager { self.status_peers.insert(peer_id.clone()); } + /// Adds a gossipsub subscription to a peer in the peerdb. + pub fn add_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) { + if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + info.subnets.insert(subnet_id); + } + } + + /// Removes a gossipsub subscription to a peer in the peerdb. + pub fn remove_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) { + if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + info.subnets.remove(&subnet_id); + } + } + + /// Removes all gossipsub subscriptions to a peer in the peerdb. + pub fn remove_all_subscriptions(&self, peer_id: &PeerId) { + if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + info.subnets = Default::default(); + } + } + /* Notifications from the Swarm */ /// Updates the state of the peer as disconnected. @@ -535,6 +556,9 @@ impl PeerManager { } else { debug!(self.log, "Received old metadata"; "peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + // Updating metadata even in this case to prevent storing + // incorrect `metadata.attnets` for a peer + peer_info.meta_data = Some(meta_data); } } else { // we have no meta-data for this peer, update diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index a5af1fba6cd..92b39966c1d 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -38,6 +38,8 @@ pub struct PeerInfo { /// The ENR subnet bitfield of the peer. This may be determined after it's initial /// connection. pub meta_data: Option>, + /// Subnets the peer is connected to. + pub subnets: HashSet, /// The time we would like to retain this peer. After this time, the peer is no longer /// necessary. #[serde(skip)] @@ -60,6 +62,7 @@ impl Default for PeerInfo { connection_status: Default::default(), listening_addresses: Vec::new(), seen_addresses: HashSet::new(), + subnets: HashSet::new(), sync_status: PeerSyncStatus::Unknown, meta_data: None, min_ttl: None, @@ -80,14 +83,19 @@ impl PeerInfo { } } - /// Returns if the peer is subscribed to a given `SubnetId` - pub fn on_subnet(&self, subnet_id: SubnetId) -> bool { + /// Returns if the peer is subscribed to a given `SubnetId` from the metadata attnets field. + pub fn on_subnet_metadata(&self, subnet_id: SubnetId) -> bool { if let Some(meta_data) = &self.meta_data { return meta_data.attnets.get(*subnet_id as usize).unwrap_or(false); } false } + /// Returns if the peer is subscribed to a given `SubnetId` from the gossipsub subscriptions. + pub fn on_subnet_gossipsub(&self, subnet_id: SubnetId) -> bool { + self.subnets.contains(&subnet_id) + } + /// Returns the seen IP addresses of the peer. pub fn seen_addresses<'a>(&'a self) -> impl Iterator + 'a { self.seen_addresses diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 5382b178c53..f5e93b5b29d 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -246,7 +246,11 @@ impl PeerDB { self.peers .iter() .filter(move |(_, info)| { - info.is_connected() && info.on_subnet(subnet_id) && info.is_good_gossipsub_peer() + // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers + info.is_connected() + && info.on_subnet_metadata(subnet_id) + && info.on_subnet_gossipsub(subnet_id) + && info.is_good_gossipsub_peer() }) .map(|(peer_id, _)| peer_id) } @@ -357,7 +361,7 @@ impl PeerDB { let log = &self.log; self.peers.iter_mut() .filter(move |(_, info)| { - info.is_connected() && info.on_subnet(subnet_id) + info.is_connected() && info.on_subnet_metadata(subnet_id) && info.on_subnet_gossipsub(subnet_id) }) .for_each(|(peer_id,info)| { if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { diff --git a/beacon_node/eth2_libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs index 6d51a1abab4..097c6c87c73 100644 --- a/beacon_node/eth2_libp2p/src/types/mod.rs +++ b/beacon_node/eth2_libp2p/src/types/mod.rs @@ -16,4 +16,4 @@ pub use globals::NetworkGlobals; pub use pubsub::{MessageData, PubsubMessage}; pub use subnet::SubnetDiscovery; pub use sync_state::SyncState; -pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; +pub use topics::{subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/beacon_node/eth2_libp2p/src/types/topics.rs b/beacon_node/eth2_libp2p/src/types/topics.rs index dd1a4bee4ee..13a0dd62f97 100644 --- a/beacon_node/eth2_libp2p/src/types/topics.rs +++ b/beacon_node/eth2_libp2p/src/types/topics.rs @@ -1,4 +1,4 @@ -use libp2p::gossipsub::IdentTopic as Topic; +use libp2p::gossipsub::{IdentTopic as Topic, TopicHash}; use serde_derive::{Deserialize, Serialize}; use types::SubnetId; @@ -193,6 +193,15 @@ impl From for GossipKind { // helper functions +/// Get subnet id from an attestation subnet topic hash. +pub fn subnet_id_from_topic_hash(topic_hash: &TopicHash) -> Option { + let gossip_topic = GossipTopic::decode(topic_hash.as_str()).ok()?; + if let GossipKind::Attestation(subnet_id) = gossip_topic.kind() { + return Some(*subnet_id); + } + None +} + // Determines if a string is a committee topic. fn committee_topic_index(topic: &str) -> Option { if topic.starts_with(BEACON_ATTESTATION_PREFIX) { @@ -289,4 +298,16 @@ mod tests { // Empty parts assert!(GossipTopic::decode("////").is_err()); } + + #[test] + fn test_subnet_id_from_topic_hash() { + let topic_hash = TopicHash::from_raw("/eth2/e1925f3b/beacon_block/ssz_snappy"); + assert!(subnet_id_from_topic_hash(&topic_hash).is_none()); + + let topic_hash = TopicHash::from_raw("/eth2/e1925f3b/beacon_attestation_42/ssz_snappy"); + assert_eq!( + subnet_id_from_topic_hash(&topic_hash), + Some(SubnetId::new(42)) + ); + } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 3f725a2295e..0b766fb82bb 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -465,13 +465,13 @@ fn spawn_service( // We currently do not perform any action here. }, BehaviourEvent::PeerDisconnected(peer_id) => { - let _ = service - .router_send - .send(RouterMessage::PeerDisconnected(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send peer disconnect to router"); - }); - }, + let _ = service + .router_send + .send(RouterMessage::PeerDisconnected(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send peer disconnect to router"); + }); + }, BehaviourEvent::RequestReceived{peer_id, id, request} => { let _ = service .router_send @@ -543,7 +543,6 @@ fn spawn_service( } } } - BehaviourEvent::PeerSubscribed(_, _) => {}, } Libp2pEvent::NewListenAddr(multiaddr) => { service.network_globals.listen_multiaddrs.write().push(multiaddr);