Skip to content

Commit

Permalink
Subnet discovery fixes (#2095)
Browse files Browse the repository at this point in the history
## Issue Addressed

N/A

## Proposed Changes

Fixes multiple issues related to discovering of subnet peers.
1. Subnet discovery retries after yielding no results
2. Metadata updates if peer send older metadata
3. peerdb stores the peer subscriptions from gossipsub
  • Loading branch information
pawanjay176 committed Dec 17, 2020
1 parent ca08fc7 commit f998eff
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 37 deletions.
34 changes: 17 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ regex = "1.3.9"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25"
rev = "830e6fabb7ee51281a98f5e092f056668adbef25"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]

Expand Down
20 changes: 15 additions & 5 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::peer_manager::{
};
use crate::rpc::*;
use crate::service::METADATA_FILENAME;
use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery};
use crate::types::{
subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, MessageData,
SubnetDiscovery,
};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
Expand Down Expand Up @@ -100,8 +103,6 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
/// The message itself.
message: PubsubMessage<TSpec>,
},
/// Subscribed to peer for given topic
PeerSubscribed(PeerId, TopicHash),
/// Inform the network to send a Status to this peer.
StatusPeer(PeerId),
}
Expand Down Expand Up @@ -665,9 +666,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}
}
GossipsubEvent::Subscribed { peer_id, topic } => {
self.add_event(BehaviourEvent::PeerSubscribed(peer_id, topic));
if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) {
self.peer_manager.add_subscription(&peer_id, subnet_id);
}
}
GossipsubEvent::Unsubscribed { peer_id, topic } => {
if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) {
self.peer_manager.remove_subscription(&peer_id, subnet_id);
}
}
GossipsubEvent::Unsubscribed { .. } => {}
}
}

Expand Down Expand Up @@ -1102,6 +1109,9 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
fn inject_disconnected(&mut self, peer_id: &PeerId) {
// If the application/behaviour layers thinks this peer has connected inform it of the disconnect.

// Remove all subnet subscriptions from peerdb for the disconnected peer.
self.peer_manager().remove_all_subscriptions(&peer_id);

if self
.network_globals
.peers
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/eth2_libp2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,13 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
match query_result.1 {
Ok(r) if r.is_empty() => {
debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for);
queries.iter().for_each(|query| {
self.add_subnet_query(
query.subnet_id,
query.min_ttl,
query.retries + 1,
);
})
}
Ok(r) => {
debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => ?subnets_searched_for);
Expand Down
24 changes: 24 additions & 0 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,27 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.status_peers.insert(peer_id.clone());
}

/// Adds a gossipsub subscription to a peer in the peerdb.
pub fn add_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.insert(subnet_id);
}
}

/// Removes a gossipsub subscription to a peer in the peerdb.
pub fn remove_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.remove(&subnet_id);
}
}

/// Removes all gossipsub subscriptions to a peer in the peerdb.
pub fn remove_all_subscriptions(&self, peer_id: &PeerId) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets = Default::default();
}
}

/* Notifications from the Swarm */

