Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: try to fix halt channel closed after starting UDP tracker #594

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"Containerfile",
"curr",
"Cyberneering",
"datagram",
"datetime",
"Dijke",
"distroless",
Expand Down Expand Up @@ -79,6 +80,7 @@
"nonroot",
"Norberg",
"numwant",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7",
"oneshot",
"ostr",
"Pando",
Expand Down Expand Up @@ -129,8 +131,7 @@
"Xtorrent",
"Xunlei",
"xxxxxxxxxxxxxxxxxxxxd",
"yyyyyyyyyyyyyyyyyyyyd",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7"
"yyyyyyyyyyyyyyyyyyyyd"
],
"enableFiletypes": [
"dockerfile",
Expand Down
8 changes: 8 additions & 0 deletions src/bootstrap/jobs/udp_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! for the configuration options.
use std::sync::Arc;

use log::debug;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::UdpTracker;

Expand Down Expand Up @@ -36,6 +37,13 @@
.expect("it should be able to start the udp tracker");

tokio::spawn(async move {
debug!(target: "UDP Tracker", "Wait for launcher (UDP service) to finish ...");

Check warning on line 40 in src/bootstrap/jobs/udp_tracker.rs

View check run for this annotation

Codecov / codecov/patch

src/bootstrap/jobs/udp_tracker.rs#L40

Added line #L40 was not covered by tests

assert!(
!server.state.halt_task.is_closed(),

Check warning on line 43 in src/bootstrap/jobs/udp_tracker.rs

View check run for this annotation

Codecov / codecov/patch

src/bootstrap/jobs/udp_tracker.rs#L42-L43

Added lines #L42 - L43 were not covered by tests
"Halt channel for UDP tracker should be open"
);

server
.state
.task
Expand Down
2 changes: 1 addition & 1 deletion src/servers/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Launcher {
tokio::task::spawn(graceful_shutdown(
handle.clone(),
rx_halt,
format!("Shutting down http server on socket address: {address}"),
format!("Shutting down HTTP server on socket address: {address}"),
));

let tls = self.tls.clone();
Expand Down
72 changes: 50 additions & 22 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,19 @@
let (tx_start, rx_start) = tokio::sync::oneshot::channel::<Started>();
let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();

assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open");

let launcher = self.state.launcher;

let task = tokio::spawn(async move {
launcher.start(tracker, tx_start, rx_halt).await;
debug!(target: "UDP Tracker", "Launcher starting ...");

let server = launcher.start(tracker, tx_start, rx_halt);

server.await;

debug!(target: "UDP Tracker", "Launcher finished ...");

launcher
});

Expand All @@ -135,8 +144,6 @@
},
};

info!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding);

Ok(running_udp_server)
}
}
Expand Down Expand Up @@ -202,38 +209,59 @@
tx_start: Sender<Started>,
rx_halt: Receiver<Halted>,
) -> JoinHandle<()> {
let binding = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = binding.local_addr().expect("Could not get local_addr from {binding}.");
let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = socket.local_addr().expect("Could not get local_addr from {binding}.");

let running = tokio::task::spawn(async move {
let halt = async move {
shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")).await;
};
let halt = tokio::task::spawn(async move {
debug!(target: "UDP Tracker", "Server on socket address: udp://{address} waiting for halt signal ...");

pin_mut!(halt);
shutdown_signal_with_message(
rx_halt,
format!("Shutting down UDP server on socket address: udp://{address}"),
)
.await;
});

info!(target: "UDP Tracker", "Starting on: udp://{}", address);

loop {
let mut data = [0; MAX_PACKET_SIZE];
let binding = binding.clone();
let running = tokio::task::spawn(async move {
let listen = async move {
debug!(target: "UDP Tracker", "Waiting for packets ...");

tokio::select! {
() = & mut halt => {},
loop {
let mut data = [0; MAX_PACKET_SIZE];
let socket_clone = socket.clone();

Ok((valid_bytes, remote_addr)) = binding.recv_from(&mut data) => {
let payload = data[..valid_bytes].to_vec();
match socket_clone.recv_from(&mut data).await {
Ok((valid_bytes, remote_addr)) => {
let payload = data[..valid_bytes].to_vec();

Check warning on line 237 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L236-L237

Added lines #L236 - L237 were not covered by tests

debug!("Received {} bytes", payload.len());
debug!("From: {}", &remote_addr);
debug!("Payload: {:?}", payload);
debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
debug!(target: "UDP Tracker", "From: {}", &remote_addr);
debug!(target: "UDP Tracker", "Payload: {:?}", payload);

Check warning on line 241 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L239-L241

Added lines #L239 - L241 were not covered by tests

let response = handle_packet(remote_addr, payload, &tracker).await;
let response = handle_packet(remote_addr, payload, &tracker).await;

Check warning on line 243 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L243

Added line #L243 was not covered by tests

Udp::send_response(binding, remote_addr, response).await;
Udp::send_response(socket_clone, remote_addr, response).await;
}
Err(err) => {
error!("Error reading UDP datagram from socket. Error: {:?}", err);
}

Check warning on line 249 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L245-L249

Added lines #L245 - L249 were not covered by tests
}
}
};

pin_mut!(halt);
pin_mut!(listen);

tokio::select! {
_ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); },
() = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
}
});

info!(target: "UDP Tracker", "Started on: udp://{}", address);

tx_start
.send(Started { address })
.expect("the UDP Tracker service should not be dropped");
Expand Down