Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make public addresses go first in authority discovery DHT records #3757

Merged
merged 2 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn build_authority_discovery_service<Block: BlockT>(
prometheus_registry: Option<Registry>,
) -> AuthorityDiscoveryService {
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();
let authority_discovery_role = sc_authority_discovery::Role::Discover;
let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move {
match e {
Expand All @@ -65,6 +66,7 @@ fn build_authority_discovery_service<Block: BlockT>(
let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
sc_authority_discovery::WorkerConfig {
publish_non_global_ips: auth_disc_publish_non_global_ips,
public_addresses: auth_disc_public_addresses,
// Require that authority discovery records are signed.
strict_record_validation: true,
..Default::default()
Expand Down
2 changes: 2 additions & 0 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(

let shared_voter_state = rpc_setup;
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);

let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
Expand Down Expand Up @@ -1061,6 +1062,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
sc_authority_discovery::WorkerConfig {
publish_non_global_ips: auth_disc_publish_non_global_ips,
public_addresses: auth_disc_public_addresses,
// Require that authority discovery records are signed.
strict_record_validation: true,
..Default::default()
Expand Down
2 changes: 2 additions & 0 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ pub fn new_full_base(

let shared_voter_state = rpc_setup;
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");

Expand Down Expand Up @@ -610,6 +611,7 @@ pub fn new_full_base(
sc_authority_discovery::new_worker_and_service_with_config(
sc_authority_discovery::WorkerConfig {
publish_non_global_ips: auth_disc_publish_non_global_ips,
public_addresses: auth_disc_public_addresses,
..Default::default()
},
client.clone(),
Expand Down
1 change: 1 addition & 0 deletions substrate/client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ multihash = { version = "0.18.1", default-features = false, features = [
"sha2",
"std",
] }
linked_hash_set = "0.1.4"
log = { workspace = true, default-features = true }
prost = "0.12"
rand = "0.8.5"
Expand Down
5 changes: 5 additions & 0 deletions substrate/client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ pub struct WorkerConfig {
/// Defaults to `true` to avoid the surprise factor.
pub publish_non_global_ips: bool,

/// Public addresses set by the node operator to always publish first in the authority
/// discovery DHT record.
pub public_addresses: Vec<Multiaddr>,

/// Reject authority discovery records that are not signed by their network identity (PeerId)
///
/// Defaults to `false` to provide compatibility with old versions
Expand All @@ -104,6 +108,7 @@ impl Default for WorkerConfig {
// `authority_discovery_dht_event_received`.
max_query_interval: Duration::from_secs(10 * 60),
publish_non_global_ips: true,
public_addresses: Vec::new(),
strict_record_validation: false,
}
}
Expand Down
92 changes: 71 additions & 21 deletions substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use addr_cache::AddrCache;
use codec::{Decode, Encode};
use ip_network::IpNetwork;
use libp2p::{core::multiaddr, identity::PublicKey, multihash::Multihash, Multiaddr, PeerId};
use linked_hash_set::LinkedHashSet;
use multihash_codetable::{Code, MultihashDigest};

use log::{debug, error, log_enabled};
Expand Down Expand Up @@ -120,14 +121,22 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {

/// Interval to be proactive, publishing own addresses.
publish_interval: ExpIncInterval,

/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
/// have changed.
publish_if_changed_interval: ExpIncInterval,

/// List of keys onto which addresses have been published at the latest publication.
/// Used to check whether they have changed.
latest_published_keys: HashSet<AuthorityId>,

/// Same value as in the configuration.
publish_non_global_ips: bool,

/// Public addresses set by the node operator to always publish first in the authority
/// discovery DHT record.
public_addresses: LinkedHashSet<Multiaddr>,

/// Same value as in the configuration.
strict_record_validation: bool,

Expand All @@ -136,6 +145,7 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {

/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,

/// Set of in-flight lookups.
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

Expand Down Expand Up @@ -224,6 +234,29 @@ where
None => None,
};

let public_addresses = {
let local_peer_id: Multihash = network.local_peer_id().into();

config
.public_addresses
.into_iter()
.map(|mut address| {
if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
if peer_id != local_peer_id {
error!(
target: LOG_TARGET,
"Discarding invalid local peer ID in public address {address}.",
);
}
// Always discard `/p2p/...` protocol for proper address comparison (local
// peer id will be added before publishing).
address.pop();
}
address
})
.collect()
};

Worker {
from_service: from_service.fuse(),
client,
Expand All @@ -233,6 +266,7 @@ where
publish_if_changed_interval,
latest_published_keys: HashSet::new(),
publish_non_global_ips: config.publish_non_global_ips,
public_addresses,
strict_record_validation: config.strict_record_validation,
query_interval,
pending_lookups: Vec::new(),
Expand Down Expand Up @@ -304,32 +338,48 @@ where
}

fn addresses_to_publish(&self) -> impl Iterator<Item = Multiaddr> {
let peer_id: Multihash = self.network.local_peer_id().into();
let publish_non_global_ips = self.publish_non_global_ips;
let addresses = self.network.external_addresses().into_iter().filter(move |a| {
if publish_non_global_ips {
return true
}
let addresses = self
.public_addresses
.clone()
.into_iter()
.chain(self.network.external_addresses().into_iter().filter_map(|mut address| {
// Make sure the reported external address does not contain `/p2p/...` protocol.
if let Some(multiaddr::Protocol::P2p(_)) = address.iter().last() {
address.pop();
}

a.iter().all(|p| match p {
// The `ip_network` library is used because its `is_global()` method is stable,
// while `is_global()` in the standard library currently isn't.
multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
_ => true,
if self.public_addresses.contains(&address) {
// Already added above.
None
} else {
Some(address)
}
}))
.filter(move |address| {
Copy link
Contributor Author

@dmitry-markin dmitry-markin Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, may be we should always publish public addresses provided by user even if they are non-global.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dq: I see we have a public_non_global_ips flag which allows us to publish non-global IP addresse.
When would we want to publish non-global-ips? There is a high chance that a non-global ip could not be reached from the other peers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about a testnet in a private network case. But we can also use public_non_global_ips flag for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah private network. When you run it locally on your computer, you also need authority discovery to work ;)

if publish_non_global_ips {
return true
}

address.iter().all(|protocol| match protocol {
// The `ip_network` library is used because its `is_global()` method is stable,
// while `is_global()` in the standard library currently isn't.
multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
_ => true,
})
})
});
.collect::<Vec<_>>();

debug!(target: LOG_TARGET, "Authority DHT record peer_id='{:?}' addresses='{:?}'", peer_id, addresses.clone().collect::<Vec<_>>());
let peer_id = self.network.local_peer_id();
debug!(
target: LOG_TARGET,
"Authority DHT record peer_id='{peer_id}' addresses='{addresses:?}'",
);

// The address must include the peer id if not already set.
addresses.map(move |a| {
if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
a
} else {
a.with(multiaddr::Protocol::P2p(peer_id))
}
})
// The address must include the peer id.
let peer_id: Multihash = peer_id.into();
addresses.into_iter().map(move |a| a.with(multiaddr::Protocol::P2p(peer_id)))
}

/// Publish own public addresses.
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
ip_network = "0.4.1"
libp2p = { version = "0.51.4", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "request-response", "tcp", "tokio", "websocket", "yamux"] }
linked_hash_set = "0.1.3"
linked_hash_set = "0.1.4"
log = { workspace = true, default-features = true }
mockall = "0.11.3"
parking_lot = "0.12.1"
Expand Down
19 changes: 14 additions & 5 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use libp2p::{
},
PeerId,
};
use linked_hash_set::LinkedHashSet;
use log::{debug, info, trace, warn};
use sp_core::hexdisplay::HexDisplay;
use std::{
Expand Down Expand Up @@ -550,14 +551,20 @@ impl NetworkBehaviour for DiscoveryBehaviour {
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };

let mut list = self
// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
// discovery DHT records).
let mut list: LinkedHashSet<_> = self
.permanent_addresses
.iter()
.filter_map(|(p, a)| (*p == peer_id).then_some(a.clone()))
.collect::<Vec<_>>();
.collect();

if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
list.extend(ephemeral_addresses.clone());
ephemeral_addresses.iter().for_each(|address| {
list.insert_if_absent(address.clone());
});
}

{
Expand All @@ -583,12 +590,14 @@ impl NetworkBehaviour for DiscoveryBehaviour {
});
}

list.extend(list_to_filter);
list_to_filter.into_iter().for_each(|address| {
list.insert_if_absent(address);
});
}

trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list);

Ok(list)
Ok(list.into_iter().collect())
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
Expand Down
Loading