Skip to content

Commit

Permalink
Merge #629: Fix services bootstraping
Browse files Browse the repository at this point in the history
17296cd fix: [#626] healt check api server shutdown (Jose Celano)
bbf1be6 fix: don't start HTTP tracker if it's disabled (Jose Celano)

Pull request description:

  This fixes all errors introduced after merging [this PR](#623).

  - [x] HTTP tracker should not be started if it's disabled

  Error running the Health Check API server:

  ```console
  thread 'tokio-runtime-worker' panicked at src/servers/signals.rs:52:25:
  Failed to install stop signal: channel closed
  ```

ACKs for top commit:
  josecelano:
    ACK 17296cd

Tree-SHA512: 95fb8011f6eb58b52f8805185ccabb484be29d6231930aaa9a24d4a9b0a4b5dd457259a343c4a337529264ecd42f76609a57914a661a7ce19255e0fcb41a3145
  • Loading branch information
josecelano committed Jan 19, 2024
2 parents 203ce96 + 17296cd commit 62fddba
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub async fn start(config: &Configuration, tracker: Arc<core::Tracker>) -> Vec<J

// Start the HTTP blocks
for http_tracker_config in &config.http_trackers {
if !http_tracker_config.enabled {
continue;
}

if let Some(job) = http_tracker::start_job(
http_tracker_config,
tracker.clone(),
Expand Down
20 changes: 14 additions & 6 deletions src/bootstrap/jobs/health_check_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,32 @@ pub async fn start_job(config: &HealthCheckApi, register: ServiceRegistry) -> Jo

let (tx_start, rx_start) = oneshot::channel::<Started>();
let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();
drop(tx_halt);

let protocol = "http";

// Run the API server
let join_handle = tokio::spawn(async move {
info!(target: "Health Check API", "Starting on: http://{}", bind_addr);
info!(target: "Health Check API", "Starting on: {protocol}://{}", bind_addr);

let handle = server::start(bind_addr, tx_start, rx_halt, register);

if let Ok(()) = handle.await {
info!(target: "Health Check API", "Stopped server running on: http://{}", bind_addr);
info!(target: "Health Check API", "Stopped server running on: {protocol}://{}", bind_addr);
}
});

// Wait until the API server job is running
// Wait until the server sends the started message
match rx_start.await {
Ok(msg) => info!(target: "Health Check API", "Started on: http://{}", msg.address),
Ok(msg) => info!(target: "Health Check API", "Started on: {protocol}://{}", msg.address),
Err(e) => panic!("the Health Check API server was dropped: {e}"),
}

join_handle
// Wait until the server finishes
tokio::spawn(async move {
assert!(!tx_halt.is_closed(), "Halt channel for Health Check API should be open");

join_handle
.await
.expect("it should be able to join to the Health Check API server task");
})
}
12 changes: 9 additions & 3 deletions src/servers/apis/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use axum_server::tls_rustls::RustlsConfig;
use axum_server::Handle;
use derive_more::Constructor;
use futures::future::BoxFuture;
use log::{error, info};
use log::{debug, error, info};
use tokio::sync::oneshot::{Receiver, Sender};
use torrust_tracker_configuration::AccessTokens;

Expand Down Expand Up @@ -120,7 +120,12 @@ impl ApiServer<Stopped> {
let launcher = self.state.launcher;

let task = tokio::spawn(async move {
launcher.start(tracker, access_tokens, tx_start, rx_halt).await;
debug!(target: "API", "Starting with launcher in spawned task ...");

let _task = launcher.start(tracker, access_tokens, tx_start, rx_halt).await;

debug!(target: "API", "Started with launcher in spawned task");

launcher
});

Expand Down Expand Up @@ -266,9 +271,10 @@ mod tests {
#[tokio::test]
async fn it_should_be_able_to_start_and_stop() {
let cfg = Arc::new(ephemeral_mode_public());
let tracker = initialize_with_configuration(&cfg);
let config = &cfg.http_api;

let tracker = initialize_with_configuration(&cfg);

let bind_to = config
.bind_address
.parse::<std::net::SocketAddr>()
Expand Down
5 changes: 4 additions & 1 deletion src/servers/health_check_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::routing::get;
use axum::{Json, Router};
use axum_server::Handle;
use futures::Future;
use log::debug;
use serde_json::json;
use tokio::sync::oneshot::{Receiver, Sender};

Expand Down Expand Up @@ -37,10 +38,12 @@ pub fn start(

let handle = Handle::new();

debug!(target: "Health Check API", "Starting service with graceful shutdown in a spawned task ...");

tokio::task::spawn(graceful_shutdown(
handle.clone(),
rx_halt,
format!("shutting down http server on socket address: {address}"),
format!("Shutting down http server on socket address: {address}"),
));

let running = axum_server::from_tcp(socket)
Expand Down
10 changes: 8 additions & 2 deletions src/servers/registar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use derive_more::Constructor;
use log::debug;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -81,10 +82,15 @@ impl Registar {

/// Inserts a listing into the registry.
async fn insert(&self, rx: tokio::sync::oneshot::Receiver<ServiceRegistration>) {
let listing = rx.await.expect("it should receive the listing");
debug!("Waiting for the started service to send registration data ...");

let service_registration = rx
.await
.expect("it should receive the service registration from the started service");

let mut mutex = self.registry.lock().await;
mutex.insert(listing.binding, listing);

mutex.insert(service_registration.binding, service_registration);
}

/// Returns the [`ServiceRegistry`] of services
Expand Down
10 changes: 10 additions & 0 deletions tests/servers/health_check_api/environment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;

use log::debug;
use tokio::sync::oneshot::{self, Sender};
use tokio::task::JoinHandle;
use torrust_tracker::bootstrap::jobs::Started;
Expand Down Expand Up @@ -50,13 +51,22 @@ impl Environment<Stopped> {

let register = self.registar.entries();

debug!(target: "Health Check API", "Spawning task to launch the service ...");

let server = tokio::spawn(async move {
debug!(target: "Health Check API", "Starting the server in a spawned task ...");

server::start(self.state.bind_to, tx_start, rx_halt, register)
.await
.expect("it should start the health check service");

debug!(target: "Health Check API", "Server started. Sending the binding {} ...", self.state.bind_to);

self.state.bind_to
});

debug!(target: "Health Check API", "Waiting for spawning task to send the binding ...");

let binding = rx_start.await.expect("it should send service binding").address;

Environment {
Expand Down

0 comments on commit 62fddba

Please sign in to comment.