Skip to content

Commit

Permalink
feat(conn): improve shutdown of IO loop
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed May 17, 2023
1 parent 4437d7e commit dbe0228
Showing 1 changed file with 106 additions and 72 deletions.
178 changes: 106 additions & 72 deletions src/hp/magicsock/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,86 +845,28 @@ impl Actor {
let mut endpoints_update_receiver = self.endpoints_update_state.running.subscribe();
let mut recvs = futures::stream::FuturesUnordered::new();

let mut ip_stream = IpStream::new(
let ip_stream = IpStream::new(
&self.udp_state,
self.conn.clone(),
self.pconn4.clone(),
self.pconn6.clone(),
);

let (ip_sender, mut ip_receiver) = mpsc::channel(128);
let stun_packet_channel = self.net_checker.get_msg_sender();

// Process incoming packets in an independent task of the other work.

let io_cancel_token = CancellationToken::new();
let cloned_token = io_cancel_token.clone();
let conn = self.conn.clone();
// TODO: add shutdown for this and the main run loop
tokio::task::spawn(async move {
while let Some(ip_msgs) = ip_stream.next().await {
trace!("tick: ip_msgs");
match ip_msgs {
Ok((packet, network, meta)) => {
// Classify packets

// Stun?
if stun::is(&packet) {
let enable_stun_packets =
conn.enable_stun_packets.load(Ordering::Relaxed);
debug!("on_stun_receive, processing {}", enable_stun_packets);
if enable_stun_packets {
let msg = netcheck::ActorMessage::StunPacket(packet, meta.addr);
stun_packet_channel.try_send(msg).ok();
}
continue;
}
// Disco?
if let Some((source, sealed_box)) = disco::source_and_box(&packet) {
if ip_sender
.send(IpPacket::Disco {
source,
sealed_box: packet.slice_ref(sealed_box),
src: meta.addr,
})
.await
.is_err()
{
warn!("ip_sender gone");
break;
};
continue;
}

// Foward
let forward = match network {
Network::Ipv4 => NetworkReadResult::Ok {
source: NetworkSource::Ipv4,
bytes: packet,
meta,
},
Network::Ipv6 => NetworkReadResult::Ok {
source: NetworkSource::Ipv6,
bytes: packet,
meta,
},
};

if ip_sender.send(IpPacket::Forward(forward)).await.is_err() {
warn!("ip_sender gone");
break;
}
}
Err(err) => {
if ip_sender
.send(IpPacket::Forward(NetworkReadResult::Error(err)))
.await
.is_err()
{
warn!("ip_sender gone");
break;
}
}
}
}
let io_handle = tokio::task::spawn(async move {
run_io_loop(
ip_stream,
cloned_token,
conn,
stun_packet_channel,
ip_sender,
)
.await;
});

loop {
Expand Down Expand Up @@ -981,8 +923,7 @@ impl Actor {
conn.close().await.ok();
}
self.pconn4.close().await.ok();

return Ok(());
break;
}
ActorMessage::CloseOrReconnect(rid, reason) => {
self.close_or_reconnect_derp(rid, reason).await;
Expand Down Expand Up @@ -1100,6 +1041,11 @@ impl Actor {
}
}
}

// Shutdown IO loop
io_cancel_token.cancel();
io_handle.await?;
Ok(())
}

/// This modifies the [`quinn_udp::RecvMeta`] for the packet to set the addresses
Expand Down Expand Up @@ -2739,6 +2685,94 @@ impl Actor {
}
}

/// Executes the IO over the UDP sockets
async fn run_io_loop(
mut ip_stream: IpStream,
cancel_token: CancellationToken,
conn: Arc<Inner>,
stun_packet_channel: mpsc::Sender<netcheck::ActorMessage>,
ip_sender: mpsc::Sender<IpPacket>,
) {
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
break;
}
msg = ip_stream.next() => {
match msg {
None => break,
Some(ip_msgs) => {
trace!("tick: ip_msgs");
match ip_msgs {
Ok((packet, network, meta)) => {
// Classify packets

// Stun?
if stun::is(&packet) {
let enable_stun_packets =
conn.enable_stun_packets.load(Ordering::Relaxed);
debug!("on_stun_receive, processing {}", enable_stun_packets);
if enable_stun_packets {
let msg = netcheck::ActorMessage::StunPacket(packet, meta.addr);
stun_packet_channel.try_send(msg).ok();
}
continue;
}
// Disco?
if let Some((source, sealed_box)) = disco::source_and_box(&packet) {
if ip_sender
.send(IpPacket::Disco {
source,
sealed_box: packet.slice_ref(sealed_box),
src: meta.addr,
})
.await
.is_err()
{
warn!("ip_sender gone");
break;
};
continue;
}

// Foward
let forward = match network {
Network::Ipv4 => NetworkReadResult::Ok {
source: NetworkSource::Ipv4,
bytes: packet,
meta,
},
Network::Ipv6 => NetworkReadResult::Ok {
source: NetworkSource::Ipv6,
bytes: packet,
meta,
},
};

if ip_sender.send(IpPacket::Forward(forward)).await.is_err() {
warn!("ip_sender gone");
break;
}
}
Err(err) => {
if ip_sender
.send(IpPacket::Forward(NetworkReadResult::Error(err)))
.await
.is_err()
{
warn!("ip_sender gone");
break;
}
}
}
}
}
}
}
}
}

struct IpStream {
conn: Arc<Inner>,
pconn4: RebindingUdpConn,
Expand Down

0 comments on commit dbe0228

Please sign in to comment.