Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network): introduce relay reservation score #2634

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
adb81ad
docs: retrospectively apply changelog for hotfix
jacderida Jan 10, 2025
bfae2a1
feat(metrics): implement custom relay client metrics
RolandSherwin Jan 10, 2025
340a793
Merge pull request #2618 from RolandSherwin/metrics_relay_clientt
jacderida Jan 10, 2025
a0a8981
fix(network): set the record count metric as soon as we restart
RolandSherwin Jan 10, 2025
cacc5f4
Merge pull request #2617 from RolandSherwin/update_record_len_on_restart
jacderida Jan 13, 2025
b3abd1b
fix(network): consider MultiAddrNotSupported as a serious issue if al…
RolandSherwin Jan 14, 2025
819d69b
Merge pull request #2628 from RolandSherwin/outgoing_conn_error_fix
jacderida Jan 14, 2025
23d9ae4
Merge branch 'stable' into rc-2024.12.1-hotfix5
jacderida Jan 20, 2025
f0a1275
chore: tiny fixes to metrics and log
RolandSherwin Jan 13, 2025
aae52e9
feat(network): introduce relay reservation score
RolandSherwin Jan 15, 2025
eea80e6
feat: use a rolling windows for the reservation health
RolandSherwin Jan 15, 2025
e00bb43
fix(network): push identify info when we use a new relay server
RolandSherwin Jan 16, 2025
62029c4
refactor(network): move identify event processing to own module
RolandSherwin Jan 16, 2025
5b2c26d
feat(network): update address in RT on new identify push
RolandSherwin Jan 16, 2025
8deebc9
feat(metrics): expose health metrics and don't disconnect on low succ…
RolandSherwin Jan 17, 2025
277d63f
fix(relay): push identify to all peers
RolandSherwin Jan 17, 2025
563be9a
fix(node_manager): do not use delimiter for env vars
RolandSherwin Jan 18, 2025
c292059
refactor(metrics): feature gate reservation health metric
RolandSherwin Jan 20, 2025
86988e3
chore(network): updated based on comments
RolandSherwin Jan 20, 2025
820b92b
fix(manager): push new identify to just the connected peers
RolandSherwin Jan 21, 2025
c7ab601
fix(network): push identify info to the unique connected peers only
RolandSherwin Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Remove the 'dial error' output on the `file upload` command

## 2024-12-21

### Network

#### Fixed

- Do not dial back when a new peer is detected. This resulted in a large number of open connections,
in turn causing increased CPU usage.

### Client

#### Changed

- Remove the 'dial error' output on the `file upload` command

## 2024-12-18

