Skip to content

Commit

Permalink
Merge pull request #228 from FedericoPonzi/issue-201
Browse files Browse the repository at this point in the history
Don't spawn a worker if there are no healthchecks to be done.
  • Loading branch information
FedericoPonzi authored Apr 22, 2024
2 parents f9d20c2 + f4048a5 commit 620e3af
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 30 deletions.
5 changes: 5 additions & 0 deletions src/horust/formats/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,18 @@ pub struct Healthiness {
pub http_endpoint: Option<String>,
pub file_path: Option<PathBuf>,
#[serde(default = "Healthiness::default_max_failed")]
// todo: use an u32
pub max_failed: i32,
}

impl Healthiness {
fn default_max_failed() -> i32 {
3
}

pub(crate) fn has_any_check_defined(&self) -> bool {
self.http_endpoint.is_some() || self.file_path.is_some()
}
}

impl Default for Healthiness {
Expand Down
9 changes: 5 additions & 4 deletions src/horust/healthcheck/checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use reqwest::blocking::Client;

use crate::horust::formats::Healthiness;

static FILE_CHECK: FilePathCheck = FilePathCheck {};
static HTTP_CHECK: HttpCheck = HttpCheck {};
const FILE_CHECK: FilePathCheck = FilePathCheck {};
const HTTP_CHECK: HttpCheck = HttpCheck {};
const CHECKS: [&dyn Check; 2] = [&FILE_CHECK, &HTTP_CHECK];

pub(crate) fn get_checks() -> Vec<&'static dyn Check> {
vec![&FILE_CHECK, &HTTP_CHECK]
pub(crate) fn get_checks() -> [&'static dyn Check; 2] {
CHECKS
}

pub(crate) trait Check {
Expand Down
47 changes: 29 additions & 18 deletions src/horust/healthcheck/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;

use crossbeam::channel::{unbounded, Receiver, RecvTimeoutError};
use crossbeam::channel::{unbounded, Receiver, RecvTimeoutError, Sender};

use checks::*;

Expand Down Expand Up @@ -62,12 +62,10 @@ pub fn spawn(bus: BusConnector<Event>, services: Vec<Service>) {

/// Returns true if the service is healthy and all checks are passed.
fn check_health(healthiness: &Healthiness) -> HealthinessStatus {
let failed_checks = get_checks()
get_checks()
.into_iter()
.filter(|check| !check.run(healthiness))
.count();
let is_healthy = failed_checks == 0;
is_healthy.into()
.all(|check| check.run(healthiness))
.into()
}

fn run(bus: BusConnector<Event>, services: Vec<Service>) {
Expand All @@ -81,23 +79,25 @@ fn run(bus: BusConnector<Event>, services: Vec<Service>) {
.collect::<Vec<Service>>()
.remove(0)
};

for ev in bus.iter() {
match ev {
Event::StatusChanged(s_name, ServiceStatus::Started) => {
let (worker_notifier, work_done_rcv) = unbounded();
let service = get_service(&s_name);
let w = Worker::new(service, bus.join_bus(), work_done_rcv);
let handle = w.spawn_thread();
if !service.healthiness.has_any_check_defined() {
bus.send_event(Event::HealthCheck(s_name, HealthinessStatus::Healthy));
continue;
}
if let Some((sender, handler)) = workers.remove(&s_name) {
stop_worker(sender, handler)
}
let (worker_notifier, work_done_rcv) = unbounded();
let handle = Worker::new(service, bus.join_bus(), work_done_rcv).spawn_thread();
workers.insert(s_name, (worker_notifier, handle));
}
Event::ServiceExited(s_name, _exit_code) => {
if let Some((sender, handler)) = workers.remove(&s_name) {
if sender.send(()).is_err() {
error!("Cannot send msg to sender - channel closed.");
}
if let Err(error) = handler.join() {
error!("Error joining thread: {:?}", error);
}
stop_worker(sender, handler)
} else {
warn!("Worker thread for {} not found.", s_name);
}
Expand All @@ -119,10 +119,22 @@ fn run(bus: BusConnector<Event>, services: Vec<Service>) {
}
}

fn stop_worker(sender: Sender<()>, handler: JoinHandle<()>) {
if let Err(error) = sender.send(()) {
error!(
"Cannot send msg to sender - channel might be closed. Error: {:?}",
error
);
}
if let Err(error) = handler.join() {
error!("Error joining thread: {:?}", error);
}
}

/// Setup require for the service, before running the healthchecks and starting the service
pub fn prepare_service(healthiness: &Healthiness) -> Result<Vec<()>, std::io::Error> {
get_checks()
.into_iter()
.iter()
.map(|check| check.prepare(healthiness))
.collect()
}
Expand Down Expand Up @@ -163,14 +175,13 @@ mod test {
}

fn handle_request(listener: TcpListener) -> std::io::Result<()> {
for stream in listener.incoming() {
if let Some(stream) = listener.incoming().next() {
info!("Received request");
let mut buffer = [0; 512];
let mut stream = stream?;
stream.read(&mut buffer).unwrap();
let response = b"HTTP/1.1 200 OK\r\n\r\n";
stream.write(response).expect("Stream write");
break;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/horust/supervisor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! The supervisor is one of the biggest module. It is responsible for supervising the services, and
//! The supervisor is one of the biggest modules. It is responsible for supervising the services, and
//! keeping track of their current state.
//! It will also reap the dead processes
Expand Down
2 changes: 1 addition & 1 deletion src/horust/supervisor/reaper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::horust::Event;
///
/// # Safety
///
/// This function must run in isolation with respect to the fork processes in order to
/// This function must run in isolation with respect to the fork processes to
/// prevent pid reusage.
pub(crate) fn run(repo: &Repo, max_iterations: u32) -> Vec<Event> {
(0..max_iterations)
Expand Down
5 changes: 2 additions & 3 deletions src/horust/supervisor/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ fn next_events(repo: &Repo, service_handler: &ServiceHandler) -> Vec<Event> {
ServiceStatus::Initial if repo.is_service_runnable(service_handler) => {
vec![Event::Run(service_handler.name().clone())]
}
// if enough time have passed, this will be considered running
// if enough time has passed, this will be considered running
ServiceStatus::Started if !service_handler.has_some_failed_healthchecks() => {
vev_status(ServiceStatus::Running)
}
Expand Down Expand Up @@ -227,8 +227,7 @@ fn handle_status_change(
//TODO: refactor + cleanup.
// A -> [B,C] means that transition to A is allowed only if service is in state B or C.
let allowed_transitions = hashmap! {
ServiceStatus::Initial => vec![ServiceStatus::Success, ServiceStatus::Failed,
ServiceStatus::Started],
ServiceStatus::Initial => vec![ServiceStatus::Success, ServiceStatus::Failed],
ServiceStatus::Starting => vec![ServiceStatus::Initial],
ServiceStatus::Started => vec![ServiceStatus::Starting],
ServiceStatus::InKilling => vec![ServiceStatus::Initial,
Expand Down
30 changes: 27 additions & 3 deletions tests/section_healthiness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::io::{Read, Write};
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, TryRecvError};
use std::thread;
use std::time::Duration;
use std::{io, thread};
use utils::*;

fn handle_requests(listener: TcpListener, stop: Receiver<()>) -> std::io::Result<()> {
fn handle_requests(listener: TcpListener, stop: Receiver<()>) -> io::Result<()> {
listener.set_nonblocking(true).unwrap();
for stream in listener.incoming() {
match stream {
Expand All @@ -30,7 +30,7 @@ fn handle_requests(listener: TcpListener, stop: Receiver<()>) -> std::io::Result
}

#[test]
fn test_http_healthcheck() -> Result<(), std::io::Error> {
fn test_healthcheck_http() -> io::Result<()> {
let (mut cmd, tempdir) = get_cli();
let loopback = Ipv4Addr::new(127, 0, 0, 1);
let socket = SocketAddrV4::new(loopback, 0);
Expand Down Expand Up @@ -66,3 +66,27 @@ http-endpoint = "{}""#,
.expect("Failed to received response from handle_request");
Ok(())
}

#[test]
fn test_healthcheck_file() -> io::Result<()> {
let (mut cmd, tempdir) = get_cli();
let service = format!(
r#"
[termination]
wait = "1s"
[restart]
strategy = "never"
[healthiness]
file-path = "{}""#,
tempdir.path().join("healthy").display()
);
let script = r#"#!/usr/bin/env bash
touch healthy;
sleep 2;
exit 0;
"#;
store_service_script(tempdir.path(), script, Some(service.as_str()), None);
let mut cmd = cmd.args(vec!["--unsuccessful-exit-finished-failed"]);
run_async(&mut cmd, true).recv_or_kill(Duration::from_secs(70));
Ok(())
}

0 comments on commit 620e3af

Please sign in to comment.