Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Subnet discovery fixes #2095

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ regex = "1.3.9"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25"
rev = "830e6fabb7ee51281a98f5e092f056668adbef25"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]

Expand Down
20 changes: 15 additions & 5 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::peer_manager::{
};
use crate::rpc::*;
use crate::service::METADATA_FILENAME;
use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery};
use crate::types::{
subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, MessageData,
SubnetDiscovery,
};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
Expand Down Expand Up @@ -100,8 +103,6 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
/// The message itself.
message: PubsubMessage<TSpec>,
},
/// Subscribed to peer for given topic
PeerSubscribed(PeerId, TopicHash),
/// Inform the network to send a Status to this peer.
StatusPeer(PeerId),
}
Expand Down Expand Up @@ -665,9 +666,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}
}
GossipsubEvent::Subscribed { peer_id, topic } => {
self.add_event(BehaviourEvent::PeerSubscribed(peer_id, topic));
if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) {
self.peer_manager.add_subscription(&peer_id, subnet_id);
}
}
GossipsubEvent::Unsubscribed { peer_id, topic } => {
if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) {
self.peer_manager.remove_subscription(&peer_id, subnet_id);
}
}
GossipsubEvent::Unsubscribed { .. } => {}
}
}

Expand Down Expand Up @@ -1102,6 +1109,9 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
fn inject_disconnected(&mut self, peer_id: &PeerId) {
// If the application/behaviour layers thinks this peer has connected inform it of the disconnect.

// Remove all subnet subscriptions from peerdb for the disconnected peer.
self.peer_manager().remove_all_subscriptions(&peer_id);

if self
.network_globals
.peers
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/eth2_libp2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,13 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
match query_result.1 {
Ok(r) if r.is_empty() => {
debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for);
queries.iter().for_each(|query| {
self.add_subnet_query(
query.subnet_id,
query.min_ttl,
query.retries + 1,
);
})
}
Ok(r) => {
debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => ?subnets_searched_for);
Expand Down
24 changes: 24 additions & 0 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,27 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.status_peers.insert(peer_id.clone());
}

/// Adds a gossipsub subscription to a peer in the peerdb.
pub fn add_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.insert(subnet_id);
}
}

/// Removes a gossipsub subscription to a peer in the peerdb.
pub fn remove_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.remove(&subnet_id);
}
}

/// Removes all gossipsub subscriptions to a peer in the peerdb.
pub fn remove_all_subscriptions(&self, peer_id: &PeerId) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets = Default::default();
}
}

/* Notifications from the Swarm */

