Skip to content

Commit

Permalink
fix: make sure that the listening loop must exit when the client closes
Browse files Browse the repository at this point in the history
  • Loading branch information
honsunrise committed Nov 17, 2023
1 parent 09963cd commit 2e9b549
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 20 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ util = { version = "0.8", path = "../util", package = "webrtc-util", default-fea
stun = { version = "0.5", path = "../stun" }

tokio = { version = "1.32.0", features = ["full"] }
tokio-util = "0.7"
futures = "0.3"
async-trait = "0.1"
log = "0.4"
Expand Down
60 changes: 42 additions & 18 deletions turn/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use stun::integrity::*;
use stun::message::*;
use stun::textattrs::*;
use stun::xoraddr::*;
use tokio::pin;
use tokio::select;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use transaction::*;
use util::conn::*;
use util::vnet::net::*;
Expand Down Expand Up @@ -78,6 +81,7 @@ struct ClientInternal {
binding_mgr: Arc<Mutex<BindingManager>>,
rto_in_ms: u16,
read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
close_notify: CancellationToken,
}

#[async_trait]
Expand Down Expand Up @@ -210,6 +214,7 @@ impl ClientInternal {
},
integrity: MessageIntegrity::new_short_term_integrity(String::new()),
read_ch_tx: Arc::new(Mutex::new(None)),
close_notify: CancellationToken::new(),
})
}

Expand All @@ -227,33 +232,51 @@ impl ClientInternal {
let tr_map = Arc::clone(&self.tr_map);
let read_ch_tx = Arc::clone(&self.read_ch_tx);
let binding_mgr = Arc::clone(&self.binding_mgr);
let close_notify = self.close_notify.clone();

tokio::spawn(async move {
let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE];
let wait_cancel = close_notify.cancelled();
pin!(wait_cancel);

loop {
//TODO: gracefully exit loop
let (n, from) = match conn.recv_from(&mut buf).await {
Ok((n, from)) => (n, from),
Err(err) => {
log::debug!("exiting read loop: {}", err);
let (n, from) = select! {
biased;

_ = &mut wait_cancel => {
log::debug!("exiting read loop");
break;
},
result = conn.recv_from(&mut buf) => match result {
Ok((n, from)) => (n, from),
Err(err) => {
log::debug!("exiting read loop: {}", err);
break;
}
}
};

log::debug!("received {} bytes of udp from {}", n, from);

if let Err(err) = ClientInternal::handle_inbound(
&read_ch_tx,
&buf[..n],
from,
&stun_serv_str,
&tr_map,
&binding_mgr,
)
.await
{
log::debug!("exiting read loop: {}", err);
break;
select! {
biased;

_ = &mut wait_cancel => {
log::debug!("exiting read loop");
break;
},
result = ClientInternal::handle_inbound(
&read_ch_tx,
&buf[..n],
from,
&stun_serv_str,
&tr_map,
&binding_mgr,
) => {
if let Err(err) = result {
log::debug!("exiting read loop: {}", err);
break;
}
}
}
}
});
Expand Down Expand Up @@ -430,6 +453,7 @@ impl ClientInternal {

/// Closes this client.
async fn close(&mut self) {
self.close_notify.cancel();
{
let mut read_ch_tx = self.read_ch_tx.lock().await;
read_ch_tx.take();
Expand Down

0 comments on commit 2e9b549

Please sign in to comment.