From 2e9b5496191f1021d608398be62ad92b79319ee5 Mon Sep 17 00:00:00 2001 From: Honsun Zhu Date: Fri, 17 Nov 2023 18:06:58 +0800 Subject: [PATCH] fix: make sure that the listening loop must exit when the client closes --- Cargo.lock | 5 ++-- turn/Cargo.toml | 1 + turn/src/client/mod.rs | 60 +++++++++++++++++++++++++++++------------- 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb4dc1d92..0efe328bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2526,9 +2526,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", @@ -2590,6 +2590,7 @@ dependencies = [ "thiserror", "tokio", "tokio-test", + "tokio-util", "webrtc-util", ] diff --git a/turn/Cargo.toml b/turn/Cargo.toml index 569833637..9c1a8a227 100644 --- a/turn/Cargo.toml +++ b/turn/Cargo.toml @@ -14,6 +14,7 @@ util = { version = "0.8", path = "../util", package = "webrtc-util", default-fea stun = { version = "0.5", path = "../stun" } tokio = { version = "1.32.0", features = ["full"] } +tokio-util = "0.7" futures = "0.3" async-trait = "0.1" log = "0.4" diff --git a/turn/src/client/mod.rs b/turn/src/client/mod.rs index 45f2ca172..9ae590d8f 100644 --- a/turn/src/client/mod.rs +++ b/turn/src/client/mod.rs @@ -24,7 +24,10 @@ use stun::integrity::*; use stun::message::*; use stun::textattrs::*; use stun::xoraddr::*; +use tokio::pin; +use tokio::select; use tokio::sync::{mpsc, Mutex}; +use tokio_util::sync::CancellationToken; use transaction::*; use util::conn::*; use util::vnet::net::*; @@ -78,6 +81,7 @@ struct ClientInternal { binding_mgr: Arc>, rto_in_ms: u16, read_ch_tx: Arc>>>, + close_notify: CancellationToken, } #[async_trait] @@ -210,6 +214,7 @@ impl ClientInternal { }, integrity: MessageIntegrity::new_short_term_integrity(String::new()), read_ch_tx: Arc::new(Mutex::new(None)), + close_notify: CancellationToken::new(), }) } @@ -227,33 +232,51 @@ impl ClientInternal { let tr_map = Arc::clone(&self.tr_map); let read_ch_tx = Arc::clone(&self.read_ch_tx); let binding_mgr = Arc::clone(&self.binding_mgr); + let close_notify = self.close_notify.clone(); tokio::spawn(async move { let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE]; + let wait_cancel = close_notify.cancelled(); + pin!(wait_cancel); + loop { - //TODO: gracefully exit loop - let (n, from) = match conn.recv_from(&mut buf).await { - Ok((n, from)) => (n, from), - Err(err) => { - log::debug!("exiting read loop: {}", err); + let (n, from) = select! { + biased; + + _ = &mut wait_cancel => { + log::debug!("exiting read loop"); break; + }, + result = conn.recv_from(&mut buf) => match result { + Ok((n, from)) => (n, from), + Err(err) => { + log::debug!("exiting read loop: {}", err); + break; + } } }; - log::debug!("received {} bytes of udp from {}", n, from); - if let Err(err) = ClientInternal::handle_inbound( - &read_ch_tx, - &buf[..n], - from, - &stun_serv_str, - &tr_map, - &binding_mgr, - ) - .await - { - log::debug!("exiting read loop: {}", err); - break; + select! { + biased; + + _ = &mut wait_cancel => { + log::debug!("exiting read loop"); + break; + }, + result = ClientInternal::handle_inbound( + &read_ch_tx, + &buf[..n], + from, + &stun_serv_str, + &tr_map, + &binding_mgr, + ) => { + if let Err(err) = result { + log::debug!("exiting read loop: {}", err); + break; + } + } } } }); @@ -430,6 +453,7 @@ impl ClientInternal { /// Closes this client. async fn close(&mut self) { + self.close_notify.cancel(); { let mut read_ch_tx = self.read_ch_tx.lock().await; read_ch_tx.take();