Merge #595: Fix panicking after starting UDP server due to closed "halt" channel

5fd0c84 chore: normalize log ouput (Jose Celano)
0c1f389 fix: [#591] panicking after starting UDP server due to close halt channel (Jose Celano)

Pull request description:

  Relates to: #594

  ### Context

  We use a oneshot channel between the main app, which launches the UDP server, and the running UDP server. The app can use it to send the "halt" message to the server. Once the app has started the server, the server waits for that signal.

  That code is not working because the channel is already closed when the server starts waiting. This does not affect the server itself, which keeps running. Currently, we only use that signal in test environments, not in production.

  ### Details

  The "halt" channel is immediately closed after starting the UDP tracker. So the app panics with:

  ```console
  thread 'tokio-runtime-worker' panicked at src/servers/signals.rs:52:25:
  Failed to install stop signal: channel closed
  ```

  The UDP server starts, but it's not possible to shut it down gracefully.

  I've added a lot of debug lines in this PR:

  ```console
  $ cargo run
     Compiling torrust-tracker v3.0.0-alpha.12-develop (/home/josecelano/Documents/git/committer/me/github/torrust/torrust-tracker)
      Finished dev [optimized + debuginfo] target(s) in 7.05s
       Running `target/debug/torrust-tracker`
  Loading default configuration file: `./share/default/config/tracker.development.sqlite3.toml` ...
  2024-01-10T17:46:40.348930411+00:00 [torrust_tracker::bootstrap::logging][INFO] logging initialized.
  2024-01-10T17:46:40.349651636+00:00 [UDP Tracker][DEBUG] Launcher starting ...
  2024-01-10T17:46:40.349677556+00:00 [UDP Tracker][INFO] Starting on: udp://0.0.0.0:6969
  2024-01-10T17:46:40.349682046+00:00 [UDP Tracker][INFO] Started on: udp://0.0.0.0:6969
  2024-01-10T17:46:40.349684556+00:00 [UDP Tracker][DEBUG] Launcher finished ...
  2024-01-10T17:46:40.349687856+00:00 [UDP Tracker][DEBUG] Waiting for packets ...
  2024-01-10T17:46:40.349687736+00:00 [torrust_tracker::bootstrap::jobs::http_tracker][INFO] Note: Not loading Http Tracker Service, Not Enabled in Configuration.
  2024-01-10T17:46:40.349691306+00:00 [UDP Tracker][DEBUG] Wait for launcher (UDP service) to finish ...
  2024-01-10T17:46:40.349689536+00:00 [UDP Tracker][DEBUG] Server on socket address: udp://0.0.0.0:6969 waiting for halt signal ...
  2024-01-10T17:46:40.349700506+00:00 [torrust_tracker::bootstrap::jobs::health_check_api][INFO] Starting Health Check API server: http://127.0.0.1:1313
  thread 'tokio-runtime-worker' panicked at src/servers/signals.rs:52:25:
  Failed to install stop signal: channel closed
  note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
  2024-01-10T17:46:40.349793665+00:00 [torrust_tracker::bootstrap::jobs::health_check_api][INFO] Torrust Health Check API server started on: http://127.0.0.1:1313
  2024-01-10T17:46:40.349793225+00:00 [UDP Tracker][DEBUG] Halt signal spawned task stopped on address: udp://0.0.0.0:6969

  ```

  The function panicking is:

  ```rust
  pub async fn shutdown_signal(rx_halt: tokio::sync::oneshot::Receiver<Halted>) {
      let halt = async {
          match rx_halt.await {
              Ok(signal) => signal,
              Err(err) => panic!("Failed to install stop signal: {err}"),
          }
      };

      tokio::select! {
          signal = halt => { info!("Halt signal processed: {}", signal) },
          () = global_shutdown_signal() => { info!("Global shutdown signal processed") }
      }
  }
  ```

  When the thread starts waiting for the signal it receives the error "channel closed".
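
  The "channel closed" error can be reproduced outside the tracker. The following is a minimal sketch, not the tracker's code: it uses the standard library's `std::sync::mpsc` channel as a stand-in for `tokio::sync::oneshot` so that it runs without dependencies, and `Halted::Normal` and `wait_for_halt` are hypothetical names. Unlike `shutdown_signal`, it returns an error instead of panicking when the sender is dropped before sending.

  ```rust
  use std::sync::mpsc;

  /// Hypothetical stand-in for the tracker's `Halted` message.
  #[derive(Debug, PartialEq)]
  enum Halted {
      Normal,
  }

  /// Waits for a halt message and reports a closed channel as an error
  /// instead of panicking.
  fn wait_for_halt(rx_halt: mpsc::Receiver<Halted>) -> Result<Halted, String> {
      rx_halt
          .recv()
          .map_err(|err| format!("halt channel closed: {err}"))
  }

  fn main() {
      // Sender kept alive until it has sent: the message arrives.
      let (tx, rx) = mpsc::channel();
      tx.send(Halted::Normal).unwrap();
      println!("{:?}", wait_for_halt(rx));

      // Sender dropped before sending anything: the receiver gets an error.
      let (tx, rx) = mpsc::channel::<Halted>();
      drop(tx);
      println!("{:?}", wait_for_halt(rx));
  }
  ```

  The second case mirrors what the panic above reports: the receiving side is healthy, but nothing holds the sending half any more.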

  I fixed a similar problem for the Tracker API and the HTTP Tracker:

  - #593
  - #589

  Apparently, the problem was that the channel's sender was dropped (I didn't know why; it looked like a compiler optimization).

  In the end, I think the problem was that we were not waiting for the right spawned task. I've reorganized the code, but the change that fixed the problem is:

  ```rust
          let task = tokio::spawn(async move {
              debug!(target: "UDP Tracker", "Launcher starting ...");

              let starting = launcher.start(tracker, tx_start, rx_halt).await;

              starting.await.expect("UDP server should have started running");

              launcher
          });
  ```

  We needed to wait for the task returned by the launcher because, in this case, the launcher is not the task that runs the final server. We were only waiting until the launcher was "launched", not for the final task running the UDP server. That is why the channel was closed prematurely.
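
  The difference between waiting for the launcher and waiting for the task it starts can be sketched with plain threads. This is an analogy under stated assumptions, not the tracker's code: `launch`, `wait_for_launcher_only` and `wait_for_worker` are hypothetical names, and `std::thread` stands in for Tokio tasks.

  ```rust
  use std::sync::atomic::{AtomicBool, Ordering};
  use std::sync::Arc;
  use std::thread::{self, JoinHandle};
  use std::time::Duration;

  /// Hypothetical launcher: spawns the real worker and returns its handle
  /// right away, like a launcher whose `start` returns a `JoinHandle`.
  fn launch(done: Arc<AtomicBool>) -> JoinHandle<()> {
      thread::spawn(move || {
          thread::sleep(Duration::from_millis(50)); // simulated server work
          done.store(true, Ordering::SeqCst);
      })
  }

  /// Joins only the launcher thread; the worker may still be running.
  fn wait_for_launcher_only(done: Arc<AtomicBool>) -> JoinHandle<()> {
      let outer = thread::spawn(move || launch(done));
      outer.join().expect("launcher should not panic")
  }

  /// Joins the launcher and then the worker handle it returned.
  fn wait_for_worker(done: Arc<AtomicBool>) {
      let worker = wait_for_launcher_only(done);
      worker.join().expect("worker should not panic");
  }

  fn main() {
      let done = Arc::new(AtomicBool::new(false));
      wait_for_worker(Arc::clone(&done));
      println!("worker finished: {}", done.load(Ordering::SeqCst));
  }
  ```

  Joining the inner handle plays the same role here as `starting.await` in the fix: the outer task stays alive until the real server task ends.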

  I have also normalized the log output:

  ```console
  $ cargo run
      Finished dev [optimized + debuginfo] target(s) in 0.07s
       Running `target/debug/torrust-tracker`
  Loading default configuration file: `./share/default/config/tracker.development.sqlite3.toml` ...
  2024-01-11T17:12:11.134816964+00:00 [torrust_tracker::bootstrap::logging][INFO] logging initialized.
  2024-01-11T17:12:11.135473883+00:00 [UDP Tracker][INFO] Starting on: udp://0.0.0.0:6969
  2024-01-11T17:12:11.135494422+00:00 [UDP Tracker][INFO] Started on: udp://0.0.0.0:6969
  2024-01-11T17:12:11.135503672+00:00 [torrust_tracker::bootstrap::jobs][INFO] TLS not enabled
  2024-01-11T17:12:11.135587738+00:00 [HTTP Tracker][INFO] Starting on: http://0.0.0.0:7070
  2024-01-11T17:12:11.135612497+00:00 [HTTP Tracker][INFO] Started on: http://0.0.0.0:7070
  2024-01-11T17:12:11.135619586+00:00 [torrust_tracker::bootstrap::jobs][INFO] TLS not enabled
  2024-01-11T17:12:11.135675454+00:00 [API][INFO] Starting on http://127.0.0.1:1212
  2024-01-11T17:12:11.135688443+00:00 [API][INFO] Started on http://127.0.0.1:1212
  2024-01-11T17:12:11.135701143+00:00 [Health Check API][INFO] Starting on: http://127.0.0.1:1313
  2024-01-11T17:12:11.135718012+00:00 [Health Check API][INFO] Started on: http://127.0.0.1:1313
  ^C2024-01-11T17:12:17.463967431+00:00 [torrust_tracker][INFO] Torrust shutting down..
  2024-01-11T17:12:17.463960892+00:00 [torrust_tracker::servers::signals][INFO] Global shutdown signal processed
  2024-01-11T17:12:17.464011399+00:00 [torrust_tracker::servers::signals][INFO] Shutting down HTTP server on socket address: 0.0.0.0:7070
  2024-01-11T17:12:17.464015379+00:00 [torrust_tracker::servers::signals][INFO] Sending graceful shutdown signal
  !! shuting down in 90 seconds !!
  2024-01-11T17:12:17.464035618+00:00 [torrust_tracker::servers::signals][INFO] Global shutdown signal processed
  2024-01-11T17:12:17.464055757+00:00 [torrust_tracker::servers::health_check_api::server][INFO] Stopping Torrust Health Check API server o http://127.0.0.1:1313 ...
  2024-01-11T17:12:17.464064287+00:00 [torrust_tracker::servers::signals][INFO] Global shutdown signal processed
  2024-01-11T17:12:17.464088366+00:00 [torrust_tracker::servers::signals][INFO] Shutting down UDP server on socket address: udp://0.0.0.0:6969
  2024-01-11T17:12:17.464067327+00:00 [torrust_tracker::servers::signals][INFO] Shutting down tracker API server on socket address: 127.0.0.1:1212
  2024-01-11T17:12:17.464097095+00:00 [torrust_tracker::servers::signals][INFO] Sending graceful shutdown signal
  !! shuting down in 90 seconds !!
  2024-01-11T17:12:17.464105665+00:00 [Health Check API][INFO] Stopped server running on: http://127.0.0.1:1313
  2024-01-11T17:12:17.464093226+00:00 [torrust_tracker::bootstrap::jobs::torrent_cleanup][INFO] Stopping torrent cleanup job..
  2024-01-11T17:12:17.464154493+00:00 [torrust_tracker][INFO] Torrust successfully shutdown.
  ```

  I'm using log targets for a few reasons:

  1. Easily identify service in logs
  2. Easily filter log by service
  3. Autodiscover services from logs
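
  As an illustration of points 2 and 3, here is a rough sketch of how a tool could read the target back from lines shaped like the output above (`<timestamp> [<target>][<LEVEL>] <message>`). The helpers `target_of`, `filter_by_target` and `discover_targets` are hypothetical and not part of the tracker.

  ```rust
  use std::collections::BTreeSet;

  /// Extracts the target from a line shaped like
  /// `<timestamp> [<target>][<LEVEL>] <message>`.
  fn target_of(line: &str) -> Option<&str> {
      let start = line.find('[')? + 1;
      let end = start + line[start..].find(']')?;
      Some(&line[start..end])
  }

  /// Keeps only the lines emitted under the given target.
  fn filter_by_target<'a>(lines: &[&'a str], target: &str) -> Vec<&'a str> {
      lines
          .iter()
          .copied()
          .filter(|&l| target_of(l) == Some(target))
          .collect()
  }

  /// Collects the distinct targets seen in the log.
  fn discover_targets<'a>(lines: &[&'a str]) -> BTreeSet<&'a str> {
      lines.iter().copied().filter_map(target_of).collect()
  }

  fn main() {
      let log = [
          "2024-01-11T17:12:11.135473883+00:00 [UDP Tracker][INFO] Starting on: udp://0.0.0.0:6969",
          "2024-01-11T17:12:11.135587738+00:00 [HTTP Tracker][INFO] Starting on: http://0.0.0.0:7070",
          "2024-01-11T17:12:11.135494422+00:00 [UDP Tracker][INFO] Started on: udp://0.0.0.0:6969",
      ];
      println!("{:?}", discover_targets(&log));
      println!("{:?}", filter_by_target(&log, "UDP Tracker"));
  }
  ```

  Lines without a bracketed prefix, such as the plain shutdown notice in the output above, yield no target and are skipped by both helpers.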

ACKs for top commit:
  josecelano:
    ACK 5fd0c84

Tree-SHA512: 83d50ded031c8fe1505ca68a7b73dc0fac7dd9d5e414ac615f357daf3682e561fda5ddf9c8ae91dab811e2ecb3c8f540d1790ebbe469c218adcdc476c016df9f
josecelano committed Jan 11, 2024
2 parents 49c961c + 5fd0c84 commit 3dd1402
Showing 6 changed files with 78 additions and 36 deletions.
5 changes: 3 additions & 2 deletions cSpell.json
@@ -32,6 +32,7 @@
"Containerfile",
"curr",
"Cyberneering",
"datagram",
"datetime",
"Dijke",
"distroless",
@@ -79,6 +80,7 @@
"nonroot",
"Norberg",
"numwant",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7",
"oneshot",
"ostr",
"Pando",
@@ -129,8 +131,7 @@
"Xtorrent",
"Xunlei",
"xxxxxxxxxxxxxxxxxxxxd",
"yyyyyyyyyyyyyyyyyyyyd",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7"
"yyyyyyyyyyyyyyyyyyyyd"
],
"enableFiletypes": [
"dockerfile",
6 changes: 3 additions & 3 deletions src/bootstrap/jobs/health_check_api.rs
@@ -44,18 +44,18 @@ pub async fn start_job(config: Arc<Configuration>) -> JoinHandle<()> {

// Run the API server
let join_handle = tokio::spawn(async move {
info!("Starting Health Check API server: http://{}", bind_addr);
info!(target: "Health Check API", "Starting on: http://{}", bind_addr);

let handle = server::start(bind_addr, tx_start, config.clone());

if let Ok(()) = handle.await {
info!("Health Check API server on http://{} stopped", bind_addr);
info!(target: "Health Check API", "Stopped server running on: http://{}", bind_addr);
}
});

// Wait until the API server job is running
match rx_start.await {
Ok(msg) => info!("Torrust Health Check API server started on: http://{}", msg.address),
Ok(msg) => info!(target: "Health Check API", "Started on: http://{}", msg.address),
Err(e) => panic!("the Health Check API server was dropped: {e}"),
}

11 changes: 11 additions & 0 deletions src/bootstrap/jobs/udp_tracker.rs
@@ -8,6 +8,7 @@
//! for the configuration options.
use std::sync::Arc;

use log::debug;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::UdpTracker;

@@ -36,10 +37,20 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>) -> Join
.expect("it should be able to start the udp tracker");

tokio::spawn(async move {
debug!(target: "UDP Tracker", "Wait for launcher (UDP service) to finish ...");
debug!(target: "UDP Tracker", "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed());

assert!(
!server.state.halt_task.is_closed(),
"Halt channel for UDP tracker should be open"
);

server
.state
.task
.await
.expect("it should be able to join to the udp tracker task");

debug!(target: "UDP Tracker", "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed());
})
}
4 changes: 3 additions & 1 deletion src/servers/apis/server.rs
@@ -175,6 +175,8 @@ impl Launcher {
let tls = self.tls.clone();
let protocol = if tls.is_some() { "https" } else { "http" };

info!(target: "API", "Starting on {protocol}://{}", address);

let running = Box::pin(async {
match tls {
Some(tls) => axum_server::from_tcp_rustls(socket, tls)
@@ -190,7 +192,7 @@ impl Launcher {
}
});

info!(target: "API", "API server started on {protocol}://{}", address);
info!(target: "API", "Started on {protocol}://{}", address);

tx_start
.send(Started { address })
2 changes: 1 addition & 1 deletion src/servers/http/server.rs
@@ -48,7 +48,7 @@ impl Launcher {
tokio::task::spawn(graceful_shutdown(
handle.clone(),
rx_halt,
format!("Shutting down http server on socket address: {address}"),
format!("Shutting down HTTP server on socket address: {address}"),
));

let tls = self.tls.clone();
86 changes: 57 additions & 29 deletions src/servers/udp/server.rs
@@ -120,23 +120,30 @@ impl UdpServer<Stopped> {
let (tx_start, rx_start) = tokio::sync::oneshot::channel::<Started>();
let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();

assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open");

let launcher = self.state.launcher;

let task = tokio::spawn(async move {
launcher.start(tracker, tx_start, rx_halt).await;
debug!(target: "UDP Tracker", "Launcher starting ...");

let starting = launcher.start(tracker, tx_start, rx_halt).await;

starting.await.expect("UDP server should have started running");

launcher
});

let binding = rx_start.await.expect("unable to start service").address;

let running_udp_server: UdpServer<Running> = UdpServer {
state: Running {
binding: rx_start.await.expect("unable to start service").address,
binding,
halt_task: tx_halt,
task,
},
};

info!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding);

Ok(running_udp_server)
}
}
@@ -202,41 +209,62 @@ impl Udp {
tx_start: Sender<Started>,
rx_halt: Receiver<Halted>,
) -> JoinHandle<()> {
let binding = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = binding.local_addr().expect("Could not get local_addr from {binding}.");
let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = socket.local_addr().expect("Could not get local_addr from {binding}.");

info!(target: "UDP Tracker", "Starting on: udp://{}", address);

let running = tokio::task::spawn(async move {
let halt = async move {
shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")).await;
let halt = tokio::task::spawn(async move {
debug!(target: "UDP Tracker", "Waiting for halt signal for socket address: udp://{address} ...");

shutdown_signal_with_message(
rx_halt,
format!("Shutting down UDP server on socket address: udp://{address}"),
)
.await;
});

let listen = async move {
debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ...");

loop {
let mut data = [0; MAX_PACKET_SIZE];
let socket_clone = socket.clone();

match socket_clone.recv_from(&mut data).await {
Ok((valid_bytes, remote_addr)) => {
let payload = data[..valid_bytes].to_vec();

debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
debug!(target: "UDP Tracker", "From: {}", &remote_addr);
debug!(target: "UDP Tracker", "Payload: {:?}", payload);

let response = handle_packet(remote_addr, payload, &tracker).await;

Udp::send_response(socket_clone, remote_addr, response).await;
}
Err(err) => {
error!("Error reading UDP datagram from socket. Error: {:?}", err);
}
}
}
};

pin_mut!(halt);
pin_mut!(listen);

loop {
let mut data = [0; MAX_PACKET_SIZE];
let binding = binding.clone();

tokio::select! {
() = & mut halt => {},

Ok((valid_bytes, remote_addr)) = binding.recv_from(&mut data) => {
let payload = data[..valid_bytes].to_vec();

debug!("Received {} bytes", payload.len());
debug!("From: {}", &remote_addr);
debug!("Payload: {:?}", payload);
tx_start
.send(Started { address })
.expect("the UDP Tracker service should not be dropped");

let response = handle_packet(remote_addr, payload, &tracker).await;

Udp::send_response(binding, remote_addr, response).await;
}
}
tokio::select! {
_ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); },
() = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
}
});

tx_start
.send(Started { address })
.expect("the UDP Tracker service should not be dropped");
info!(target: "UDP Tracker", "Started on: udp://{}", address);

running
}
