Skip to content

Commit

Permalink
derp: wait for connection
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed May 8, 2023
1 parent f0bde56 commit 6c2f592
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
3 changes: 3 additions & 0 deletions src/hp/derp/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl Clients {
}

pub fn close_conn(&mut self, key: &PublicKey) {
tracing::info!("closing conn {:?}", key);
if let Some(client) = self.inner.remove(key) {
client.shutdown();
}
Expand Down Expand Up @@ -170,6 +171,7 @@ impl Clients {
// this builds the client handler & starts the read & write loops to that client connection
let client = client.build();
let key = client.key.clone();
tracing::trace!("registering client: {:?}", key);
// TODO: in future, do not remove clients that share a publicKey, instead,
// expand the `Client` struct to handle multiple connections & a policy for
// how to handle who we write to when mulitple connections exist.
Expand All @@ -184,6 +186,7 @@ impl Clients {
/// to each client that peers has sent data to, to let them know that
/// peer is gone from the network.
pub fn unregister(&mut self, peer: &PublicKey) {
tracing::trace!("unregistering client: {:?}", peer);
if let Some(client) = self.inner.remove(peer) {
// go impl `notePeerGoneFromRegion`
for key in client.sent_to.iter() {
Expand Down
18 changes: 15 additions & 3 deletions src/hp/derp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::{debug, instrument};

use crate::hp::key::node::{PublicKey, SecretKey};

Expand Down Expand Up @@ -366,6 +366,7 @@ where
client_mesh: HashMap<PublicKey, Option<P>>,
/// Mesh clients that need to be appraised on the state of the network
watchers: HashSet<PublicKey>,
name: String,
}

impl<R, W, P> ServerActor<R, W, P>
Expand All @@ -375,15 +376,18 @@ where
P: PacketForwarder,
{
pub(crate) fn new(key: PublicKey, receiver: mpsc::Receiver<ServerMessage<R, W, P>>) -> Self {
let name = format!("derp-{}", hex::encode(&key.as_ref()[..8]));
Self {
key,
receiver,
clients: Clients::new(),
client_mesh: HashMap::default(),
watchers: HashSet::default(),
name,
}
}

#[instrument(skip_all, fields(self.name = %self.name))]
pub(crate) async fn run(mut self, done: CancellationToken) -> Result<()> {
loop {
tokio::select! {
Expand All @@ -406,6 +410,7 @@ where
};
match msg {
ServerMessage::AddWatcher(key) => {
tracing::trace!("add watcher: {:?} (is_self: {})", key, key == self.key);
// connecting to ourselves, ignore
if key == self.key {
continue;
Expand All @@ -419,11 +424,13 @@ where
self.watchers.insert(key.clone());
},
ServerMessage::ClosePeer(key) => {
tracing::trace!("close peer: {:?}", key);
// close the actual underlying connection to the client, but don't remove it from
// the list of clients
self.clients.close_conn(&key);
},
ServerMessage::SendPacket((key, packet)) => {
tracing::trace!("send disco packet from: {:?} to: {:?} ({}b)", packet.src, key, packet.bytes.len());
let src = packet.src.clone();
if self.clients.contains_key(&key) {
// if this client is in our local network, just try to send the
Expand All @@ -436,10 +443,11 @@ where
// forwarder
fwd.forward_packet(packet.src, key, packet.bytes);
} else {
tracing::warn!("no way to reach client {key:?}, dropped packet");
tracing::warn!("send packet: no way to reach client {key:?}, dropped packet");
}
}
ServerMessage::SendDiscoPacket((key, packet)) => {
tracing::trace!("send disco packet from: {:?} to: {:?} ({}b)", packet.src, key, packet.bytes.len());
let src = packet.src.clone();
if self.clients.contains_key(&key) {
// if this client is in our local network, just try to send the
Expand All @@ -452,10 +460,11 @@ where
// forwarder
fwd.forward_packet(packet.src, key, packet.bytes);
} else {
tracing::warn!("no way to reach client {key:?}, dropped packet");
tracing::warn!("send disco packet: no way to reach client {key:?}, dropped packet");
}
}
ServerMessage::CreateClient(client_builder) => {
tracing::trace!("create client: {:?}", client_builder.key);
let key = client_builder.key.clone();
// add client to mesh
if !self.client_mesh.contains_key(&key) {
Expand All @@ -471,6 +480,7 @@ where

}
ServerMessage::RemoveClient(key) => {
tracing::trace!("remove client: {:?}", key);
// remove the client from the map of clients, & notify any peers that it
// has sent messages that it has left the network
self.clients.unregister(&key);
Expand All @@ -480,11 +490,13 @@ where
self.broadcast_peer_state_change(key, false);
}
ServerMessage::AddPacketForwarder((key, packet_forwarder)) => {
tracing::trace!("add packet forwarder: {:?}", key);
// Only one packet forward allowed at a time right now
self.client_mesh.insert(key, Some(packet_forwarder));
},

ServerMessage::RemovePacketForwarder(key) => {
tracing::trace!("remove packet forwarder: {:?}", key);
// check if we have a local connection to the client at `key`
if self.clients.contains_key(&key) {
// remove any current packet forwarder associated with key
Expand Down
28 changes: 20 additions & 8 deletions src/hp/magicsock/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ pub struct Inner {
closing: AtomicBool,
/// Close was called.
closed: AtomicBool,
/// Do we currently have a derp connection.
derp_connected: AtomicBool,
}

impl Inner {
Expand All @@ -194,6 +196,10 @@ impl Inner {
fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}

fn is_derp_connected(&self) -> bool {
self.derp_connected.load(Ordering::Relaxed)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -256,6 +262,7 @@ impl Conn {
public_key: private_key.public_key(),
closing: AtomicBool::new(false),
closed: AtomicBool::new(false),
derp_connected: AtomicBool::new(false),
network_recv_ch: network_recv_ch_receiver,
network_recv_wakers: std::sync::Mutex::new(None),
network_send_wakers: std::sync::Mutex::new(None),
Expand Down Expand Up @@ -911,12 +918,15 @@ impl Actor {
match result {
ReadResult::Break => {
// drop client
self.conn.derp_connected.store(false, Ordering::Relaxed);
continue;
}
ReadResult::Continue => {
self.conn.derp_connected.store(true, Ordering::Relaxed);
recvs.push(rs.recv())
}
ReadResult::Yield(read_result) => {
self.conn.derp_connected.store(true, Ordering::Relaxed);
if let Some(passthrough) = self.process_derp_read_result(read_result).await {
self.derp_recv_sender.send_async(passthrough).await.expect("missing recv sender");
let mut wakers = self.conn.network_recv_wakers.lock().unwrap();
Expand Down Expand Up @@ -3140,14 +3150,16 @@ mod tests {
let conn = Conn::new(opts).await?;
conn.set_derp_map(Some(derp_map)).await?;

// TODO: alternative check?
// let c = conn.clone();
// tokio::time::timeout(Duration::from_secs(10), async move {
// while !c.0.state.lock().await.derp_started {
// tokio::time::sleep(Duration::from_millis(100)).await;
// }
// })
// .await?;
let c = conn.clone();
tokio::time::timeout(Duration::from_secs(10), async move {
trace!("waiting for derp connection");
while !c.is_derp_connected() {
tokio::time::sleep(Duration::from_millis(100)).await;
}
trace!("got derp connection");
})
.await
.context("wait for derp connection")?;

let tls_server_config =
tls::make_server_config(&key.clone().into(), vec![tls::P2P_ALPN.to_vec()], false)?;
Expand Down

0 comments on commit 6c2f592

Please sign in to comment.