Skip to content

Commit

Permalink
refactor(metrics): feature gate reservation health metric
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Jan 20, 2025
1 parent f7b3521 commit 5cad2bf
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 31 deletions.
3 changes: 3 additions & 0 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ impl SwarmDriver {
} => {
event_string = "incoming";
debug!("IncomingConnection ({connection_id:?}) with local_addr: {local_addr:?} send_back_addr: {send_back_addr:?}");
#[cfg(feature = "open-metrics")]
if let Some(relay_manager) = self.relay_manager.as_mut() {
relay_manager.on_incoming_connection(
&connection_id,
Expand All @@ -229,6 +230,7 @@ impl SwarmDriver {
.on_established_incoming_connection(local_addr.clone());
}
}
#[cfg(feature = "open-metrics")]
if let Some(relay_manager) = self.relay_manager.as_mut() {
relay_manager.on_connection_established(&peer_id, &connection_id);
}
Expand Down Expand Up @@ -434,6 +436,7 @@ impl SwarmDriver {
external_addr_manager
.on_incoming_connection_error(local_addr.clone(), &mut self.swarm);
}
#[cfg(feature = "open-metrics")]
if let Some(relay_manager) = self.relay_manager.as_mut() {
relay_manager.on_incomming_connection_error(&send_back_addr, &connection_id);
}
Expand Down
77 changes: 46 additions & 31 deletions ant-networking/src/relay_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,37 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::driver::{BadNodes, NodeBehaviour};
#[cfg(feature = "open-metrics")]
use libp2p::swarm::ConnectionId;
use libp2p::{
core::transport::ListenerId, multiaddr::Protocol, swarm::ConnectionId, Multiaddr, PeerId,
StreamProtocol, Swarm,
core::transport::ListenerId, multiaddr::Protocol, Multiaddr, PeerId, StreamProtocol, Swarm,
};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::gauge::Gauge;
use rand::Rng;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
#[cfg(feature = "open-metrics")]
use std::sync::atomic::AtomicU64;
use std::{
collections::{btree_map::Entry, BTreeMap, HashMap, HashSet, VecDeque},
time::SystemTime,
};
#[cfg(feature = "open-metrics")]
use std::{collections::btree_map::Entry, time::SystemTime};

const MAX_CONCURRENT_RELAY_CONNECTIONS: usize = 4;
const MAX_POTENTIAL_CANDIDATES: usize = 1000;

/// We could get multiple incoming connections from the same peer through multiple relay servers, and only one of them
/// would succeed. So we wait and collect all such connections 'from' peer, instead of just recording the
/// success/failure for each connection.
#[cfg(feature = "open-metrics")]
const MAX_DURATION_TO_TRACK_INCOMING_CONNECTIONS_PER_PEER: std::time::Duration =
std::time::Duration::from_secs(20);

#[cfg(feature = "open-metrics")]
const RESERVATION_SCORE_ROLLING_WINDOW: usize = 100;

/// The connections from a single peer through multiple relay servers.
/// It is a vector of (relay server, connection id, time of connection, Option<success/failure>).
/// A None value for success/failure means that the connection is still pending.
#[cfg(feature = "open-metrics")]
type ConnectionsFromPeer = Vec<(PeerId, ConnectionId, SystemTime, Option<bool>)>;

pub(crate) fn is_a_relayed_peer(addrs: &HashSet<Multiaddr>) -> bool {
Expand All @@ -56,12 +59,13 @@ pub(crate) struct RelayManager {
connected_relay_servers: BTreeMap<PeerId, Multiaddr>,
/// Tracker for the relayed listen addresses.
relayed_listener_id_map: HashMap<ListenerId, PeerId>,
/// Health of the relayed connections. If the relayed connection is not healthy, we should try to connect to another
/// relay.
reservation_health: RelayReservationHealth,
#[cfg(feature = "open-metrics")]
/// Health of the relayed connections.
reservation_health: Option<RelayReservationHealth>,
}

#[derive(Debug, Default)]
#[cfg(feature = "open-metrics")]
#[derive(Debug)]
struct RelayReservationHealth {
/// We could have multiple incoming connections from the same peer through multiple relay servers. But we could
/// just have a single one as well.
Expand All @@ -70,15 +74,16 @@ struct RelayReservationHealth {
/// A rolling window of reservation score per relay server.
reservation_score: BTreeMap<PeerId, ReservationStat>,
/// To track the avg health of all the reservations.
#[cfg(feature = "open-metrics")]
relay_reservation_health_metric: Option<Gauge<f64, AtomicU64>>,
relay_reservation_health_metric: Gauge<f64, AtomicU64>,
}

#[cfg(feature = "open-metrics")]
#[derive(Debug, Default, Clone)]
struct ReservationStat {
stat: VecDeque<bool>,
}

#[cfg(feature = "open-metrics")]
impl ReservationStat {
fn record_value(&mut self, value: bool) {
self.stat.push_back(value);
Expand Down Expand Up @@ -107,13 +112,18 @@ impl RelayManager {
waiting_for_reservation: Default::default(),
relay_server_candidates: Default::default(),
relayed_listener_id_map: Default::default(),
reservation_health: Default::default(),
#[cfg(feature = "open-metrics")]
reservation_health: None,
}
}

#[cfg(feature = "open-metrics")]
pub(crate) fn set_reservation_health_metrics(&mut self, gauge: Gauge<f64, AtomicU64>) {
self.reservation_health.relay_reservation_health_metric = Some(gauge);
self.reservation_health = Some(RelayReservationHealth {
incoming_connections_from_remote_peer: Default::default(),
reservation_score: Default::default(),
relay_reservation_health_metric: gauge,
})
}

/// Should we keep this peer alive? Closing a connection to that peer would remove that server from the listen addr.
Expand Down Expand Up @@ -276,34 +286,40 @@ impl RelayManager {
}

/// Track the incoming connections to monitor the health of a reservation.
#[cfg(feature = "open-metrics")]
pub(crate) fn on_incoming_connection(
&mut self,
connection_id: &ConnectionId,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
) {
self.reservation_health
.on_incoming_connection(connection_id, local_addr, send_back_addr);
if let Some(reservation_health) = &mut self.reservation_health {
reservation_health.on_incoming_connection(connection_id, local_addr, send_back_addr);
}
}

/// Track the connection established to monitor the health of a reservation.
#[cfg(feature = "open-metrics")]
pub(crate) fn on_connection_established(
&mut self,
from_peer: &PeerId,
connection_id: &ConnectionId,
) {
self.reservation_health
.on_connection_established(from_peer, connection_id);
if let Some(reservation_health) = &mut self.reservation_health {
reservation_health.on_connection_established(from_peer, connection_id);
}
}

/// Track the connection error to monitor the health of a reservation.
#[cfg(feature = "open-metrics")]
pub(crate) fn on_incomming_connection_error(
&mut self,
send_back_addr: &Multiaddr,
connection_id: &ConnectionId,
) {
self.reservation_health
.on_incomming_connection_error(send_back_addr, connection_id);
if let Some(reservation_health) = &mut self.reservation_health {
reservation_health.on_incomming_connection_error(send_back_addr, connection_id);
}
}

fn does_it_support_relay_server_protocol(protocols: &Vec<StreamProtocol>) -> bool {
Expand Down Expand Up @@ -345,12 +361,13 @@ impl RelayManager {
}
}

#[cfg(feature = "open-metrics")]
impl RelayReservationHealth {
fn on_incoming_connection(
&mut self,
connection_id: &ConnectionId,
// The local addr would look something like this
// /ip4/138.68.152.2/udp/39821/quic-v1/p2p/12D3KooWHHVo7euYruLYEZHiwZcHG6p99XqHzjyt8MaZPiEKk5Sp/p2p-circuit
// /ip4/<ip>/udp/39821/quic-v1/p2p/12D3KooWHHVo7euYruLYEZHiwZcHG6p99XqHzjyt8MaZPiEKk5Sp/p2p-circuit
local_addr: &Multiaddr,
// The send back addr would not contain the ip addr, but just the peer ids for private nodes.
// send_back_addr: /p2p/12D3KooWGsKUTLCp6Vi8e9hxUMxAtU5CjPynYKqg77KBco5qBMqD
Expand Down Expand Up @@ -523,16 +540,14 @@ impl RelayReservationHealth {
}

#[cfg(feature = "open-metrics")]
if let Some(metric) = &self.relay_reservation_health_metric {
// calculate avg health of all the reservations
let avg_health = self
.reservation_score
.values()
.map(|stat| stat.success_rate())
.sum::<f64>()
/ self.reservation_score.len() as f64;
metric.set(avg_health);
}
// calculate avg health of all the reservations
let avg_health = self
.reservation_score
.values()
.map(|stat| stat.success_rate())
.sum::<f64>()
/ self.reservation_score.len() as f64;
self.relay_reservation_health_metric.set(avg_health);

self.log_reservation_score();
}
Expand Down

0 comments on commit 5cad2bf

Please sign in to comment.