### General
Expand Down
20 changes: 13 additions & 7 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,19 +594,17 @@ impl NetworkBuilder {
let kademlia = {
match record_store_cfg {
Some(store_cfg) => {
#[cfg(feature = "open-metrics")]
let record_stored_metrics =
metrics_recorder.as_ref().map(|r| r.records_stored.clone());
let node_record_store = NodeRecordStore::with_config(
peer_id,
store_cfg,
network_event_sender.clone(),
local_swarm_cmd_sender.clone(),
#[cfg(feature = "open-metrics")]
record_stored_metrics,
);
#[cfg(feature = "open-metrics")]
let mut node_record_store = node_record_store;
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &metrics_recorder {
node_record_store = node_record_store
.set_record_count_metric(metrics_recorder.records_stored.clone());
}

let store = UnifiedRecordStore::Node(node_record_store);
debug!("Using Kademlia with NodeRecordStore!");
Expand Down Expand Up @@ -704,6 +702,14 @@ impl NetworkBuilder {
// Enable relay manager for nodes behind home network
let relay_manager = if !is_client && self.is_behind_home_network {
let relay_manager = RelayManager::new(peer_id);
#[cfg(feature = "open-metrics")]
let mut relay_manager = relay_manager;
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &metrics_recorder {
relay_manager.set_reservation_health_metrics(
metrics_recorder.relay_reservation_health.clone(),
);
}
Some(relay_manager)
} else {
info!("Relay manager is disabled for this node.");
Expand Down
225 changes: 225 additions & 0 deletions ant-networking/src/event/identify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::relay_manager::is_a_relayed_peer;
use crate::{multiaddr_is_global, multiaddr_strip_p2p, NetworkEvent, SwarmDriver};
use ant_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR};
use libp2p::identify::Info;
use libp2p::kad::K_VALUE;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::Multiaddr;
use std::collections::HashSet;
use std::time::Instant;

impl SwarmDriver {
/// Entry point for every event emitted by the identify behaviour.
///
/// `Received` events are forwarded to `handle_identify_received`, and the time
/// spent handling them is traced. All other variants are only logged at debug
/// level.
pub(super) fn handle_identify_event(&mut self, identify_event: libp2p::identify::Event) {
    use libp2p::identify::Event;

    match identify_event {
        Event::Received {
            peer_id,
            info,
            connection_id,
        } => {
            let started_at = Instant::now();
            self.handle_identify_received(peer_id, info, connection_id);
            trace!("SwarmEvent handled in {:?}: identify", started_at.elapsed());
        }
        // Nothing to act upon for the remaining variants; just log them.
        Event::Sent { .. } | Event::Pushed { .. } | Event::Error { .. } => {
            debug!("identify: {identify_event:?}");
        }
    }
}

fn handle_identify_received(
&mut self,
peer_id: libp2p::PeerId,
info: Info,
connection_id: libp2p::swarm::ConnectionId,
) {
debug!(conn_id=%connection_id, %peer_id, ?info, "identify: received info");

let our_identify_protocol = IDENTIFY_PROTOCOL_STR.read().expect("IDENTIFY_PROTOCOL_STR has been locked to write. A call to set_network_id performed. This should not happen.").to_string();

if info.protocol_version != our_identify_protocol {
warn!(?info.protocol_version, "identify: {peer_id:?} does not have the same protocol. Our IDENTIFY_PROTOCOL_STR: {our_identify_protocol:?}");

self.send_event(NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol: our_identify_protocol,
their_protocol: info.protocol_version,
});
// Block the peer from any further communication.
self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
error!("Clearing out a protocol mismatch peer from RT. The peer pushed an incorrect identify info after being added: {peer_id:?}");
self.update_on_peer_removal(*dead_peer.node.key.preimage());
}

return;
}

let our_agent_version = IDENTIFY_NODE_VERSION_STR.read().expect("IDENTIFY_NODE_VERSION_STR has been locked to write. A call to set_network_id performed. This should not happen.").to_string();
// if client, return.
if info.agent_version != our_agent_version {
return;
}

let has_dialed = self.dialed_peers.contains(&peer_id);

// If we're not in local mode, only add globally reachable addresses.
// Strip the `/p2p/...` part of the multiaddresses.
// Collect into a HashSet directly to avoid multiple allocations and handle deduplication.
let mut addrs: HashSet<Multiaddr> = match self.local {
true => info
.listen_addrs
.into_iter()
.map(|addr| multiaddr_strip_p2p(&addr))
.collect(),
false => info
.listen_addrs
.into_iter()
.filter(multiaddr_is_global)
.map(|addr| multiaddr_strip_p2p(&addr))
.collect(),
};

let is_relayed_peer = is_a_relayed_peer(&addrs);

let is_bootstrap_peer = self
.bootstrap_peers
.iter()
.any(|(_ilog2, peers)| peers.contains(&peer_id));

// Do not use an `already relayed` peer as `potential relay candidate`.
if !is_relayed_peer && !is_bootstrap_peer {
if let Some(relay_manager) = self.relay_manager.as_mut() {
debug!("Adding candidate relay server {peer_id:?}, it's not a bootstrap node");
relay_manager.add_potential_candidates(&peer_id, &addrs, &info.protocols);
}
}

let (kbucket_full, already_present_in_rt, ilog2) =
if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) {
let ilog2 = kbucket.range().0.ilog2();
let num_peers = kbucket.num_entries();
let is_bucket_full = num_peers >= K_VALUE.into();

// check if peer_id is already a part of RT
let already_present_in_rt = kbucket
.iter()
.any(|entry| entry.node.key.preimage() == &peer_id);

(is_bucket_full, already_present_in_rt, ilog2)
} else {
return;
};

// If the peer is part already of the RT, try updating the addresses based on the new push info.
// We don't have to dial it back.
if already_present_in_rt {
debug!("Received identify for {peer_id:?} that is already part of the RT. Checking if the addresses {addrs:?} are new.");

self.update_pre_existing_peer(peer_id, addrs.clone());
return;
}

// When received an identify from un-dialed peer, try to dial it
// The dial shall trigger the same identify to be sent again and confirm
// peer is external accessible, hence safe to be added into RT.
if !self.local && !has_dialed {
// Only need to dial back for not fulfilled kbucket
if kbucket_full {
debug!("received identify for a full bucket {ilog2:?}, not dialing {peer_id:?} on {addrs:?}");
return;
}

info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {ilog2:?}, dial back to confirm external accessible");
if let Err(err) = self.swarm.dial(
DialOpts::peer_id(peer_id)
.condition(PeerCondition::NotDialing)
.addresses(addrs.iter().cloned().collect())
.build(),
) {
warn!(%peer_id, ?addrs, "dialing error: {err:?}");
}
} else
// We care only for peers that we dialed and thus are reachable.
// Or if we are local, we can add the peer directly.
{
// A bad node cannot establish a connection with us. So we can add it to the RT directly.

// With the new bootstrap cache, the workload is distributed,
// hence no longer need to replace bootstrap nodes for workload share.
// self.remove_bootstrap_from_full(peer_id);

// Avoid have `direct link format` addrs co-exists with `relay` addr
if is_relayed_peer {
addrs.retain(|multiaddr| {
multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit))
});
}

debug!(%peer_id, ?addrs, "identify: attempting to add addresses to routing table");

// Attempt to add the addresses to the routing table.
for multiaddr in addrs {
let _routing_update = self
.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, multiaddr);
}
}
}

