Skip to content

Commit

Permalink
Merge branch 'main' into simplify-store-triat
Browse files Browse the repository at this point in the history
# Conflicts:
#	iroh-net/src/netcheck/reportgen.rs
  • Loading branch information
rklaehn committed Feb 19, 2024
2 parents d386636 + fea92ac commit bee6d19
Show file tree
Hide file tree
Showing 13 changed files with 663 additions and 303 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/commit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: Commits

on:
pull_request:
branches: [main]
types: [opened, edited, synchronize]

jobs:
check-for-cc:
runs-on: ubuntu-latest
steps:
- name: check-for-cc
id: check-for-cc
uses: agenthunt/[email protected]
3 changes: 3 additions & 0 deletions iroh-net/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use crate::derp::DerpUrl;

use super::portmapper;

// TODO: This re-uses "Endpoint" again, a term that already means "a quic endpoint" and "a
// magicsock endpoint". this time it means "an IP address on which our local magicsock
// endpoint is listening". Name this better.
/// An endpoint IPPort and an associated type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Endpoint {
Expand Down
9 changes: 4 additions & 5 deletions iroh-net/src/derp/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::derp::{
client::ClientReceiver as DerpClientReceiver, metrics::Metrics, server::PacketForwarderHandler,
MeshKey, PacketForwarder, ReceivedMessage,
};
use crate::dns::DNS_RESOLVER;
use crate::dns::lookup_ipv4_ipv6;
use crate::key::{PublicKey, SecretKey};
use crate::util::AbortingJoinHandle;

