endpoint: fix ping pong timings
dignifiedquire committed Apr 21, 2023
1 parent 9cf5866 commit e2f2bce
Showing 6 changed files with 129 additions and 63 deletions.
4 changes: 3 additions & 1 deletion src/hp/derp/client.rs
@@ -9,7 +9,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::{debug, error};

use super::PER_CLIENT_SEND_QUEUE_DEPTH;
use super::{
@@ -73,6 +73,8 @@ impl<R: AsyncRead + Unpin> Client<R> {
///
/// Errors if the packet is larger than [`MAX_PACKET_SIZE`]
pub async fn send(&self, dstkey: PublicKey, packet: Vec<u8>) -> Result<()> {
error!("[DERP] -> {:?} ({}b)", dstkey, packet.len());

self.inner
.writer_channel
.send(ClientWriterMessage::Packet((dstkey, packet)))
26 changes: 15 additions & 11 deletions src/hp/derp/http/client.rs
@@ -12,7 +12,7 @@ use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{debug, warn};
use tracing::{debug, error, warn};

use crate::hp::derp::{
client::ClientBuilder as DerpClientBuilder, DerpNode, MeshKey, PacketForwarder, UseIpv4,
@@ -288,17 +288,18 @@ impl Client {
}
}

async fn connect_0(&self) -> Result<DerpClient<tokio::net::tcp::OwnedReadHalf>, ClientError> {
let region = {
if let Some(get_region) = &self.inner.get_region {
get_region()
.await
.expect("Cannot connection client: DERP region is unknown")
} else {
return Err(ClientError::DerpRegionNotAvail);
}
};
async fn current_region(&self) -> Result<DerpRegion, ClientError> {
if let Some(get_region) = &self.inner.get_region {
let region = get_region()
.await
.expect("Cannot connection client: DERP region is unknown");
return Ok(region);
}
Err(ClientError::DerpRegionNotAvail)
}

async fn connect_0(&self) -> Result<DerpClient<tokio::net::tcp::OwnedReadHalf>, ClientError> {
let region = self.current_region().await?;
let (tcp_stream, _node) = self.dial_region(region).await?;

let local_addr = tcp_stream
@@ -564,6 +565,9 @@ impl Client {
let (client, conn_gen) = self.connect().await?;
match client.recv().await {
Ok(msg) => {
let region = self.current_region().await?;
error!("[DERP] <- {} ({:?})", self.target_string(&region), msg);

if let ReceivedMessage::Pong(ping) = msg {
if let Some(chan) = self.unregister_ping(ping).await {
if let Err(_) = chan.send(()) {
2 changes: 2 additions & 0 deletions src/hp/magicsock.rs
@@ -101,6 +101,8 @@ pub enum DiscoPingPurpose {
Discovery,
/// The user is running "tailscale ping" from the CLI. These types of pings can go over DERP.
Cli,
/// Ping to ensure the current route is still valid.
Heartbeat,
}

// TODO: metrics
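The new `Heartbeat` variant backs the keep-alive ping added to `Endpoint::poll_send` later in this diff, which re-pings the current best UDP address once its last ping is more than two seconds old. A minimal sketch of that decision; the `needs_heartbeat` helper and the `HEARTBEAT_INTERVAL` name are illustrative only, the commit inlines `Duration::from_secs(2)`:

```rust
use std::time::{Duration, Instant};

// Illustrative name; the commit uses a literal `Duration::from_secs(2)`.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(2);

/// True when the current best address should receive a
/// `DiscoPingPurpose::Heartbeat` ping: it has never been pinged,
/// or its last ping is older than the interval.
fn needs_heartbeat(last_ping: Option<Instant>, now: Instant) -> bool {
    last_ping
        .map(|last| now - last > HEARTBEAT_INTERVAL)
        .unwrap_or(true)
}
```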
81 changes: 48 additions & 33 deletions src/hp/magicsock/conn.rs
@@ -941,6 +941,13 @@ impl AsyncUdpSocket for Conn {
cx: &mut Context,
transmits: &[quinn_proto::Transmit],
) -> Poll<io::Result<usize>> {
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
)));
}

debug!(
"sending:\n{}",
transmits
@@ -1005,6 +1012,12 @@
// FIXME: currently ipv4 load results in ipv6 traffic being ignored
debug_assert_eq!(bufs.len(), meta.len(), "non matching bufs & metas");
debug!("trying to receive up to {} packets", bufs.len());
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
)));
}

let mut num_msgs_total = 0;

@@ -3063,7 +3076,6 @@ impl ReaderState {
mod tests {
use anyhow::Context;
use hyper::server::conn::Http;
use rand::RngCore;
use tokio::{net, sync, task::JoinSet};
use tracing_subscriber::{prelude::*, EnvFilter};

@@ -3466,7 +3478,7 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread")]
async fn test_two_devices_roundtrip() -> Result<()> {
async fn test_two_devices_roundtrip_quinn() -> Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
.with(EnvFilter::from_default_env())
@@ -3515,32 +3527,33 @@

let b_task = tokio::task::spawn(async move {
println!("[{}] accepting conn", b_name);
while let Some(conn) = b.quic_ep.accept().await {
println!("[{}] connecting", b_name);
let conn = conn
.await
.with_context(|| format!("[{}] connecting", b_name))?;
println!("[{}] accepting bi", b_name);
let (mut send_bi, recv_bi) = conn
.accept_bi()
.await
.with_context(|| format!("[{}] accepting bi", b_name))?;
let conn = b.quic_ep.accept().await.expect("no conn");
println!("[{}] connecting", b_name);
let conn = conn
.await
.with_context(|| format!("[{}] connecting", b_name))?;
println!("[{}] accepting bi", b_name);
let (mut send_bi, recv_bi) = conn
.accept_bi()
.await
.with_context(|| format!("[{}] accepting bi", b_name))?;

println!("[{}] reading", b_name);
let val = recv_bi
.read_to_end(usize::MAX)
.await
.with_context(|| format!("[{}] reading to end", b_name))?;
println!("[{}] finishing", b_name);
send_bi
.finish()
.await
.with_context(|| format!("[{}] finishing", b_name))?;
println!("[{}] finished", b_name);
println!("[{}] reading", b_name);
let val = recv_bi
.read_to_end(usize::MAX)
.await
.with_context(|| format!("[{}] reading to end", b_name))?;
println!("[{}] finishing", b_name);
send_bi
.finish()
.await
.with_context(|| format!("[{}] finishing", b_name))?;

return Ok::<_, anyhow::Error>(val);
}
bail!("no connections available anymore");
drop(send_bi);
drop(conn);
println!("[{}] finished", b_name);

Ok::<_, anyhow::Error>(val)
});

println!("[{}] connecting to {}", a_name, b_addr);
@@ -3588,6 +3601,8 @@
hex::encode($msg),
hex::encode(val)
);

// tokio::time::sleep(Duration::from_secs(1)).await;
};
}

@@ -3597,13 +3612,13 @@
roundtrip!(m2, m1, b"hello m2");
}

println!("-- larger data");
{
let mut data = vec![0u8; 10 * 1024];
rand::thread_rng().fill_bytes(&mut data);
roundtrip!(m1, m2, data);
roundtrip!(m2, m1, data);
}
// println!("-- larger data");
// {
// let mut data = vec![0u8; 10 * 1024];
// rand::thread_rng().fill_bytes(&mut data);
// roundtrip!(m1, m2, data);
// roundtrip!(m2, m1, data);
// }

println!("cleaning up");
cleanup();
66 changes: 51 additions & 15 deletions src/hp/magicsock/endpoint.rs
@@ -15,7 +15,7 @@ use std::{

use futures::future::BoxFuture;
use tokio::{sync::Mutex, time::Instant};
use tracing::{debug, info, instrument, warn};
use tracing::{debug, error, info, instrument, warn};

use crate::{
hp::{
@@ -167,18 +167,20 @@ impl Endpoint {
now: &Instant,
) -> (Option<SocketAddr>, Option<SocketAddr>) {
let udp_addr = state.best_addr.as_ref().map(|a| a.addr);
let derp_addr = if udp_addr.is_none() || !state.is_best_addr_valid(*now) {
let mut derp_addr = None;
if udp_addr.is_none() || !state.is_best_addr_valid(*now) {
error!(
"no good udp addr {:?} {:?} - {:?}",
now, udp_addr, state.trust_best_addr_until
);
// We had a best_addr but it expired so send both to it and DERP.
state.derp_addr
} else {
None
};
derp_addr = state.derp_addr;
}

(udp_addr, derp_addr)
}

/// Reports whether we should ping to all our peers looking for a better path.
// TODO: figure out when to call this, now that the heartbeat timer is gone
#[instrument(skip_all, fields(self.name = %self.name()))]
fn want_full_ping(&self, state: &InnerMutEndpoint, now: &Instant) -> bool {
if state.last_full_ping.is_none() {
@@ -191,7 +193,7 @@
return false;
}

if state.last_full_ping.as_ref().unwrap().duration_since(*now) >= UPGRADE_INTERVAL {
if *now - *state.last_full_ping.as_ref().unwrap() >= UPGRADE_INTERVAL {
return true;
}
false
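The hunk above is the heart of the timing fix. `tokio::time::Instant::duration_since` (like `std::time::Instant` on recent Rust) returns a zero duration when its argument is later than `self`, so `last_full_ping.duration_since(*now)` was always zero and the `>= UPGRADE_INTERVAL` branch never fired; the same pattern broke the `last_ping` filter and the pong latency calculation below. A standalone sketch of the difference, using `std` only:

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

fn main() {
    let last_ping = Instant::now();
    sleep(Duration::from_millis(50));
    let now = Instant::now();

    // Old check: "how long from `now` until `last_ping`?" -- since
    // `last_ping` is in the past, this saturates to zero (tokio's Instant
    // documents the same), so it could never reach UPGRADE_INTERVAL and
    // was always below DISCO_PING_INTERVAL.
    let backwards = last_ping.duration_since(now);
    assert_eq!(backwards, Duration::ZERO);

    // Fixed check: subtract in the right order to get the real elapsed time.
    let elapsed = now - last_ping;
    assert!(elapsed >= Duration::from_millis(50));
}
```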
@@ -248,6 +250,9 @@
txid, sp.to, self.public_key, state.disco_key
);
}
if let Some(ep_state) = state.endpoint_state.get_mut(&sp.to) {
ep_state.last_ping = None;
}
}
}

@@ -319,7 +324,7 @@ impl Endpoint {

let txid = stun::TransactionId::default();
let this = self.clone();
info!("disco: sent ping [{}]", txid);
error!("disco: sent ping [{}]", txid);
state.sent_ping.insert(
txid,
SentPing {
@@ -360,8 +365,12 @@
.iter()
.filter_map(|(ep, st)| {
if st.last_ping.is_some()
&& st.last_ping.as_ref().unwrap().duration_since(now) < DISCO_PING_INTERVAL
&& now - *st.last_ping.as_ref().unwrap() < DISCO_PING_INTERVAL
{
info!(
"disco: [{:?}] skipping ping, too new {:?} {:?}",
ep, now, st.last_ping
);
return None;
}
Some(ep.clone())
@@ -574,17 +583,16 @@
}
Some(sp) => {
let known_tx_id = true;
let txid = m.tx_id;
sp.timer.stop().await;
info!("disco: timer aborted for {}", txid);
di.set_node_key(self.public_key.clone());

let now = Instant::now();
let latency = sp.at.duration_since(now);
let latency = now - sp.at;

if !is_derp {
match state.endpoint_state.get_mut(&sp.to) {
None => {
info!("disco: ignoring pong: {}", sp.to);
// This is no longer an endpoint we care about.
return known_tx_id;
}
@@ -762,7 +770,29 @@
"available addrs: UDP({:?}), DERP({:?})",
udp_addr, derp_addr
);
if udp_addr.is_none() || !state.is_best_addr_valid(now) {

// Send heartbeat ping to keep the current addr going as long as we need it.
if let Some(udp_addr) = udp_addr {
if let Some(ep_state) = state.endpoint_state.get(&udp_addr) {
let needs_ping = ep_state
.last_ping
.map(|l| now - l > Duration::from_secs(2))
.unwrap_or(true);

error!(
"needs ping {}: {:?} {:?}",
needs_ping,
ep_state.last_ping.map(|l| now - l),
now
);
if needs_ping {
self.start_ping(&mut state, udp_addr, now, DiscoPingPurpose::Heartbeat);
}
}
}

// If we do not have an optimal addr, send pings to all known places.
if udp_addr.is_none() || self.want_full_ping(&mut state, &now) {
self.send_pings(&mut state, now, true);
}
drop(state);
@@ -774,6 +804,12 @@
)));
}

error!(
"sending UDP: {}, DERP: {}",
udp_addr.is_some(),
derp_addr.is_some()
);

let res = if let Some(udp_addr) = udp_addr {
debug!("sending UDP: {}", udp_addr);
self.c.poll_send_raw(udp_state, cx, udp_addr, transmits)
@@ -813,7 +849,7 @@ impl InnerMutEndpoint {
match self.best_addr {
None => false,
Some(_) => match self.trust_best_addr_until {
Some(expiry) => expiry < instant,
Some(expiry) => instant < expiry,
None => false,
},
}
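The corrected comparison treats the best address as valid only while the trust deadline is still ahead of `now`; the old test was inverted, so a freshly-trusted address looked expired and the send path above kept falling back to DERP. A small sketch of the fixed predicate, with the two state fields modeled as plain arguments rather than the real struct:

```rust
use std::time::Instant;

/// Sketch of the corrected `is_best_addr_valid`: valid only if a best
/// address exists and `now` is still before the trust deadline.
fn is_best_addr_valid(has_best_addr: bool, trust_until: Option<Instant>, now: Instant) -> bool {
    has_best_addr && trust_until.map(|expiry| now < expiry).unwrap_or(false)
}
```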
13 changes: 10 additions & 3 deletions src/hp/magicsock/rebinding_conn.rs
@@ -14,7 +14,7 @@ use tokio::{
io::Interest,
sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock},
};
use tracing::{debug, info};
use tracing::{debug, error, info};

use super::conn::{CurrentPortFate, Network};
use crate::hp::magicsock::SOCKET_BUFFER_SIZE;
@@ -250,14 +250,18 @@ impl AsyncUdpSocket for UdpSocket {
.map(|t| format!("dest: {:?}, bytes: {}", t.destination, t.contents.len()))
.collect::<Vec<_>>()
);

let inner = &mut self.inner;
let io = &self.io;
loop {
ready!(io.poll_send_ready(cx))?;
if let Ok(res) = io.try_io(Interest::WRITABLE, || {
inner.send(io.into(), state, transmits)
}) {
debug!("sent {:?}", res);
for t in transmits.iter().take(res) {
error!("[UDP] -> {} ({}b)", t.destination, t.contents.len());
}

return Poll::Ready(Ok(res));
}
}
@@ -276,7 +280,10 @@
if let Ok(res) = self.io.try_io(Interest::READABLE, || {
self.inner.recv((&self.io).into(), bufs, meta)
}) {
debug!("received {:?}", res);
for meta in meta.iter().take(res) {
error!("[UDP] <- {} ({}b)", meta.addr, meta.len);
}

return Poll::Ready(Ok(res));
}
}
