Skip to content

Commit

Permalink
Refactor commands lib and add response errors handling
Browse files Browse the repository at this point in the history
  • Loading branch information
FedericoPonzi committed Nov 2, 2024
1 parent 0b69994 commit 4dbd11e
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 185 deletions.
79 changes: 79 additions & 0 deletions commands/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::proto::messages::horust_msg_message::MessageType;
use crate::proto::messages::{
horust_msg_request, horust_msg_response, HorustMsgMessage, HorustMsgRequest,
HorustMsgServiceStatusRequest,
};
use crate::{HorustMsgServiceStatus, UdsConnectionHandler};
use anyhow::{anyhow, Context};
use anyhow::{bail, Result};
use log::info;
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::path::Path;

fn new_request(request_type: horust_msg_request::Request) -> HorustMsgMessage {
HorustMsgMessage {
message_type: Some(MessageType::Request(HorustMsgRequest {
request: Some(request_type),
})),
}
}

// if anything is none it will return none
// if the response was an error it will return Some(Err).
fn unwrap_response(response: HorustMsgMessage) -> Option<Result<horust_msg_response::Response>> {
if let MessageType::Response(resp) = response.message_type? {
let v = resp.response?;
return match &v {
horust_msg_response::Response::Error(error) => {
Some(Err(anyhow!("Error: {}", error.error_string)))
}
horust_msg_response::Response::StatusResponse(status) => Some(Ok(v)),
};
}
None
}

pub struct ClientHandler {
uds_connection_handler: UdsConnectionHandler,
}
impl ClientHandler {
pub fn new_client(socket_path: &Path) -> Result<Self> {
Ok(Self {
uds_connection_handler: UdsConnectionHandler::new(
UnixStream::connect(socket_path).context("Could not create stream")?,
),
})
}
pub fn send_status_request(
&mut self,
service_name: String,
) -> Result<(String, HorustMsgServiceStatus)> {
let status = new_request(horust_msg_request::Request::StatusRequest(
HorustMsgServiceStatusRequest { service_name },
));
self.uds_connection_handler.send_message(status)?;
// server is waiting for EOF.
self.uds_connection_handler
.socket
.shutdown(Shutdown::Write)?;
//Reads all bytes until EOF in this source, appending them to buf.
let received = self.uds_connection_handler.receive_message()?;
info!("Client: received: {received:?}");
let response = unwrap_response(received).unwrap()?;
if let horust_msg_response::Response::StatusResponse(resp) = response {
return Ok((
resp.service_name,
HorustMsgServiceStatus::from_i32(resp.service_status).unwrap(),
));
} else {
bail!("Invalid response received: {:?}", response);
}
}

pub fn client(mut self, service_name: String) -> Result<()> {
let received = self.send_status_request(service_name)?;
info!("Client: received: {received:?}");
Ok(())
}
}
30 changes: 23 additions & 7 deletions commands/src/commands.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@ syntax = "proto3";
package messages;

