Skip to content

Commit

Permalink
horust compiling and tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
FedericoPonzi committed Nov 2, 2024
1 parent 102f0e9 commit 0b69994
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 270 deletions.
275 changes: 59 additions & 216 deletions Cargo.lock

Large diffs are not rendered by default.

47 changes: 40 additions & 7 deletions commands/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::proto::messages::{
use anyhow::{anyhow, Context, Result};
use log::{error, info};
use prost::Message;
use std::io::{Read, Write};
use std::io::{ErrorKind, Read, Write};
use std::net::Shutdown;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
Expand All @@ -33,7 +33,10 @@ pub trait CommandsHandlerTrait {
}
}
Err(e) => {
error!("Error accepting connction: {e} - you might need to restart Horust.");
let kind = e.kind();
if !matches!(ErrorKind::WouldBlock, kind) {
error!("Error accepting connction: {e} - you might need to restart Horust.");
}
}
};
Ok(())
Expand Down Expand Up @@ -79,13 +82,41 @@ pub struct ClientHandler {
uds_connection_handler: UdsConnectionHandler,
}
impl ClientHandler {
pub fn new_client(socket_path: PathBuf) -> Result<Self> {
pub fn new_client(socket_path: &Path) -> Result<Self> {
Ok(Self {
uds_connection_handler: UdsConnectionHandler::new(
UnixStream::connect(socket_path).context("Could not create stream")?,
),
})
}
pub fn send_status_request(
&mut self,
service_name: String,
) -> Result<(String, HorustMsgServiceStatus)> {
let status = HorustMsgMessage {
request_type: Some(RequestType::StatusRequest(HorustMsgServiceStatusRequest {
service_name,
})),
};
self.uds_connection_handler.send_message(status)?;
// server is waiting for EOF.
self.uds_connection_handler
.socket
.shutdown(Shutdown::Write)?;
//Reads all bytes until EOF in this source, appending them to buf.
let received = self.uds_connection_handler.receive_message()?;
info!("Client: received: {received:?}");
match received
.request_type
.ok_or(anyhow!("Error receiving message"))?
{
RequestType::StatusResponse(resp) => Ok((
resp.service_name,
HorustMsgServiceStatus::from_i32(resp.service_status).unwrap(),
)),
_ => unreachable!(),
}
}

pub fn client(mut self, service_name: String) -> Result<()> {
let status = HorustMsgMessage {
Expand All @@ -105,14 +136,16 @@ impl ClientHandler {
}
}

/// socket_name should be the pid of the horust process.
pub fn get_path(socket_folder: &Path, socket_name: i32) -> PathBuf {
socket_folder.join(format!("hourst-{socket_name}.sock"))
}

pub struct UdsConnectionHandler {
socket: UnixStream,
}
impl UdsConnectionHandler {
fn get_path(socket_folder: &Path, socket_name: u32) -> PathBuf {
socket_folder.join(format!("hourst-{socket_name}.sock"))
}
fn new(socket: UnixStream) -> Self {
pub fn new(socket: UnixStream) -> Self {
Self { socket }
}
pub fn send_message(&mut self, message: HorustMsgMessage) -> Result<()> {
Expand Down
7 changes: 6 additions & 1 deletion commands/tests/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl CommandsHandlerTrait for MockCommandsHandler {
fn get_service_status(&self, service_name: String) -> HorustMsgServiceStatus {
match service_name.as_str() {
"Running" => HorustMsgServiceStatus::Running,
"Started" => HorustMsgServiceStatus::Started,
_ => unimplemented!(),
}
}
Expand All @@ -50,12 +51,16 @@ fn test_simple() -> Result<()> {
info!("uds created");
barrier_server.wait();
uds.accept().unwrap();
uds.accept().unwrap();
});

let c_handle = thread::spawn(move || {
barrier_client.wait();
let client = ClientHandler::new_client(socket_path).unwrap();
let client = ClientHandler::new_client(&socket_path).unwrap();
client.client("Running".into()).unwrap();

let client = ClientHandler::new_client(&socket_path).unwrap();
client.client("Started".into()).unwrap();
});
s_handle.join().unwrap();
c_handle.join().unwrap();
Expand Down
53 changes: 43 additions & 10 deletions horust/src/horust/commands_handler.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,64 @@
use crate::horust::bus::BusConnector;
use crate::horust::formats::{ServiceName, ServiceStatus};
use crate::horust::Event;
use horust_commands_lib::{CommandsHandlerTrait, HorustMsgServiceStatus};
use std::collections::HashMap;
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use std::{fs, thread};

pub fn spawn(bus: BusConnector<Event>, uds_folder_path: PathBuf) -> JoinHandle<()> {
pub fn spawn(
bus: BusConnector<Event>,
uds_path: PathBuf,
services: Vec<ServiceName>,
) -> JoinHandle<()> {
thread::spawn(move || {
run(bus, uds_folder_path);
let mut commands_handler = CommandsHandler::new(bus, uds_path, services);
commands_handler.run();
})
}

fn run(bus: BusConnector<Event>, uds_folder_path: PathBuf) {
let mut commands_handler = CommandsHandler::new(bus, uds_folder_path);
commands_handler.start();
}

struct CommandsHandler {
bus: BusConnector<Event>,
services: HashMap<ServiceName, ServiceStatus>,
uds_listener: UnixListener,
uds_path: PathBuf,
}

impl CommandsHandler {
fn new(bus: BusConnector<Event>, uds_folder_path: PathBuf) -> Self {
fn new(bus: BusConnector<Event>, uds_path: PathBuf, services: Vec<ServiceName>) -> Self {
let mut uds_listener = UnixListener::bind(&uds_path).unwrap();
uds_listener.set_nonblocking(true).unwrap();
Self {
bus,
uds_listener: UnixListener::bind(uds_folder_path).unwrap(),
uds_path,
uds_listener,
services: services
.into_iter()
.map(|s| (s, ServiceStatus::Initial))
.collect(),
}
}
fn run(&mut self) {
loop {
let evs = self.bus.try_get_events();
for ev in evs {
match ev {
Event::StatusChanged(name, status) => {
let k = self.services.get_mut(&name).unwrap();
*k = status;
}
Event::ShuttingDownInitiated(_) => {
fs::remove_file(&self.uds_path).unwrap();
return;
}
_ => {}
}
}
self.accept().unwrap();
thread::sleep(Duration::from_millis(300));
}
}
}
Expand Down
18 changes: 8 additions & 10 deletions horust/src/horust/healthcheck/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! If a service has defined an healthchecker, this module will spawn a worker to making sure that
//! the service is working as supposed to.
use std::net::{SocketAddrV4, TcpListener};
use std::sync::mpsc;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
Expand Down Expand Up @@ -153,10 +155,6 @@ mod test {
use crate::horust::formats::{Healthiness, HealthinessStatus};
use crate::horust::healthcheck::check_health;

fn check_health_w(healthiness: &Healthiness) -> bool {
check_health(healthiness) == HealthinessStatus::Healthy
}

#[test]
fn test_healthiness_check_file() -> Result<()> {
let tempdir = TempDir::new("health")?;
Expand All @@ -166,11 +164,11 @@ mod test {
http_endpoint: None,
..Default::default()
};
assert!(!check_health_w(&healthiness));
assert_ne!(check_health(&healthiness), HealthinessStatus::Healthy);
std::fs::write(file_path, "Hello world!")?;
assert!(check_health_w(&healthiness));
assert_eq!(check_health(&healthiness), HealthinessStatus::Healthy);
let healthiness: Healthiness = Default::default();
assert!(check_health_w(&healthiness));
assert_eq!(check_health(&healthiness), HealthinessStatus::Healthy);
Ok(())
}

Expand All @@ -193,7 +191,7 @@ mod test {
http_endpoint: Some("http://localhost:123/".into()),
..Default::default()
};
assert!(!check_health_w(&healthiness));
assert_ne!(check_health(&healthiness), HealthinessStatus::Healthy);
let loopback = Ipv4Addr::new(127, 0, 0, 1);
let socket = SocketAddrV4::new(loopback, 0);
let listener = TcpListener::bind(socket)?;
Expand All @@ -209,11 +207,11 @@ mod test {
handle_request(listener).unwrap();
sender.send(()).expect("Chan closed");
});
assert!(check_health_w(&healthiness));
assert_eq!(check_health(&healthiness), HealthinessStatus::Healthy);
receiver
.recv_timeout(Duration::from_millis(2000))
.expect("Failed to received response from handle_request");
assert!(!check_health_w(&healthiness));
assert_ne!(check_health(&healthiness), HealthinessStatus::Healthy);
Ok(())
}
}
22 changes: 9 additions & 13 deletions horust/src/horust/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,22 @@ mod formats;
mod healthcheck;
mod signal_safe;
mod supervisor;
mod uds_messages;

#[derive(Debug)]
pub struct Horust {
services: Vec<Service>,
uds_folder_path: PathBuf,
uds_path: PathBuf,
}

impl Horust {
fn new(services: Vec<Service>, uds_folder_path: PathBuf) -> Self {
Horust {
services,
uds_folder_path,
}
fn new(services: Vec<Service>, uds_path: PathBuf) -> Self {
Horust { services, uds_path }
}

/// Creates a new Horust instance from a command.
/// The command will be wrapped in a service and run with sane defaults
pub fn from_command(command: String, uds_folder_path: PathBuf) -> Self {
Self::new(vec![Service::from_command(command)], uds_folder_path)
pub fn from_command(command: String, uds_path: PathBuf) -> Self {
Self::new(vec![Service::from_command(command)], uds_path)
}

fn load_services_from_folders(paths: &[PathBuf]) -> Result<Vec<Service>> {
Expand All @@ -54,10 +50,10 @@ impl Horust {
.collect::<Result<Vec<_>>>()
}
/// Create a new horust instance from multiple paths of services.
pub fn from_services_dirs(paths: &[PathBuf], uds_folder_path: PathBuf) -> Result<Self> {
pub fn from_services_dirs(paths: &[PathBuf], uds_path: PathBuf) -> Result<Self> {
let services = Self::load_services_from_folders(paths)?;
let services = validate(services)?;
Ok(Horust::new(services, uds_folder_path))
Ok(Horust::new(services, uds_path))
}

/// Blocking call, will setup the event loop and the threads and run all the available services.
Expand All @@ -84,9 +80,9 @@ impl Horust {
// Spawn helper threads:
healthcheck::spawn(dispatcher.join_bus(), self.services.clone());
commands_handler::spawn(
self.uds_folder_path.clone(),
dispatcher.join_bus(),
self.services.clone(),
self.uds_path.clone(),
self.services.iter().map(|s| s.name.clone()).collect(),
);
let handle = supervisor::spawn(dispatcher.join_bus(), self.services.clone());
dispatcher.run();
Expand Down
6 changes: 0 additions & 6 deletions horust/src/horust/uds_messages.rs

This file was deleted.

10 changes: 6 additions & 4 deletions horust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use clap::Parser;
use horust::horust::ExitStatus;
use horust::Horust;
use log::{error, info};
use nix::unistd::getpid;

#[derive(clap::Parser, Debug)]
#[clap(author, about)]
Expand Down Expand Up @@ -50,7 +51,7 @@ fn main() -> Result<()> {
}

if !opts.uds_folder_path.exists() {
std::fs::create_dir_all(opts.uds_folder_path)?;
std::fs::create_dir_all(&opts.uds_folder_path)?;
}

if !opts.uds_folder_path.is_dir() {
Expand All @@ -59,22 +60,23 @@ fn main() -> Result<()> {
opts.uds_folder_path
);
}
let uds_path = horust_commands_lib::get_path(&opts.uds_folder_path, getpid().into());

let mut horust = if opts.command.is_empty() {
info!(
"Loading services from {}",
display_directories(&opts.services_paths)
);
Horust::from_services_dirs(&opts.services_paths).with_context(|| {
Horust::from_services_dirs(&opts.services_paths, uds_path).with_context(|| {
format!(
"Failed loading services from {}",
display_directories(&opts.services_paths)
)
})?
} else {
info!("Running command: {:?}", opts.command);
Horust::from_command(opts.command.join(" "))
Horust::from_command(opts.command.join(" "), uds_path)
};
horust.set_uds_folder_path();

if let ExitStatus::SomeServiceFailed = horust.run() {
if opts.unsuccessful_exit_finished_failed {
Expand Down
2 changes: 2 additions & 0 deletions horustctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ log = "~0.4"
env_logger = "~0.11"
crossbeam = "~0.8"
clap = { version = "~4.5", features = ["derive"] }
horust-commands-lib = {path = "../commands"}

Loading

0 comments on commit 0b69994

Please sign in to comment.