Expand Down Expand Up @@ -104,7 +104,7 @@ pub enum ClientError {
InvalidUrl(String),
/// There was an error with DNS resolution
#[error("dns: {0:?}")]
Dns(Option<trust_dns_resolver::error::ResolveError>),
Dns(Option<anyhow::Error>),
/// There was a timeout resolving DNS.
#[error("dns timeout")]
DnsTimeout,
Expand Down Expand Up @@ -1019,14 +1019,13 @@ async fn resolve_host(url: &Url, prefer_ipv6: bool) -> Result<IpAddr, ClientErro
match host {
url::Host::Domain(domain) => {
// Need to do a DNS lookup
let addrs = tokio::time::timeout(DNS_TIMEOUT, DNS_RESOLVER.lookup_ip(domain))
let addrs = lookup_ipv4_ipv6(domain, DNS_TIMEOUT)
.await
.map_err(|_| ClientError::DnsTimeout)?
.map_err(|e| ClientError::Dns(Some(e)))?;

if prefer_ipv6 {
if let Some(addr) = addrs.iter().find(|addr| addr.is_ipv6()) {
return Ok(addr);
return Ok(*addr);
}
}
addrs
Expand Down
73 changes: 70 additions & 3 deletions iroh-net/src/dns.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::net::IpAddr;
use std::time::Duration;

use anyhow::Result;
use once_cell::sync::Lazy;
use trust_dns_resolver::{AsyncResolver, TokioAsyncResolver};
use trust_dns_resolver::{AsyncResolver, IntoName, TokioAsyncResolver, TryParseIp};

pub static DNS_RESOLVER: Lazy<TokioAsyncResolver> =
Lazy::new(|| get_resolver().expect("unable to create DNS resolver"));
Expand All @@ -14,23 +17,87 @@ fn get_resolver() -> Result<TokioAsyncResolver> {
let (config, mut options) =
trust_dns_resolver::system_conf::read_system_conf().unwrap_or_default();
// lookup IPv4 and IPv6 in parallel
options.ip_strategy = trust_dns_resolver::config::LookupIpStrategy::Ipv4AndIpv6;
options.ip_strategy = trust_dns_resolver::config::LookupIpStrategy::Ipv4thenIpv6;

let resolver = AsyncResolver::tokio(config, options);
Ok(resolver)
}

/// Resolve IPv4 and IPv6 in parallel.
///
/// `LookupIpStrategy::Ipv4AndIpv6` will wait for ipv6 resolution timeout, even if it is
/// not usable on the stack, so we manually query both lookups concurrently and time them out
/// individually.
pub(crate) async fn lookup_ipv4_ipv6<N: IntoName + TryParseIp + Clone>(
host: N,
timeout: Duration,
) -> Result<Vec<IpAddr>> {
let ipv4 = DNS_RESOLVER.ipv4_lookup(host.clone());
let ipv6 = DNS_RESOLVER.ipv6_lookup(host);
let ipv4 = tokio::time::timeout(timeout, ipv4);
let ipv6 = tokio::time::timeout(timeout, ipv6);

let res = futures::future::join(ipv4, ipv6).await;
match res {
(Ok(Ok(ipv4)), Ok(Ok(ipv6))) => {
let res = ipv4
.into_iter()
.map(|ip| IpAddr::V4(ip.0))
.chain(ipv6.into_iter().map(|ip| IpAddr::V6(ip.0)))
.collect();
Ok(res)
}
(Ok(Ok(ipv4)), Err(_timeout)) => {
let res = ipv4.into_iter().map(|ip| IpAddr::V4(ip.0)).collect();
Ok(res)
}
(Ok(Ok(ipv4)), Ok(Err(_err))) => {
let res = ipv4.into_iter().map(|ip| IpAddr::V4(ip.0)).collect();
Ok(res)
}
(Ok(Err(_err)), Ok(Ok(ipv6))) => {
let res = ipv6.into_iter().map(|ip| IpAddr::V6(ip.0)).collect();
Ok(res)
}
(Ok(Err(err1)), Ok(Err(err2))) => {
anyhow::bail!("Ipv4: {:?}, Ipv6: {:?}", err1, err2);
}
(Ok(Err(err1)), Err(err2)) => {
anyhow::bail!("Ipv4: {:?}, Ipv6: {:?}", err1, err2);
}
(Err(_timeout), Ok(Ok(ipv6))) => {
let res = ipv6.into_iter().map(|ip| IpAddr::V6(ip.0)).collect();
Ok(res)
}
(Err(err1), Ok(Err(err2))) => {
anyhow::bail!("Ipv4: {:?}, Ipv6: {:?}", err1, err2);
}
(Err(timeout1), Err(timeout2)) => {
anyhow::bail!("Ipv4: {:?}, Ipv6: {:?}", timeout1, timeout2);
}
}
}

#[cfg(test)]
mod tests {
use crate::defaults::NA_DERP_HOSTNAME;

use super::*;

#[tokio::test]
async fn test_dns_lookup() {
async fn test_dns_lookup_basic() {
let res = DNS_RESOLVER.lookup_ip(NA_DERP_HOSTNAME).await.unwrap();
let res: Vec<_> = res.iter().collect();
assert!(!res.is_empty());
dbg!(res);
}

#[tokio::test]
async fn test_dns_lookup_ipv4_ipv6() {
let res = lookup_ipv4_ipv6(NA_DERP_HOSTNAME, Duration::from_secs(1))
.await
.unwrap();
assert!(!res.is_empty());
dbg!(res);
}
}
73 changes: 49 additions & 24 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use tokio::{
time,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, info_span, instrument, trace, warn, Instrument};
use tracing::{
debug, error, error_span, info, info_span, instrument, trace, trace_span, warn, Instrument,
};
use watchable::Watchable;

use crate::{
Expand Down Expand Up @@ -88,6 +90,9 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How often to save node data.
const SAVE_NODES_INTERVAL: Duration = Duration::from_secs(30);

/// Maximum duration to wait for a netcheck report.
const NETCHECK_REPORT_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum CurrentPortFate {
Keep,
Expand Down Expand Up @@ -298,7 +303,6 @@ impl Inner {
if transmits.is_empty() {
return Poll::Ready(Ok(n));
}

trace!(
"sending:\n{}",
transmits.iter().fold(
Expand Down Expand Up @@ -339,7 +343,11 @@ impl Inner {

let dest = QuicMappedAddr(dest);

match self.node_map.get_send_addrs_for_quic_mapped_addr(&dest) {
let mut transmits_sent = 0;
match self
.node_map
.get_send_addrs_for_quic_mapped_addr(&dest, self.ipv6_reported.load(Ordering::Relaxed))
{
Some((public_key, udp_addr, derp_url, mut msgs)) => {
let mut pings_sent = false;
// If we have pings to send, we *have* to send them out first.
Expand Down Expand Up @@ -370,6 +378,7 @@ impl Inner {
// are always returning Poll::Ready if poll_send_udp returned
// Poll::Ready.
transmits.truncate(n);
transmits_sent = transmits.len();
udp_sent = true;
// record metrics.
}
Expand All @@ -380,11 +389,10 @@ impl Inner {
}
}

let n = transmits.len();

// send derp
if let Some(ref derp_url) = derp_url {
self.try_send_derp(derp_url, public_key, split_packets(&transmits));
transmits_sent = transmits.len();
derp_sent = true;
}

Expand All @@ -400,14 +408,12 @@ impl Inner {
} else {
trace!(
node = %public_key.fmt_short(),
transmit_count = %transmits.len(),
packet_count = &transmits.iter().map(|t| t.segment_size.map(|ss| t.contents.len() / ss).unwrap_or(1)).sum::<usize>(),
len = &transmits.iter().map(|t| t.contents.len()).sum::<usize>(),
transmit_count = %transmits_sent,
send_udp = ?udp_addr,
send_derp = ?derp_url,
"sent transmits"
);
Poll::Ready(Ok(n))
Poll::Ready(Ok(transmits_sent))
}
}
None => {
Expand Down Expand Up @@ -645,7 +651,9 @@ impl Inner {
inc!(MagicsockMetrics, recv_disco_udp);
}

trace!(message = ?dm, "receive disco message");
let span = trace_span!("handle_disco", ?dm);
let _guard = span.enter();
trace!("receive disco message");
match dm {
disco::Message::Ping(ping) => {
inc!(MagicsockMetrics, recv_disco_ping);
Expand All @@ -658,8 +666,7 @@ impl Inner {
disco::Message::CallMeMaybe(cm) => {
inc!(MagicsockMetrics, recv_disco_call_me_maybe);
if !matches!(src, DiscoMessageSource::Derp { .. }) {
// CallMeMaybe messages should only come via DERP.
debug!("[unexpected] call-me-maybe packets should only come via DERP");
warn!("call-me-maybe packets should only come via DERP");
return;
};
let ping_actions = self.node_map.handle_call_me_maybe(sender, cm);
Expand All @@ -675,15 +682,16 @@ impl Inner {
}
}
}
trace!("disco message handled");
}

/// Handle a ping message.
fn handle_ping(&self, dm: disco::Ping, sender: &PublicKey, src: DiscoMessageSource) {
// Insert the ping into the node map, and return whether a ping with this tx_id was already
// received.
let addr: SendAddr = src.clone().into();
let role = self.node_map.handle_ping(*sender, addr.clone(), dm.tx_id);
match role {
let handled = self.node_map.handle_ping(*sender, addr.clone(), dm.tx_id);
match handled.role {
PingRole::Duplicate => {
debug!(%src, tx = %hex::encode(dm.tx_id), "received ping: endpoint already confirmed, skip");
return;
Expand All @@ -698,7 +706,8 @@ impl Inner {
}

// Send a pong.
debug!(tx = %hex::encode(dm.tx_id), "send pong");
debug!(tx = %hex::encode(dm.tx_id), %addr, dstkey = %sender.fmt_short(),
"sending pong");
let pong = disco::Message::Pong(disco::Pong {
tx_id: dm.tx_id,
src: addr.clone(),
Expand All @@ -707,6 +716,15 @@ impl Inner {
if !self.send_disco_message_queued(addr.clone(), *sender, pong) {
warn!(%addr, "failed to queue pong");
}

if let Some(ping) = handled.needs_ping_back {
debug!(
%addr,
dstkey = %sender.fmt_short(),
"sending direct ping back",
);
self.send_ping_queued(ping);
}
}

fn encode_disco_message(&self, dst_key: PublicKey, msg: &disco::Message) -> Bytes {
Expand All @@ -726,14 +744,13 @@ impl Inner {
tx_id,
node_key: self.public_key(),
});
trace!(%dst, %tx_id, ?purpose, "send ping");
let sent = match dst {
SendAddr::Udp(addr) => self.udp_disco_sender.try_send((addr, dst_key, msg)).is_ok(),
SendAddr::Derp(ref url) => self.send_disco_message_derp(url, dst_key, msg),
};
if sent {
let msg_sender = self.actor_sender.clone();
debug!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (queued)");
trace!(%dst, tx = %hex::encode(tx_id), ?purpose, "ping sent (queued)");
self.node_map
.notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
} else {
Expand Down Expand Up @@ -953,7 +970,10 @@ impl Inner {
let msg = endpoints.to_call_me_maybe_message();
let msg = disco::Message::CallMeMaybe(msg);
if !self.send_disco_message_derp(url, dst_key, msg) {
warn!(node = %dst_key.fmt_short(), "derp channel full, dropping call-me-maybe");
warn!(dstkey = %dst_key.fmt_short(), derpurl = ?url,
"derp channel full, dropping call-me-maybe");
} else {
debug!(dstkey = %dst_key.fmt_short(), derpurl = ?url, "call-me-maybe sent");
}
} else {
self.pending_call_me_maybes
Expand Down Expand Up @@ -1805,7 +1825,6 @@ impl Actor {
Ok(part) => {
if self.handle_derp_disco_message(&part, url, dm.src) {
// Message was internal, do not bubble up.
debug!(node = %dm.src.fmt_short(), "handled disco message from derp");
continue;
}

Expand Down Expand Up @@ -2002,8 +2021,11 @@ impl Actor {
};
discovery.publish(&info);
}
self.inner.send_queued_call_me_maybes();
}

// Regardless of whether our local endpoints changed, we now want to send any queued
// call-me-maybe messages.
self.inner.send_queued_call_me_maybes();
}

/// Called when an endpoints update is done, no matter if it was successful or not.
Expand Down Expand Up @@ -2069,7 +2091,7 @@ impl Actor {
Ok(rx) => {
let msg_sender = self.msg_sender.clone();
tokio::task::spawn(async move {
let report = time::timeout(Duration::from_secs(10), rx).await;
let report = time::timeout(NETCHECK_REPORT_TIMEOUT, rx).await;
let report: anyhow::Result<_> = match report {
Ok(Ok(Ok(report))) => Ok(Some(report)),
Ok(Ok(Err(err))) => Err(err),
Expand All @@ -2080,6 +2102,8 @@ impl Actor {
.send(ActorMessage::NetcheckReport(report, why))
.await
.ok();
// The receiver of the NetcheckReport message will call
// .finalize_endpoints_update().
});
}
Err(err) => {
Expand All @@ -2095,9 +2119,10 @@ impl Actor {
.ipv6_reported
.store(report.ipv6, Ordering::Relaxed);
let r = &report;
debug!(
trace!(
"setting no_v4_send {} -> {}",
self.no_v4_send, !r.ipv4_can_send
self.no_v4_send,
!r.ipv4_can_send
);
self.no_v4_send = !r.ipv4_can_send;

Expand Down Expand Up @@ -2484,7 +2509,7 @@ impl Iterator for PacketSplitIter {
pub(crate) struct QuicMappedAddr(SocketAddr);

/// Counter to always generate unique addresses for [`QuicMappedAddr`].
static ADDR_COUNTER: AtomicU64 = AtomicU64::new(0);
static ADDR_COUNTER: AtomicU64 = AtomicU64::new(1);

impl QuicMappedAddr {
/// The Prefix/L of our Unique Local Addresses.
Expand Down
Loading

0 comments on commit bee6d19

Please sign in to comment.