Skip to content

Commit

Permalink
some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Apr 20, 2023
1 parent 9d381d2 commit 9cf5866
Showing 1 changed file with 43 additions and 29 deletions.
72 changes: 43 additions & 29 deletions src/hp/magicsock/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,14 @@ impl Inner {
derp_node_src: Option<key::node::PublicKey>,
) -> bool {
let (s, r) = sync::oneshot::channel();
self.derp_sender.send(DerpMessage::HandleDiscoMessage {
msg,
src,
derp_node_src,
s,
});
self.derp_sender
.send(DerpMessage::HandleDiscoMessage {
msg,
src,
derp_node_src,
s,
})
.unwrap();
debug!("waiting for disco message handling from {} start", src);
let res = tokio::task::block_in_place(|| r.blocking_recv()).expect("dropped sender");
debug!("waiting for disco message handling from {} done", src);
Expand Down Expand Up @@ -453,7 +455,8 @@ impl Inner {
derp_addr,
endpoint,
})
.await;
.await
.unwrap();
}

pub(super) fn populate_cli_ping_response(
Expand Down Expand Up @@ -678,7 +681,10 @@ impl Conn {

/// Triggers an address discovery. The provided why string is for debug logging only.
pub async fn re_stun(&self, why: &'static str) {
self.derp_sender.send_async(DerpMessage::ReStun(why)).await;
self.derp_sender
.send_async(DerpMessage::ReStun(why))
.await
.unwrap();
}

pub async fn get_mapping_addr(&self, node_key: &key::node::PublicKey) -> Option<SocketAddr> {
Expand Down Expand Up @@ -733,8 +739,9 @@ impl Conn {
let (s, r) = sync::oneshot::channel();
self.derp_sender
.send_async(DerpMessage::SetPreferredPort(port, s))
.await;
r.await;
.await
.unwrap();
r.await.unwrap();
}

/// Controls which (if any) DERP servers are used. A `None` value means to disable DERP; it's disabled by default.
Expand All @@ -751,7 +758,8 @@ impl Conn {
if derp_map.is_none() {
self.derp_sender
.send_async(DerpMessage::CloseAll("derp-disabled"))
.await;
.await
.unwrap();
return;
}

Expand All @@ -773,18 +781,16 @@ impl Conn {
rid.try_into().expect("region too large"),
"derp-region-redefined",
))
.await;
.await
.unwrap();
}
if changes {
// TODO:
// self.log_active_derp();
}
}

let this = self.clone();
tokio::task::spawn(async move {
this.re_stun("derp-map-update").await;
});
self.re_stun("derp-map-update").await;
}

/// Called when the control client gets a new network map from the control server.
Expand Down Expand Up @@ -845,8 +851,11 @@ impl Conn {
pub async fn rebind_all(&self) {
// TODO: check all calls for responses.
let (s, r) = sync::oneshot::channel();
self.derp_sender.send_async(DerpMessage::RebindAll(s)).await;
r.await;
self.derp_sender
.send_async(DerpMessage::RebindAll(s))
.await
.unwrap();
r.await.unwrap();
}
}

Expand Down Expand Up @@ -1400,25 +1409,25 @@ impl DerpHandler {
}
DerpMessage::RebindAll(s) => {
self.rebind_all().await;
s.send(());
s.send(()).unwrap();
}
DerpMessage::SetPreferredPort(port, s) => {
self.set_preferred_port(port).await;
s.send(());
s.send(()).unwrap();
}
DerpMessage::SendDiscoMessage {
dst, dst_key, dst_disco, msg, s
} => {
let res = self.send_disco_message(dst, dst_key, dst_disco, msg).await;
s.send(res);
s.send(res).unwrap();
}
DerpMessage::SetNetworkMap(nm, s) => {
self.set_network_map(nm).await;
s.send(());
s.send(()).unwrap();
}
DerpMessage::HandleDiscoMessage { msg, src, derp_node_src, s} => {
let res = self.handle_disco_message(&msg, src, derp_node_src.as_ref()).await;
s.send(res);
s.send(res).unwrap();
}
}
}
Expand Down Expand Up @@ -1641,7 +1650,10 @@ impl DerpHandler {
}

let rs = ReaderState::new(region_id, cancel, dc.clone());
self.msg_sender.send_async(DerpMessage::Connected(rs)).await;
self.msg_sender
.send_async(DerpMessage::Connected(rs))
.await
.unwrap();
dc
}

Expand Down Expand Up @@ -1731,7 +1743,8 @@ impl DerpHandler {
if let Err(_err) = dc.ping().await {
msg_sender
.send_async(DerpMessage::CloseOrReconnect(region_id, "rebind-ping-fail"))
.await;
.await
.unwrap();
return;
}
debug!("post-rebind ping of DERP region {} okay", region_id);
Expand Down Expand Up @@ -1987,7 +2000,6 @@ impl DerpHandler {
if let Some(ref on_net_info) = self.conn.on_net_info {
debug!("net_info update: {:?}", ni);
on_net_info(ni);
// tokio::task::spawn(async move { cb(ni) });
}
}

Expand Down Expand Up @@ -2163,14 +2175,16 @@ impl DerpHandler {
derp_addr,
endpoint,
})
.await;
.await
.unwrap();
})
}),
);

self.msg_sender
.send_async(DerpMessage::ReStun("refresh-for-peering"))
.await;
.await
.unwrap();
} else {
let eps: Vec<_> = self.last_endpoints.iter().map(|ep| ep.addr).collect();
let msg = disco::Message::CallMeMaybe(disco::CallMeMaybe { my_number: eps });
Expand Down Expand Up @@ -3418,7 +3432,7 @@ mod tests {

for (i, m) in ms.iter().enumerate() {
let nm = build_netmap(&eps[..], ms, i).await;
m.conn.set_network_map(nm).await;
m.conn.set_network_map(nm).await.unwrap();
}
}

Expand Down

0 comments on commit 9cf5866

Please sign in to comment.