/// Updates the state of the peer as disconnected.
Expand Down Expand Up @@ -541,6 +562,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
} else {
debug!(self.log, "Received old metadata";
"peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number);
// Updating metadata even in this case to prevent storing
// incorrect `metadata.attnets` for a peer
peer_info.meta_data = Some(meta_data);
}
} else {
// we have no meta-data for this peer, update
Expand Down
12 changes: 10 additions & 2 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct PeerInfo<T: EthSpec> {
/// The ENR subnet bitfield of the peer. This may be determined after it's initial
/// connection.
pub meta_data: Option<MetaData<T>>,
/// Subnets the peer is connected to.
pub subnets: HashSet<SubnetId>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
Expand All @@ -60,6 +62,7 @@ impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
connection_status: Default::default(),
listening_addresses: Vec::new(),
seen_addresses: HashSet::new(),
subnets: HashSet::new(),
sync_status: PeerSyncStatus::Unknown,
meta_data: None,
min_ttl: None,
Expand All @@ -80,14 +83,19 @@ impl<T: EthSpec> PeerInfo<T> {
}
}

/// Returns if the peer is subscribed to a given `SubnetId`
pub fn on_subnet(&self, subnet_id: SubnetId) -> bool {
/// Returns if the peer is subscribed to a given `SubnetId` from the metadata attnets field.
pub fn on_subnet_metadata(&self, subnet_id: SubnetId) -> bool {
if let Some(meta_data) = &self.meta_data {
return meta_data.attnets.get(*subnet_id as usize).unwrap_or(false);
}
false
}

/// Returns if the peer is subscribed to a given `SubnetId` from the gossipsub subscriptions.
pub fn on_subnet_gossipsub(&self, subnet_id: SubnetId) -> bool {
self.subnets.contains(&subnet_id)
}

/// Returns the seen IP addresses of the peer.
pub fn seen_addresses<'a>(&'a self) -> impl Iterator<Item = IpAddr> + 'a {
self.seen_addresses
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
self.peers
.iter()
.filter(move |(_, info)| {
info.is_connected() && info.on_subnet(subnet_id) && info.is_good_gossipsub_peer()
// We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers
info.is_connected()
&& info.on_subnet_metadata(subnet_id)
&& info.on_subnet_gossipsub(subnet_id)
&& info.is_good_gossipsub_peer()
})
.map(|(peer_id, _)| peer_id)
}
Expand Down Expand Up @@ -357,7 +361,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
let log = &self.log;
self.peers.iter_mut()
.filter(move |(_, info)| {
info.is_connected() && info.on_subnet(subnet_id)
info.is_connected() && info.on_subnet_metadata(subnet_id) && info.on_subnet_gossipsub(subnet_id)
})
.for_each(|(peer_id,info)| {
if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ pub use globals::NetworkGlobals;
pub use pubsub::{MessageData, PubsubMessage};
pub use subnet::SubnetDiscovery;
pub use sync_state::SyncState;
pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};
pub use topics::{subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};
23 changes: 22 additions & 1 deletion beacon_node/eth2_libp2p/src/types/topics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::gossipsub::IdentTopic as Topic;
use libp2p::gossipsub::{IdentTopic as Topic, TopicHash};
use serde_derive::{Deserialize, Serialize};
use types::SubnetId;

Expand Down Expand Up @@ -193,6 +193,15 @@ impl From<SubnetId> for GossipKind {

// helper functions

/// Get subnet id from an attestation subnet topic hash.
pub fn subnet_id_from_topic_hash(topic_hash: &TopicHash) -> Option<SubnetId> {
let gossip_topic = GossipTopic::decode(topic_hash.as_str()).ok()?;
if let GossipKind::Attestation(subnet_id) = gossip_topic.kind() {
return Some(*subnet_id);
}
None
}

// Determines if a string is a committee topic.
fn committee_topic_index(topic: &str) -> Option<SubnetId> {
if topic.starts_with(BEACON_ATTESTATION_PREFIX) {
Expand Down Expand Up @@ -289,4 +298,16 @@ mod tests {
// Empty parts
assert!(GossipTopic::decode("////").is_err());
}

#[test]
fn test_subnet_id_from_topic_hash() {
let topic_hash = TopicHash::from_raw("/eth2/e1925f3b/beacon_block/ssz_snappy");
assert!(subnet_id_from_topic_hash(&topic_hash).is_none());

let topic_hash = TopicHash::from_raw("/eth2/e1925f3b/beacon_attestation_42/ssz_snappy");
assert_eq!(
subnet_id_from_topic_hash(&topic_hash),
Some(SubnetId::new(42))
);
}
}
15 changes: 7 additions & 8 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,13 +465,13 @@ fn spawn_service<T: BeaconChainTypes>(
// We currently do not perform any action here.
},
BehaviourEvent::PeerDisconnected(peer_id) => {
let _ = service
.router_send
.send(RouterMessage::PeerDisconnected(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
});
},
let _ = service
.router_send
.send(RouterMessage::PeerDisconnected(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
});
},
BehaviourEvent::RequestReceived{peer_id, id, request} => {
let _ = service
.router_send
Expand Down Expand Up @@ -543,7 +543,6 @@ fn spawn_service<T: BeaconChainTypes>(
}
}
}
BehaviourEvent::PeerSubscribed(_, _) => {},
}
Libp2pEvent::NewListenAddr(multiaddr) => {
service.network_globals.listen_multiaddrs.write().push(multiaddr);
Expand Down

0 comments on commit f998eff

Please sign in to comment.