diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 0547953dc1..2f54e459ce 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -499,7 +499,10 @@ impl MagicSock { let dest = QuicMappedAddr(dest); let mut transmits_sent = 0; - match self.node_map.get_send_addrs(dest) { + match self + .node_map + .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed)) + { Some((public_key, udp_addr, relay_url, mut msgs)) => { let mut pings_sent = false; // If we have pings to send, we *have* to send them out first. diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index 7632dce0c5..3550f34bfb 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -30,6 +30,7 @@ use crate::{ mod best_addr; mod node_state; +mod udp_paths; pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, NodeInfo}; pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing}; @@ -186,6 +187,7 @@ impl NodeMap { pub(super) fn get_send_addrs( &self, addr: QuicMappedAddr, + have_ipv6: bool, ) -> Option<( PublicKey, Option, @@ -195,7 +197,7 @@ impl NodeMap { let mut inner = self.inner.lock(); let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; let public_key = *ep.public_key(); - let (udp_addr, relay_url, msgs) = ep.get_send_addrs(); + let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); Some((public_key, udp_addr, relay_url, msgs)) } diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index df3e88a82d..7a69b57117 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -1,9 +1,7 @@ -use std::{ - collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap}, - hash::Hash, - net::{IpAddr, SocketAddr}, - time::{Duration, Instant}, -}; +use std::collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap}; +use std::hash::Hash; +use 
std::net::{IpAddr, SocketAddr}; +use std::time::{Duration, Instant}; use iroh_metrics::inc; use serde::{Deserialize, Serialize}; @@ -11,21 +9,17 @@ use tokio::sync::mpsc; use tracing::{debug, event, info, instrument, trace, warn, Level}; use watchable::{Watchable, WatcherStream}; -use crate::{ - disco::{self, SendAddr}, - endpoint::AddrInfo, - key::PublicKey, - magicsock::{Timer, HEARTBEAT_INTERVAL}, - net::ip::is_unicast_link_local, - relay::RelayUrl, - stun, - util::relay_only_mode, - NodeAddr, NodeId, -}; - -use crate::magicsock::{metrics::Metrics as MagicsockMetrics, ActorMessage, QuicMappedAddr}; - -use super::best_addr::{self, BestAddr, ClearReason, Source}; +use crate::disco::{self, SendAddr}; +use crate::endpoint::AddrInfo; +use crate::key::PublicKey; +use crate::magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}; +use crate::net::ip::is_unicast_link_local; +use crate::relay::RelayUrl; +use crate::util::relay_only_mode; +use crate::{stun, NodeAddr, NodeId}; + +use super::best_addr::{self, ClearReason, Source}; +use super::udp_paths::{NodeUdpPaths, UdpSendAddr}; use super::IpPort; /// Number of addresses that are not active that we keep around per node. @@ -116,10 +110,7 @@ pub(super) struct NodeState { /// /// The fallback/bootstrap path, if non-zero (non-zero for well-behaved clients). relay_url: Option<(RelayUrl, PathState)>, - /// Best non-relay path, i.e. a UDP address. - best_addr: BestAddr, - /// State for each of this node's direct paths. - direct_addr_state: BTreeMap, + udp_paths: NodeUdpPaths, sent_pings: HashMap, /// Last time this node was used. 
/// @@ -169,9 +160,8 @@ impl NodeState { PathState::new(options.node_id, SendAddr::Relay(url)), ) }), - best_addr: Default::default(), + udp_paths: NodeUdpPaths::new(), sent_pings: HashMap::new(), - direct_addr_state: BTreeMap::new(), last_used: options.active.then(Instant::now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::None), @@ -203,7 +193,8 @@ impl NodeState { let conn_type = self.conn_type.get(); let latency = match conn_type { ConnectionType::Direct(addr) => self - .direct_addr_state + .udp_paths + .paths .get(&addr.into()) .and_then(|state| state.latency()), ConnectionType::Relay(ref url) => self @@ -213,7 +204,8 @@ impl NodeState { .and_then(|(_, state)| state.latency()), ConnectionType::Mixed(addr, ref url) => { let addr_latency = self - .direct_addr_state + .udp_paths + .paths .get(&addr.into()) .and_then(|state| state.latency()); let relay_latency = self @@ -226,7 +218,8 @@ impl NodeState { ConnectionType::None => None, }; let addrs = self - .direct_addr_state + .udp_paths + .paths .iter() .map(|(addr, endpoint_state)| DirectAddrInfo { addr: SocketAddr::from(*addr), @@ -261,31 +254,34 @@ impl NodeState { /// Returns the address(es) that should be used for sending the next packet. /// /// This may return to send on one, both or no paths. - fn addr_for_send(&mut self, now: &Instant) -> (Option, Option) { + fn addr_for_send( + &mut self, + now: &Instant, + have_ipv6: bool, + ) -> (Option, Option) { if relay_only_mode() { debug!("in `DEV_relay_ONLY` mode, giving the relay address as the only viable address for this endpoint"); return (None, self.relay_url()); } - // Update our best addr from candidate addresses (only if it is empty and if we have - // recent pongs). 
- self.assign_best_addr_from_candidates_if_empty(); - let (best_addr, relay_url) = match self.best_addr.state(*now) { - best_addr::State::Valid(best_addr) => { + let (best_addr, relay_url) = match self.udp_paths.send_addr(*now, have_ipv6) { + UdpSendAddr::Valid(addr) => { // If we have a valid address we use it. - trace!(addr = %best_addr.addr, latency = ?best_addr.latency, - "best_addr is set and valid, use best_addr only"); - (Some(best_addr.addr), None) + trace!(%addr, "UdpSendAddr is valid, use it"); + (Some(addr), None) } - best_addr::State::Outdated(best_addr) => { + UdpSendAddr::Outdated(addr) => { // If the address is outdated we use it, but send via relay at the same time. // We also send disco pings so that it will become valid again if it still // works (i.e. we don't need to holepunch again). - trace!(addr = %best_addr.addr, latency = ?best_addr.latency, - "best_addr is set but outdated, use best_addr and relay"); - (Some(best_addr.addr), self.relay_url()) + trace!(%addr, "UdpSendAddr is outdated, use it together with relay"); + (Some(addr), self.relay_url()) } - best_addr::State::Empty => { - trace!("best_addr is unset, use relay"); + UdpSendAddr::Unconfirmed(addr) => { + trace!(%addr, "UdpSendAddr is unconfirmed, use it together with relay"); + (Some(addr), self.relay_url()) + } + UdpSendAddr::None => { + trace!("No UdpSendAddr, use relay"); (None, self.relay_url()) } }; @@ -356,7 +352,7 @@ impl NodeState { /// /// If this is also the best address, it will be cleared as well. pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, reason: ClearReason) { - let Some(state) = self.direct_addr_state.remove(ip_port) else { + let Some(state) = self.udp_paths.paths.remove(ip_port) else { return; }; @@ -365,55 +361,11 @@ impl NodeState { None => debug!(%ip_port, last_seen=%"never", ?reason, "pruning address"), } - self.best_addr - .clear_if_equals((*ip_port).into(), reason, self.relay_url.is_some()); - } - - /// Fixup best_adrr from candidates. 
- /// - /// If somehow we end up in a state where we failed to set a best_addr, while we do have - /// valid candidates, this will chose a candidate and set best_addr again. Most likely - /// this is a bug elsewhere though. - fn assign_best_addr_from_candidates_if_empty(&mut self) { - if !self.best_addr.is_empty() { - return; - } - - // The highest acceptable latency for an endpoint path. If the latency is higher - // then this the path will be ignored. - const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); - let best_pong = self - .direct_addr_state - .iter() - .fold(None, |best_pong, (ipp, state)| { - let best_latency = best_pong - .map(|p: &PongReply| p.latency) - .unwrap_or(MAX_LATENCY); - match state.recent_pong() { - // This pong is better if it has a lower latency, or if it has the same - // latency but on an IPv6 path. - Some(pong) - if pong.latency < best_latency - || (pong.latency == best_latency && ipp.ip().is_ipv6()) => - { - Some(pong) - } - _ => best_pong, - } - }); - - // If we found a candidate, set to best addr - if let Some(pong) = best_pong { - if let SendAddr::Udp(addr) = pong.from { - warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); - self.best_addr.insert_if_better_or_reconfirm( - addr, - pong.latency, - best_addr::Source::BestCandidate, - pong.pong_at, - ) - } - } + self.udp_paths.best_addr.clear_if_equals( + (*ip_port).into(), + reason, + self.relay_url.is_some(), + ); } /// Whether we need to send another call-me-maybe to the endpoint. 
@@ -430,7 +382,7 @@ impl NodeState { debug!("no previous full ping: need full ping"); return true; }; - match self.best_addr.state(*now) { + match self.udp_paths.best_addr.state(*now) { best_addr::State::Empty => { debug!("best addr not set: need full ping"); true @@ -461,7 +413,7 @@ impl NodeState { debug!(tx = %hex::encode(txid), addr = %sp.to, "pong not received in timeout"); match sp.to { SendAddr::Udp(addr) => { - if let Some(path_state) = self.direct_addr_state.get_mut(&addr.into()) { + if let Some(path_state) = self.udp_paths.paths.get_mut(&addr.into()) { path_state.last_ping = None; // only clear the best address if there was no sign of life from this path // within the time the pong should have arrived @@ -470,7 +422,7 @@ impl NodeState { .map(|last_alive| last_alive.elapsed() <= PING_TIMEOUT_DURATION) .unwrap_or(false); if !consider_alive { - self.best_addr.clear_if_equals( + self.udp_paths.best_addr.clear_if_equals( addr, ClearReason::PongTimeout, self.relay_url().is_some(), @@ -479,7 +431,7 @@ impl NodeState { } else { // If we have no state for the best addr it should have been cleared // anyway. - self.best_addr.clear_if_equals( + self.udp_paths.best_addr.clear_if_equals( addr, ClearReason::PongTimeout, self.relay_url.is_some(), @@ -539,7 +491,7 @@ impl NodeState { let mut path_found = false; match to { SendAddr::Udp(addr) => { - if let Some(st) = self.direct_addr_state.get_mut(&addr.into()) { + if let Some(st) = self.udp_paths.paths.get_mut(&addr.into()) { st.last_ping.replace(now); path_found = true } @@ -633,7 +585,7 @@ impl NodeState { #[must_use = "actions must be handled"] fn send_pings(&mut self, now: Instant) -> Vec { // We allocate +1 in case the caller wants to add a call-me-maybe message. 
- let mut ping_msgs = Vec::with_capacity(self.direct_addr_state.len() + 1); + let mut ping_msgs = Vec::with_capacity(self.udp_paths.paths.len() + 1); if let Some((url, state)) = self.relay_url.as_ref() { if state.needs_ping(&now) { @@ -653,7 +605,8 @@ impl NodeState { } self.prune_direct_addresses(); let mut ping_dsts = String::from("["); - self.direct_addr_state + self.udp_paths + .paths .iter() .filter_map(|(ipp, state)| state.needs_ping(&now).then_some(*ipp)) .filter_map(|ipp| { @@ -668,7 +621,7 @@ impl NodeState { debug!( %ping_dsts, dst = %self.node_id.fmt_short(), - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "sending pings to node", ); self.last_full_ping.replace(now); @@ -676,7 +629,7 @@ impl NodeState { } pub(super) fn update_from_node_addr(&mut self, n: &AddrInfo) { - if self.best_addr.is_empty() { + if self.udp_paths.best_addr.is_empty() { // we do not have a direct connection, so changing the relay information may // have an effect on our connection status if self.relay_url.is_none() && n.relay_url.is_some() { @@ -702,11 +655,12 @@ impl NodeState { } for &addr in n.direct_addresses.iter() { - self.direct_addr_state + self.udp_paths + .paths .entry(addr.into()) .or_insert_with(|| PathState::new(self.node_id, SendAddr::from(addr))); } - let paths = summarize_node_paths(&self.direct_addr_state); + let paths = summarize_node_paths(&self.udp_paths.paths); debug!(new = ?n.direct_addresses , %paths, "added new direct paths for endpoint"); } @@ -714,10 +668,11 @@ impl NodeState { #[instrument(skip_all, fields(node = %self.node_id.fmt_short()))] pub(super) fn reset(&mut self) { self.last_full_ping = None; - self.best_addr + self.udp_paths + .best_addr .clear(ClearReason::Reset, self.relay_url.is_some()); - for es in self.direct_addr_state.values_mut() { + for es in self.udp_paths.paths.values_mut() { es.last_ping = None; } } @@ -739,7 +694,7 @@ impl NodeState { let now = Instant::now(); let role 
= match path { - SendAddr::Udp(addr) => match self.direct_addr_state.entry(addr.into()) { + SendAddr::Udp(addr) => match self.udp_paths.paths.entry(addr.into()) { Entry::Occupied(mut occupied) => occupied.get_mut().handle_ping(tx_id, now), Entry::Vacant(vacant) => { info!(%addr, "new direct addr for node"); @@ -787,7 +742,7 @@ impl NodeState { // if the endpoint does not yet have a best_addrr let needs_ping_back = if matches!(path, SendAddr::Udp(_)) && matches!( - self.best_addr.state(now), + self.udp_paths.best_addr.state(now), best_addr::State::Empty | best_addr::State::Outdated(_) ) { // We also need to send a ping to make this path available to us as well. This @@ -803,7 +758,7 @@ impl NodeState { debug!( ?role, needs_ping_back = ?needs_ping_back.is_some(), - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "endpoint handled ping", ); PingHandled { @@ -819,7 +774,8 @@ impl NodeState { pub(super) fn prune_direct_addresses(&mut self) { // prune candidates are addresses that are not active let mut prune_candidates: Vec<_> = self - .direct_addr_state + .udp_paths + .paths .iter() .filter(|(_ip_port, state)| !state.is_active()) .map(|(ip_port, state)| (*ip_port, state.last_alive())) @@ -834,7 +790,7 @@ impl NodeState { if prune_count == 0 { // nothing to do, within limits debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "prune addresses: {prune_count} pruned", ); return; @@ -848,7 +804,7 @@ impl NodeState { self.remove_direct_addr(&ip_port, ClearReason::Inactive) } debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "prune addresses: {prune_count} pruned", ); } @@ -857,8 +813,8 @@ impl NodeState { /// assumptions about which paths work. 
#[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] pub(super) fn note_connectivity_change(&mut self) { - self.best_addr.clear_trust("connectivity changed"); - for es in self.direct_addr_state.values_mut() { + self.udp_paths.best_addr.clear_trust("connectivity changed"); + for es in self.udp_paths.paths.values_mut() { es.clear(); } } @@ -906,7 +862,7 @@ impl NodeState { match src { SendAddr::Udp(addr) => { - match self.direct_addr_state.get_mut(&addr.into()) { + match self.udp_paths.paths.get_mut(&addr.into()) { None => { warn!("ignoring pong: no state for src addr"); // This is no longer an endpoint we care about. @@ -923,7 +879,7 @@ impl NodeState { } } debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "handled pong", ); } @@ -954,7 +910,7 @@ impl NodeState { // TODO(bradfitz): decide how latency vs. preference order affects decision if let SendAddr::Udp(to) = sp.to { debug_assert!(!is_relay, "mismatching relay & udp"); - self.best_addr.insert_if_better_or_reconfirm( + self.udp_paths.best_addr.insert_if_better_or_reconfirm( to, latency, best_addr::Source::ReceivedPong, @@ -991,7 +947,8 @@ impl NodeState { } let ipp = IpPort::from(*peer_sockaddr); call_me_maybe_ipps.insert(ipp); - self.direct_addr_state + self.udp_paths + .paths .entry(ipp) .or_insert_with(|| PathState::new(self.node_id, SendAddr::from(*peer_sockaddr))) .call_me_maybe_time @@ -1001,7 +958,7 @@ impl NodeState { // Zero out all the last_ping times to force send_pings to send new ones, even if // it's been less than 5 seconds ago. Also clear pongs for direct addresses not // included in the updated set. 
- for (ipp, st) in self.direct_addr_state.iter_mut() { + for (ipp, st) in self.udp_paths.paths.iter_mut() { st.last_ping = None; if !call_me_maybe_ipps.contains(ipp) { // TODO: This seems like a weird way to signal that the endpoint no longer @@ -1014,16 +971,17 @@ impl NodeState { } // Clear trust on our best_addr if it is not included in the updated set. Also // clear the last call-me-maybe send time so we will send one again. - if let Some(addr) = self.best_addr.addr() { + if let Some(addr) = self.udp_paths.best_addr.addr() { let ipp: IpPort = addr.into(); if !call_me_maybe_ipps.contains(&ipp) { - self.best_addr + self.udp_paths + .best_addr .clear_trust("best_addr not in new call-me-maybe"); self.last_call_me_maybe = None; } } debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "updated endpoint paths from call-me-maybe", ); self.send_pings(now) @@ -1031,13 +989,14 @@ impl NodeState { /// Marks this endpoint as having received a UDP payload message. pub(super) fn receive_udp(&mut self, addr: IpPort, now: Instant) { - let Some(state) = self.direct_addr_state.get_mut(&addr) else { + let Some(state) = self.udp_paths.paths.get_mut(&addr) else { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); return; }; state.last_payload_msg = Some(now); self.last_used = Some(now); - self.best_addr + self.udp_paths + .best_addr .reconfirm_if_used(addr.into(), Source::Udp, now); } @@ -1063,7 +1022,8 @@ impl NodeState { pub(super) fn last_ping(&self, addr: &SendAddr) -> Option { match addr { SendAddr::Udp(addr) => self - .direct_addr_state + .udp_paths + .paths .get(&(*addr).into()) .and_then(|ep| ep.last_ping), SendAddr::Relay(url) => self @@ -1100,7 +1060,7 @@ impl NodeState { } // Send heartbeat ping to keep the current addr going as long as we need it. 
- if let Some(udp_addr) = self.best_addr.addr() { + if let Some(udp_addr) = self.udp_paths.best_addr.addr() { let elapsed = self.last_ping(&SendAddr::Udp(udp_addr)).map(|l| now - l); // Send a ping if the last ping is older than 2 seconds. let needs_ping = match elapsed { @@ -1131,10 +1091,11 @@ impl NodeState { #[instrument("get_send_addrs", skip_all, fields(node = %self.node_id.fmt_short()))] pub(crate) fn get_send_addrs( &mut self, + have_ipv6: bool, ) -> (Option, Option, Vec) { let now = Instant::now(); self.last_used.replace(now); - let (udp_addr, relay_url) = self.addr_for_send(&now); + let (udp_addr, relay_url) = self.addr_for_send(&now, have_ipv6); let mut ping_msgs = Vec::new(); if self.want_call_me_maybe(&now) { @@ -1153,12 +1114,12 @@ impl NodeState { /// Get the direct addresses for this endpoint. pub(super) fn direct_addresses(&self) -> impl Iterator + '_ { - self.direct_addr_state.keys().copied() + self.udp_paths.paths.keys().copied() } #[cfg(test)] pub(super) fn direct_address_states(&self) -> impl Iterator + '_ { - self.direct_addr_state.iter() + self.udp_paths.paths.iter() } pub(super) fn last_used(&self) -> Option { @@ -1223,6 +1184,13 @@ impl PathState { } } + pub(super) fn udp_addr(&self) -> Option { + match self.path { + SendAddr::Udp(addr) => Some(addr), + SendAddr::Relay(_) => None, + } + } + pub(super) fn with_last_payload(node_id: NodeId, path: SendAddr, now: Instant) -> Self { PathState { node_id, @@ -1337,7 +1305,7 @@ impl PathState { } /// Returns the most recent pong if available. 
- fn recent_pong(&self) -> Option<&PongReply> { + pub(super) fn recent_pong(&self) -> Option<&PongReply> { self.recent_pong.as_ref() } @@ -1604,6 +1572,8 @@ pub enum ConnectionType { mod tests { use std::net::Ipv4Addr; + use best_addr::BestAddr; + use super::{ super::{NodeMap, NodeMapInner}, *, @@ -1659,13 +1629,15 @@ mod tests { node_id: key.public(), last_full_ping: None, relay_url: None, - best_addr: BestAddr::from_parts( - ip_port.into(), - latency, - now, - now + Duration::from_secs(100), + udp_paths: NodeUdpPaths::from_parts( + endpoint_state, + BestAddr::from_parts( + ip_port.into(), + latency, + now, + now + Duration::from_secs(100), + ), ), - direct_addr_state: endpoint_state, sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, @@ -1684,8 +1656,7 @@ mod tests { node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()), - best_addr: BestAddr::default(), - direct_addr_state: BTreeMap::default(), + udp_paths: NodeUdpPaths::new(), sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, @@ -1696,7 +1667,6 @@ mod tests { // endpoint w/ no best addr but a relay w/ no latency let c_endpoint = { // let socket_addr = "0.0.0.0:8".parse().unwrap(); - let endpoint_state = BTreeMap::new(); let key = SecretKey::generate(); NodeState { id: 2, @@ -1707,8 +1677,7 @@ mod tests { send_addr.clone(), PathState::new(key.public(), SendAddr::from(send_addr.clone())), )), - best_addr: BestAddr::default(), - direct_addr_state: endpoint_state, + udp_paths: NodeUdpPaths::new(), sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, @@ -1741,13 +1710,10 @@ mod tests { node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()), - best_addr: BestAddr::from_parts( - socket_addr, - Duration::from_millis(80), - now, - expired, + udp_paths: NodeUdpPaths::from_parts( + endpoint_state, + BestAddr::from_parts(socket_addr, 
Duration::from_millis(80), now, expired), ), - direct_addr_state: endpoint_state, sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs new file mode 100644 index 0000000000..1154bc19c1 --- /dev/null +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -0,0 +1,179 @@ +//! Path state for UDP addresses of a single peer node. +//! +//! This started as simply moving the [`NodeState`]'s `direct_addresses` and `best_addr` +//! into one place together. The aim is for external places to not directly interact with +//! the inside and instead only notify this struct of state changes to each path. +//! +//! [`NodeState`]: super::node_state::NodeState +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use rand::seq::IteratorRandom; +use tracing::warn; + +use crate::disco::SendAddr; + +use super::best_addr::{self, BestAddr}; +use super::node_state::{PathState, PongReply}; +use super::IpPort; + +/// The address on which to send datagrams over UDP. +/// +/// The [`MagicSock`] sends packets to zero or one UDP address, depending on the known paths +/// to the remote node. This conveys the UDP address to send on from the [`NodeUdpPaths`] +/// to the [`NodeState`]. +/// +/// [`NodeUdpPaths`] contains all the UDP path states, while [`NodeState`] has to decide the +/// bigger picture including the relay server. +/// +/// See [`NodeUdpPaths::send_addr`]. +/// +/// [`MagicSock`]: crate::magicsock::MagicSock +/// [`NodeState`]: super::node_state::NodeState +#[derive(Debug)] +pub(super) enum UdpSendAddr { + /// The UDP address can be relied on to deliver data to the remote node. + /// + /// This means this path is usable with a reasonable latency and can be fully trusted to + /// transport payload data to the remote node.
+ Valid(SocketAddr), + /// The UDP address is highly likely to work, but has not been used for a while. + /// + /// The path should be usable but has not carried DISCO or payload data for a little too + /// long. It is best to also use a backup, i.e. relay, path if possible. + Outdated(SocketAddr), + /// The UDP address is not known to work, but it might. + /// + /// We know this UDP address belongs to the remote node, but we do not know if the path + /// already works or may need holepunching before it will start to work. It might even + /// never work. It is still useful to send to this together with a backup path, + /// i.e. relay, in case the path works: if the path does not need holepunching it might + /// be much faster. And if there is no relay path at all it might be the only way to + /// establish a connection. + Unconfirmed(SocketAddr), + /// No known UDP path exists to the remote node. + None, +} + +/// The UDP paths for a single node. +/// +/// Paths are identified by the [`IpPort`] of their UDP address. +/// +/// Initially this collects two structs directly from the [`NodeState`] into one place, +/// leaving the APIs and abstractions the same. The goal is that this slowly migrates from +/// directly interacting with this data to only receiving [`PathState`] updates. This +/// will consolidate the logic of direct path selection and make this simpler to reason +/// about. However doing that all at once is too large a refactor. +/// +/// [`NodeState`]: super::node_state::NodeState +#[derive(Debug, Default)] +pub(super) struct NodeUdpPaths { + /// The state for each of this node's direct paths. + pub(super) paths: BTreeMap, + /// Best UDP path currently selected. + pub(super) best_addr: BestAddr, + /// If we had to choose a path because we had no `best_addr` it is stored here.
+ chosen_candidate: Option, +} + +impl NodeUdpPaths { + pub(super) fn new() -> Self { + Default::default() + } + + #[cfg(test)] + pub(super) fn from_parts(paths: BTreeMap, best_addr: BestAddr) -> Self { + Self { + paths, + best_addr, + chosen_candidate: None, + } + } + + /// Returns the current UDP address to send on. + /// + /// TODO: The goal here is for this to simply return the already known send address, so + /// it should be `&self` and not `&mut self`. This is only possible once the state from + /// [`NodeUdpPaths`] is no longer modified from outside. + pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr { + self.assign_best_addr_from_candidates_if_empty(); + match self.best_addr.state(now) { + best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), + best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), + best_addr::State::Empty => { + // No direct connection has been used before. If we know of any possible + // candidate addresses, randomly try to use one. This path is most + // effective when folks use a NodeAddr with exactly one direct address which + // they know to work, effectively like using a traditional socket or QUIC + // endpoint. + let addr = self + .chosen_candidate + .and_then(|ipp| self.paths.get(&ipp)) + .and_then(|path| path.udp_addr()) + .filter(|addr| addr.is_ipv4() || have_ipv6) + .or_else(|| { + // Look for a new candidate in all the known paths. This may look + // like an RNG use on the hot-path but this is normally invoked at + // most once at startup. + let addr = self + .paths + .values() + .filter_map(|path| path.udp_addr()) + .filter(|addr| addr.is_ipv4() || have_ipv6) + .choose(&mut rand::thread_rng()); + self.chosen_candidate = addr.map(IpPort::from); + addr + }); + match addr { + Some(addr) => UdpSendAddr::Unconfirmed(addr), + None => UdpSendAddr::None, + } + } + } + } + + /// Fixup best_addr from candidates.
+ /// + /// If somehow we end up in a state where we failed to set a best_addr, while we do have + /// valid candidates, this will choose a candidate and set best_addr again. Most likely + /// this is a bug elsewhere though. + fn assign_best_addr_from_candidates_if_empty(&mut self) { + if !self.best_addr.is_empty() { + return; + } + + // The highest acceptable latency for an endpoint path. If the latency is higher + // than this the path will be ignored. + const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); + let best_pong = self.paths.iter().fold(None, |best_pong, (ipp, state)| { + let best_latency = best_pong + .map(|p: &PongReply| p.latency) + .unwrap_or(MAX_LATENCY); + match state.recent_pong() { + // This pong is better if it has a lower latency, or if it has the same + // latency but on an IPv6 path. + Some(pong) + if pong.latency < best_latency + || (pong.latency == best_latency && ipp.ip().is_ipv6()) => + { + Some(pong) + } + _ => best_pong, + } + }); + + // If we found a candidate, set to best addr + if let Some(pong) = best_pong { + if let SendAddr::Udp(addr) = pong.from { + warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); + self.best_addr.insert_if_better_or_reconfirm( + addr, + pong.latency, + best_addr::Source::BestCandidate, + pong.pong_at, + ) + } + } + } +}