/// Updates the state of the peer as disconnected.
Expand Down Expand Up @@ -535,6 +556,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
} else {
debug!(self.log, "Received old metadata";
"peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number);
// Updating metadata even in this case to prevent storing
// incorrect `metadata.attnets` for a peer
peer_info.meta_data = Some(meta_data);
}
} else {
// we have no meta-data for this peer, update
Expand Down
12 changes: 10 additions & 2 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct PeerInfo<T: EthSpec> {
/// The ENR subnet bitfield of the peer. This may be determined after it's initial
/// connection.
pub meta_data: Option<MetaData<T>>,
/// Subnets the peer is connected to.
pub subnets: HashSet<SubnetId>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
Expand All @@ -60,6 +62,7 @@ impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
connection_status: Default::default(),
listening_addresses: Vec::new(),
seen_addresses: HashSet::new(),
subnets: HashSet::new(),
sync_status: PeerSyncStatus::Unknown,
meta_data: None,
min_ttl: None,
Expand All @@ -80,14 +83,19 @@ impl<T: EthSpec> PeerInfo<T> {
}
}

/// Returns if the peer is subscribed to a given `SubnetId`
pub fn on_subnet(&self, subnet_id: SubnetId) -> bool {
/// Returns if the peer is subscribed to a given `SubnetId` from the metadata attnets field.
pub fn on_subnet_metadata(&self, subnet_id: SubnetId) -> bool {
if let Some(meta_data) = &self.meta_data {
return meta_data.attnets.get(*subnet_id as usize).unwrap_or(false);
}
false
}

/// Returns if the peer is subscribed to a given `SubnetId` from the gossipsub subscriptions.
pub fn on_subnet_gossipsub(&self, subnet_id: SubnetId) -> bool {
self.subnets.contains(&subnet_id)
}

/// Returns the seen IP addresses of the peer.
pub fn seen_addresses<'a>(&'a self) -> impl Iterator<Item = IpAddr> + 'a {
self.seen_addresses
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
self.peers
.iter()
.filter(move |(_, info)| {
info.is_connected() && info.on_subnet(subnet_id) && info.is_good_gossipsub_peer()
// We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers
info.is_connected()
&& info.on_subnet_metadata(subnet_id)
&& info.on_subnet_gossipsub(subnet_id)
&& info.is_good_gossipsub_peer()
})
.map(|(peer_id, _)| peer_id)
}
Expand Down Expand Up @@ -357,7 +361,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
let log = &self.log;
self.peers.iter_mut()
.filter(move |(_, info)| {
info.is_connected() && info.on_subnet(subnet_id)
info.is_connected() && info.on_subnet_metadata(subnet_id) && info.on_subnet_gossipsub(subnet_id)
})
.for_each(|(peer_id,info)| {
if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ pub use globals::NetworkGlobals;
pub use pubsub::{MessageData, PubsubMessage};
pub use subnet::SubnetDiscovery;
pub use sync_state::SyncState;
pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};
pub use topics::{subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};
23 changes: 22 additions & 1 deletion beacon_node/eth2_libp2p/src/types/topics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::gossipsub::IdentTopic as Topic;
use libp2p::gossipsub::{IdentTopic as Topic, TopicHash};
use serde_derive::{Deserialize, Serialize};
use types::SubnetId;

Expand Down Expand Up @@ -193,6 +193,15 @@ impl From<SubnetId> for GossipKind {

// helper functions

/// Get subnet id from an attestation subnet topic hash.
pub fn subnet_id_from_topic_hash(topic_hash: &TopicHash) -> Option<SubnetId> {
let gossip_topic = GossipTopic::decode(topic_hash.as_str()).ok()?;
if let GossipKind::Attestation(subnet_id) = gossip_topic.kind() {
return Some(*subnet_id);
}
None
}

// Determines if a string is a committee topic.
fn committee_topic_index(topic: &str) -> Option<SubnetId> {
if topic.starts_with(BEACON_ATTESTATION_PREFIX) {
Expand Down Expand Up @@ -289,4 +298,16 @@ mod tests {
// Empty parts
assert!(GossipTopic::decode("////").is_err());
}

#[test]
fn test_subnet_id_from_topic_hash() {
let topic_hash = TopicHash::from_raw("/eth2/e1925f3b/beacon_block/ssz_snappy");
assert!(subnet_id_from_topic_hash(&topic_hash).is_none());

let topic_hash = TopicHash::from_raw("/eth2/e1925f3b/beacon_attestation_42/ssz_snappy");
assert_eq!(
subnet_id_from_topic_hash(&topic_hash),
Some(SubnetId::new(42))
);
}
}
15 changes: 7 additions & 8 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,13 +465,13 @@ fn spawn_service<T: BeaconChainTypes>(
// We currently do not perform any action here.
},
BehaviourEvent::PeerDisconnected(peer_id) => {
let _ = service
.router_send
.send(RouterMessage::PeerDisconnected(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
});
},
let _ = service
.router_send
.send(RouterMessage::PeerDisconnected(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
});
},
BehaviourEvent::RequestReceived{peer_id, id, request} => {
let _ = service
.router_send
Expand Down Expand Up @@ -543,7 +543,6 @@ fn spawn_service<T: BeaconChainTypes>(
}
}
}
BehaviourEvent::PeerSubscribed(_, _) => {},
}
Libp2pEvent::NewListenAddr(multiaddr) => {
service.network_globals.listen_multiaddrs.write().push(multiaddr);
Expand Down