message HorustMsgMessage {
oneof request_type {
oneof message_type {
HorustMsgRequest request = 1;
HorustMsgResponse response = 2;
}
}
message HorustMsgRequest {
oneof request {
HorustMsgServiceStatusRequest status_request = 1;
HorustMsgServiceChangeRequest change_request = 2;
}
}
message HorustMsgResponse {
oneof response {
HorustMsgError error = 1;
HorustMsgServiceStatusResponse status_response = 2;
HorustMsgServiceChangeRequest change_request = 3;
}
}
message HorustMsgError {

message HorustMsgError {
string error_string = 1;
}

message HorustMsgServiceStatusRequest {
Expand All @@ -33,9 +45,13 @@ message HorustMsgServiceChangeResponse {
}

enum HorustMsgServiceStatus {
STOPPED = 0;
STARTING = 0;
STARTED = 1;
KILLED = 2;
SHUTTING_DOWN = 3;
RUNNING = 4;
RUNNING = 2;
INKILLING = 3;
SUCCESS = 4;
FINISHED = 5;
FINISHEDFAILED = 6;
FAILED = 7;
INITIAL = 8;
}
149 changes: 15 additions & 134 deletions commands/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,144 +1,22 @@
extern crate core;

mod client;
mod proto;

use crate::proto::messages::horust_msg_message::RequestType;
mod server;
use crate::proto::messages::HorustMsgMessage;
pub use crate::proto::messages::HorustMsgServiceStatus;
use crate::proto::messages::{
HorustMsgMessage, HorustMsgServiceStatusRequest, HorustMsgServiceStatusResponse,
};
use anyhow::{anyhow, Context, Result};
use log::{error, info};
use anyhow::{Context, Result};
pub use client::ClientHandler;
use log::debug;
use prost::Message;
use std::io::{ErrorKind, Read, Write};
use std::net::Shutdown;
use std::os::unix::net::{UnixListener, UnixStream};
pub use server::CommandsHandlerTrait;
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};

pub trait CommandsHandlerTrait {
// "blocking" - execute in its own tokio task.
fn start(&mut self) -> Result<()> {
// put the server logic in a loop to accept several connections
loop {
self.accept().expect("TODO: panic message");
}
}
fn get_unix_listener(&mut self) -> &mut UnixListener;
fn accept(&mut self) -> Result<()> {
match self.get_unix_listener().accept() {
Ok((stream, _addr)) => {
let conn_handler = UdsConnectionHandler::new(stream);
if let Err(err) = self.handle_connection(conn_handler) {
error!("Error handling connection: {}", err);
}
}
Err(e) => {
let kind = e.kind();
if !matches!(ErrorKind::WouldBlock, kind) {
error!("Error accepting connction: {e} - you might need to restart Horust.");
}
}
};
Ok(())
}
fn handle_connection(&self, mut uds_conn_handler: UdsConnectionHandler) -> Result<()> {
let received = uds_conn_handler
.receive_message()?
.request_type
.ok_or(anyhow!("No request found in message sent from client."))?;
match received {
RequestType::StatusRequest(status_request) => {
info!("Requested status for {}", status_request.service_name);
let service_status = self.get_service_status(status_request.service_name.clone());
uds_conn_handler.send_message(new_horust_msg_service_status_response(
status_request.service_name,
service_status,
))?;
}
RequestType::StatusResponse(_) => {}
RequestType::ChangeRequest(_) => {}
};
Ok(())
}

fn get_service_status(&self, service_name: String) -> HorustMsgServiceStatus;
}

pub fn new_horust_msg_service_status_response(
service_name: String,
status: HorustMsgServiceStatus,
) -> HorustMsgMessage {
HorustMsgMessage {
request_type: Some(RequestType::StatusResponse(
HorustMsgServiceStatusResponse {
service_name,
service_status: status.into(),
},
)),
}
}

pub struct ClientHandler {
uds_connection_handler: UdsConnectionHandler,
}
impl ClientHandler {
pub fn new_client(socket_path: &Path) -> Result<Self> {
Ok(Self {
uds_connection_handler: UdsConnectionHandler::new(
UnixStream::connect(socket_path).context("Could not create stream")?,
),
})
}
pub fn send_status_request(
&mut self,
service_name: String,
) -> Result<(String, HorustMsgServiceStatus)> {
let status = HorustMsgMessage {
request_type: Some(RequestType::StatusRequest(HorustMsgServiceStatusRequest {
service_name,
})),
};
self.uds_connection_handler.send_message(status)?;
// server is waiting for EOF.
self.uds_connection_handler
.socket
.shutdown(Shutdown::Write)?;
//Reads all bytes until EOF in this source, appending them to buf.
let received = self.uds_connection_handler.receive_message()?;
info!("Client: received: {received:?}");
match received
.request_type
.ok_or(anyhow!("Error receiving message"))?
{
RequestType::StatusResponse(resp) => Ok((
resp.service_name,
HorustMsgServiceStatus::from_i32(resp.service_status).unwrap(),
)),
_ => unreachable!(),
}
}

pub fn client(mut self, service_name: String) -> Result<()> {
let status = HorustMsgMessage {
request_type: Some(RequestType::StatusRequest(HorustMsgServiceStatusRequest {
service_name,
})),
};
self.uds_connection_handler.send_message(status)?;
// server is waiting for EOF.
self.uds_connection_handler
.socket
.shutdown(Shutdown::Write)?;
//Reads all bytes until EOF in this source, appending them to buf.
let received = self.uds_connection_handler.receive_message()?;
info!("Client: received: {received:?}");
Ok(())
}
}

/// socket_name should be the pid of the horust process.
pub fn get_path(socket_folder: &Path, socket_name: i32) -> PathBuf {
socket_folder.join(format!("hourst-{socket_name}.sock"))
pub fn get_path(socket_folder_path: &Path, horust_pid: i32) -> PathBuf {
socket_folder_path.join(format!("hourst-{horust_pid}.sock"))
}

pub struct UdsConnectionHandler {
Expand All @@ -149,6 +27,7 @@ impl UdsConnectionHandler {
Self { socket }
}
pub fn send_message(&mut self, message: HorustMsgMessage) -> Result<()> {
debug!("Sending message: {:?}", message);
let mut buf = Vec::new();
// Serialize the message into a byte array.
message.encode(&mut buf)?;
Expand All @@ -160,6 +39,8 @@ impl UdsConnectionHandler {
pub fn receive_message(&mut self) -> Result<HorustMsgMessage> {
let mut buf = Vec::new();
self.socket.read_to_end(&mut buf)?;
Ok(HorustMsgMessage::decode(buf.as_slice())?)
let received = HorustMsgMessage::decode(buf.as_slice())?;
debug!("Received message: {:?}", received);
Ok(received)
}
}
Loading

0 comments on commit 4dbd11e

Please sign in to comment.