diff --git a/src/hp/derp/clients.rs b/src/hp/derp/clients.rs index 1792bca367..aa26558fd8 100644 --- a/src/hp/derp/clients.rs +++ b/src/hp/derp/clients.rs @@ -131,6 +131,7 @@ impl Clients { } pub fn close_conn(&mut self, key: &PublicKey) { + tracing::info!("closing conn {:?}", key); if let Some(client) = self.inner.remove(key) { client.shutdown(); } @@ -170,6 +171,7 @@ impl Clients { // this builds the client handler & starts the read & write loops to that client connection let client = client.build(); let key = client.key.clone(); + tracing::trace!("registering client: {:?}", key); // TODO: in future, do not remove clients that share a publicKey, instead, // expand the `Client` struct to handle multiple connections & a policy for // how to handle who we write to when mulitple connections exist. @@ -184,6 +186,7 @@ impl Clients { /// to each client that peers has sent data to, to let them know that /// peer is gone from the network. pub fn unregister(&mut self, peer: &PublicKey) { + tracing::trace!("unregistering client: {:?}", peer); if let Some(client) = self.inner.remove(peer) { // go impl `notePeerGoneFromRegion` for key in client.sent_to.iter() { diff --git a/src/hp/derp/server.rs b/src/hp/derp/server.rs index 595b3302c8..7620d455c2 100644 --- a/src/hp/derp/server.rs +++ b/src/hp/derp/server.rs @@ -12,7 +12,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use tracing::debug; +use tracing::{debug, instrument}; use crate::hp::key::node::{PublicKey, SecretKey}; @@ -366,6 +366,7 @@ where client_mesh: HashMap>, /// Mesh clients that need to be appraised on the state of the network watchers: HashSet, + name: String, } impl ServerActor @@ -375,15 +376,18 @@ where P: PacketForwarder, { pub(crate) fn new(key: PublicKey, receiver: mpsc::Receiver>) -> Self { + let name = format!("derp-{}", hex::encode(&key.as_ref()[..8])); Self { key, receiver, clients: Clients::new(), client_mesh: HashMap::default(), watchers: HashSet::default(), + name, } } + #[instrument(skip_all, fields(self.name = %self.name))] pub(crate) async fn run(mut self, done: CancellationToken) -> Result<()> { loop { tokio::select! { @@ -406,6 +410,7 @@ where }; match msg { ServerMessage::AddWatcher(key) => { + tracing::trace!("add watcher: {:?} (is_self: {})", key, key == self.key); // connecting to ourselves, ignore if key == self.key { continue; @@ -419,11 +424,13 @@ where self.watchers.insert(key.clone()); }, ServerMessage::ClosePeer(key) => { + tracing::trace!("close peer: {:?}", key); // close the actual underlying connection to the client, but don't remove it from // the list of clients self.clients.close_conn(&key); }, ServerMessage::SendPacket((key, packet)) => { + tracing::trace!("send disco packet from: {:?} to: {:?} ({}b)", packet.src, key, packet.bytes.len()); let src = packet.src.clone(); if self.clients.contains_key(&key) { // if this client is in our local network, just try to send the @@ -436,10 +443,11 @@ where // forwarder fwd.forward_packet(packet.src, key, packet.bytes); } else { - tracing::warn!("no way to reach client {key:?}, dropped packet"); + tracing::warn!("send packet: no way to reach client {key:?}, dropped packet"); } } ServerMessage::SendDiscoPacket((key, packet)) => { + tracing::trace!("send disco packet from: {:?} to: {:?} ({}b)", packet.src, key, packet.bytes.len()); let src = packet.src.clone(); if self.clients.contains_key(&key) { // if this client is in our local network, just try to send the @@ -452,10 +460,11 @@ where // forwarder fwd.forward_packet(packet.src, key, packet.bytes); } else { - tracing::warn!("no way to reach client {key:?}, dropped packet"); + tracing::warn!("send disco packet: no way to reach client {key:?}, dropped packet"); } } ServerMessage::CreateClient(client_builder) => { + tracing::trace!("create client: {:?}", client_builder.key); let key = client_builder.key.clone(); // add client to mesh if !self.client_mesh.contains_key(&key) { @@ -471,6 +480,7 @@ where } ServerMessage::RemoveClient(key) => { + tracing::trace!("remove client: {:?}", key); // remove the client from the map of clients, & notify any peers that it // has sent messages that it has left the network self.clients.unregister(&key); @@ -480,11 +490,13 @@ where self.broadcast_peer_state_change(key, false); } ServerMessage::AddPacketForwarder((key, packet_forwarder)) => { + tracing::trace!("add packet forwarder: {:?}", key); // Only one packet forward allowed at a time right now self.client_mesh.insert(key, Some(packet_forwarder)); }, ServerMessage::RemovePacketForwarder(key) => { + tracing::trace!("remove packet forwarder: {:?}", key); // check if we have a local connection to the client at `key` if self.clients.contains_key(&key) { // remove any current packet forwarder associated with key diff --git a/src/hp/magicsock/conn.rs b/src/hp/magicsock/conn.rs index 71ad537f9b..12ce02a8b4 100644 --- a/src/hp/magicsock/conn.rs +++ b/src/hp/magicsock/conn.rs @@ -175,6 +175,8 @@ pub struct Inner { closing: AtomicBool, /// Close was called. closed: AtomicBool, + /// Do we currently have a derp connection. + derp_connected: AtomicBool, } impl Inner { @@ -194,6 +196,10 @@ impl Inner { fn is_closed(&self) -> bool { self.closed.load(Ordering::SeqCst) } + + fn is_derp_connected(&self) -> bool { + self.derp_connected.load(Ordering::Relaxed) + } } #[derive(Debug)] @@ -256,6 +262,7 @@ impl Conn { public_key: private_key.public_key(), closing: AtomicBool::new(false), closed: AtomicBool::new(false), + derp_connected: AtomicBool::new(false), network_recv_ch: network_recv_ch_receiver, network_recv_wakers: std::sync::Mutex::new(None), network_send_wakers: std::sync::Mutex::new(None), @@ -911,12 +918,15 @@ impl Actor { match result { ReadResult::Break => { // drop client + self.conn.derp_connected.store(false, Ordering::Relaxed); continue; } ReadResult::Continue => { + self.conn.derp_connected.store(true, Ordering::Relaxed); recvs.push(rs.recv()) } ReadResult::Yield(read_result) => { + self.conn.derp_connected.store(true, Ordering::Relaxed); if let Some(passthrough) = self.process_derp_read_result(read_result).await { self.derp_recv_sender.send_async(passthrough).await.expect("missing recv sender"); let mut wakers = self.conn.network_recv_wakers.lock().unwrap(); @@ -3140,14 +3150,16 @@ mod tests { let conn = Conn::new(opts).await?; conn.set_derp_map(Some(derp_map)).await?; - // TODO: alternative check? - // let c = conn.clone(); - // tokio::time::timeout(Duration::from_secs(10), async move { - // while !c.0.state.lock().await.derp_started { - // tokio::time::sleep(Duration::from_millis(100)).await; - // } - // }) - // .await?; + let c = conn.clone(); + tokio::time::timeout(Duration::from_secs(10), async move { + trace!("waiting for derp connection"); + while !c.is_derp_connected() { + tokio::time::sleep(Duration::from_millis(100)).await; + } + trace!("got derp connection"); + }) + .await + .context("wait for derp connection")?; let tls_server_config = tls::make_server_config(&key.clone().into(), vec![tls::P2P_ALPN.to_vec()], false)?;