/// Reconciles the routing table (RT) addresses of an already-known peer with
/// the set advertised in its latest identify info.
///
/// Addresses in `new_addrs` that the RT does not hold yet are added, and RT
/// addresses that are no longer advertised are removed. Existing RT addresses
/// are compared after stripping their `/p2p/...` suffix.
///
/// New addresses are added *before* stale ones are removed. Kademlia evicts a
/// peer from the RT when its last address is removed, so removing first would
/// drop and re-insert the peer whenever the advertised set is completely
/// different from the stored one.
///
/// If the peer still ends up evicted (the new set is empty), the usual
/// peer-removal bookkeeping is run.
///
/// Does nothing when the peer has no k-bucket or is not present in it.
fn update_pre_existing_peer(&mut self, peer_id: libp2p::PeerId, new_addrs: HashSet<Multiaddr>) {
    let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) else {
        return;
    };

    let Some(entry) = kbucket
        .iter()
        .find(|entry| entry.node.key.preimage() == &peer_id)
    else {
        warn!("Peer {peer_id:?} is not part of the RT. Cannot update addresses.");
        return;
    };
    let existing_addrs = entry
        .node
        .value
        .iter()
        .map(multiaddr_strip_p2p)
        .collect::<HashSet<_>>();

    let addresses_to_add: Vec<&Multiaddr> = new_addrs.difference(&existing_addrs).collect();
    let addresses_to_remove: Vec<&Multiaddr> = existing_addrs.difference(&new_addrs).collect();

    // Add first so the peer always keeps at least one address while we update it.
    if !addresses_to_add.is_empty() {
        debug!("Adding addresses to RT for {peer_id:?} as the new identify contains them: {addresses_to_add:?}");
        for multiaddr in addresses_to_add {
            let _routing_update = self
                .swarm
                .behaviour_mut()
                .kademlia
                .add_address(&peer_id, multiaddr.clone());
        }
    }

    if !addresses_to_remove.is_empty() {
        debug!("Removing addresses from RT for {peer_id:?} as the new identify does not contain them: {addresses_to_remove:?}");
        for multiaddr in addresses_to_remove {
            // `remove_address` returns the entry when removing the last address
            // evicted the peer; keep our own bookkeeping in sync in that case.
            if let Some(dead_peer) = self
                .swarm
                .behaviour_mut()
                .kademlia
                .remove_address(&peer_id, multiaddr)
            {
                warn!("Peer {peer_id:?} was removed from the RT as the new identify left it without any address.");
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }
        }
    }
}
}
1 change: 1 addition & 0 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

mod identify;
mod kad;
mod request_response;
mod swarm;
Expand Down
Loading
Loading