Skip to content

Commit

Permalink
Keep uds implementation sync to avoid pulling tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
FedericoPonzi committed Nov 2, 2024
1 parent 827f854 commit 102f0e9
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 246 deletions.
5 changes: 2 additions & 3 deletions commands/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ serde = { version = "~1.0", features = ["derive"] }
serde_json = "~1.0"
prost = "~0.13"
anyhow = "~1.0"
tokio = { version = "~1.41", features = ["full"] }
tracing = "0.1"
log = "~0.4"

[dev-dependencies]
tracing-test = { version = "0.2", features = ["no-env-filter"] }
env_logger = "~0.10"

[build-dependencies]
prost-build = { version = "~0.13" }
1 change: 0 additions & 1 deletion commands/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
## Compile

To compile this crate, you will need protobuf compiler. On debian-like you can run:

apt-get install protobuf-compiler
53 changes: 34 additions & 19 deletions commands/src/commands.proto
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
syntax = "proto3";
package tutorial;
package messages;

message Person {
string name = 1;
int32 id = 2; // Unique ID number for this person.
string email = 3;

enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
message HorustMsgMessage {
oneof request_type {
HorustMsgServiceStatusRequest status_request = 1;
HorustMsgServiceStatusResponse status_response = 2;
HorustMsgServiceChangeRequest change_request = 3;
}
}
message HorustMsgError {

message PhoneNumber {
string number = 1;
PhoneType type = 2;
}
}

message HorustMsgServiceStatusRequest {
string service_name = 1;
}

repeated PhoneNumber phones = 4;
message HorustMsgServiceStatusResponse {
string service_name = 1;
HorustMsgServiceStatus service_status = 2;
}

// Our address book file is just one of these.
message AddressBook {
repeated Person people = 1;
}
message HorustMsgServiceChangeRequest {
string service_name = 1;
HorustMsgServiceStatus service_status = 2;
}

// return the current status - similar to HorustServiceStatusReponse.
message HorustMsgServiceChangeResponse {
string service_name = 1;
HorustMsgServiceStatus service_status = 2;
}

enum HorustMsgServiceStatus {
STOPPED = 0;
STARTED = 1;
KILLED = 2;
SHUTTING_DOWN = 3;
RUNNING = 4;
}
197 changes: 111 additions & 86 deletions commands/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,107 +1,132 @@
extern crate core;

mod proto;

use anyhow::{bail, Context, Result};
use crate::proto::messages::horust_msg_message::RequestType;
pub use crate::proto::messages::HorustMsgServiceStatus;
use crate::proto::messages::{
HorustMsgMessage, HorustMsgServiceStatusRequest, HorustMsgServiceStatusResponse,
};
use anyhow::{anyhow, Context, Result};
use log::{error, info};
use prost::Message;
use std::io::{Read, Write};
use std::net::Shutdown;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tracing::info;

pub struct CommandsUdsConnectionHandler {
socket: UnixStream,
}
impl CommandsUdsConnectionHandler {
fn get_path(socket_folder: &Path, socket_name: u32) -> PathBuf {
socket_folder.join(format!("hourst-{socket_name}.sock"))
}
fn new(socket: UnixStream) -> Self {
Self { socket }
pub trait CommandsHandlerTrait {
// "blocking" - execute in its own tokio task.
fn start(&mut self) -> Result<()> {
// put the server logic in a loop to accept several connections
loop {
self.accept().expect("TODO: panic message");
}
}
pub async fn new_client(socket_path: PathBuf) -> Result<Self> {
Ok(Self {
socket: UnixStream::connect(socket_path)
.await
.context("Could not create stream")?,
})
fn get_unix_listener(&mut self) -> &mut UnixListener;
fn accept(&mut self) -> Result<()> {
match self.get_unix_listener().accept() {
Ok((stream, _addr)) => {
let conn_handler = UdsConnectionHandler::new(stream);
if let Err(err) = self.handle_connection(conn_handler) {
error!("Error handling connection: {}", err);
}
}
Err(e) => {
error!("Error accepting connction: {e} - you might need to restart Horust.");
}
};
Ok(())
}

pub async fn client(mut self) -> Result<()> {
info!("client: sending data");
self.socket
.write_all(b"Hello?")
.await // we write bytes, &[u8]
.context("Failed at writing onto the unix stream")?;
info!("client: Completed.");
// server is waiting for EOF.
self.socket.shutdown().await?;

let mut buf = String::new();
info!("client: reading back:");
//Reads all bytes until EOF in this source, appending them to buf.
self.socket
.read_to_string(&mut buf)
.await // we write bytes, &[u8]
.context("Failed at writing onto the unix stream")?;
info!("Client received: {}", buf);
fn handle_connection(&self, mut uds_conn_handler: UdsConnectionHandler) -> Result<()> {
let received = uds_conn_handler
.receive_message()?
.request_type
.ok_or(anyhow!("No request found in message sent from client."))?;
match received {
RequestType::StatusRequest(status_request) => {
info!("Requested status for {}", status_request.service_name);
let service_status = self.get_service_status(status_request.service_name.clone());
uds_conn_handler.send_message(new_horust_msg_service_status_response(
status_request.service_name,
service_status,
))?;
}
RequestType::StatusResponse(_) => {}
RequestType::ChangeRequest(_) => {}
};
Ok(())
}

pub async fn server(mut self) -> Result<()> {
let mut message = String::new();
info!("Server: receving data");
// Reads all bytes until EOF in this source, appending them to buf.
self.socket
.read_to_string(&mut message)
.await
.context("Failed at reading the unix stream")?;
info!("Server: Received data: {message}");
self.socket
.write_all(message.as_bytes())
.await
.context("Failed at reading the unix stream")?;
fn get_service_status(&self, service_name: String) -> HorustMsgServiceStatus;
}

info!("Server: has written back {}", message);
Ok(())
pub fn new_horust_msg_service_status_response(
service_name: String,
status: HorustMsgServiceStatus,
) -> HorustMsgMessage {
HorustMsgMessage {
request_type: Some(RequestType::StatusResponse(
HorustMsgServiceStatusResponse {
service_name,
service_status: status.into(),
},
)),
}
}
pub struct CommandsUdsServer {
unix_listener: UnixListener,

pub struct ClientHandler {
uds_connection_handler: UdsConnectionHandler,
}
impl CommandsUdsServer {
pub async fn new(socket_path: &Path) -> Result<Self> {
impl ClientHandler {
pub fn new_client(socket_path: PathBuf) -> Result<Self> {
Ok(Self {
unix_listener: UnixListener::bind(socket_path)
.context("Could not create the unix socket")?,
uds_connection_handler: UdsConnectionHandler::new(
UnixStream::connect(socket_path).context("Could not create stream")?,
),
})
}
pub async fn start(&mut self) -> Result<()> {
// put the server logic in a loop to accept several connections
loop {
self.accept().await?;
}
Ok(())
}
pub async fn accept(&mut self) -> Result<()> {
match self.unix_listener.accept().await {
Ok((stream, _addr)) => {
tokio::spawn(async move {
CommandsUdsConnectionHandler::new(stream)
.server()
.await
.unwrap();
})
.await?
}
Err(e) => {
bail!("error accepting connction: {e}")
}

pub fn client(mut self, service_name: String) -> Result<()> {
let status = HorustMsgMessage {
request_type: Some(RequestType::StatusRequest(HorustMsgServiceStatusRequest {
service_name,
})),
};
self.uds_connection_handler.send_message(status)?;
// server is waiting for EOF.
self.uds_connection_handler
.socket
.shutdown(Shutdown::Write)?;
//Reads all bytes until EOF in this source, appending them to buf.
let received = self.uds_connection_handler.receive_message()?;
info!("Client: received: {received:?}");
Ok(())
}
}

fn create_uds() {}

fn listen_uds() {}

fn send_message() {}
fn receive_message() {}
pub struct UdsConnectionHandler {
socket: UnixStream,
}
impl UdsConnectionHandler {
fn get_path(socket_folder: &Path, socket_name: u32) -> PathBuf {
socket_folder.join(format!("hourst-{socket_name}.sock"))
}
fn new(socket: UnixStream) -> Self {
Self { socket }
}
pub fn send_message(&mut self, message: HorustMsgMessage) -> Result<()> {
let mut buf = Vec::new();
// Serialize the message into a byte array.
message.encode(&mut buf)?;
self.socket
.write_all(&buf)
.context("Failed at writing onto the unix stream")?;
Ok(())
}
pub fn receive_message(&mut self) -> Result<HorustMsgMessage> {
let mut buf = Vec::new();
self.socket.read_to_end(&mut buf)?;
Ok(HorustMsgMessage::decode(buf.as_slice())?)
}
}
2 changes: 1 addition & 1 deletion commands/src/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@

pub mod messages;
64 changes: 48 additions & 16 deletions commands/tests/simple.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,63 @@
use anyhow::Result;
use horust_commands_lib::{CommandsUdsConnectionHandler, CommandsUdsServer};
use std::os::unix::net::UnixListener;

use horust_commands_lib::{ClientHandler, CommandsHandlerTrait, HorustMsgServiceStatus};
use log::info;
use std::path::PathBuf;
use tracing::info;
use tracing_test::traced_test;
use std::sync::{Arc, Barrier};
use std::thread;

struct MockCommandsHandler {
unix_listener: UnixListener,
}
impl MockCommandsHandler {
// full socket path (not the folder).
pub fn new(socket_path: PathBuf) -> Self {
Self {
unix_listener: UnixListener::bind(socket_path).unwrap(),
}
}
}
impl CommandsHandlerTrait for MockCommandsHandler {
fn get_unix_listener(&mut self) -> &mut UnixListener {
&mut self.unix_listener
}

#[tokio::test]
#[traced_test]
async fn test_simple() -> Result<()> {
fn get_service_status(&self, service_name: String) -> HorustMsgServiceStatus {
match service_name.as_str() {
"Running" => HorustMsgServiceStatus::Running,
_ => unimplemented!(),
}
}
}
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
}
#[test]
fn test_simple() -> Result<()> {
info!("Starting");
init();

let socket_path: PathBuf = "/tmp/simple.sock".into();
if socket_path.exists() {
std::fs::remove_file(&socket_path)?;
}
let socket_path2 = socket_path.clone();
let s_handle = tokio::spawn(async move {
let mut uds = CommandsUdsServer::new(&socket_path2).await.unwrap();
let barrier_server = Arc::new(Barrier::new(2));
let barrier_client = Arc::clone(&barrier_server);
let s_handle = thread::spawn(move || {
let mut uds = MockCommandsHandler::new(socket_path2);
info!("uds created");
uds.accept().await.unwrap();
barrier_server.wait();
uds.accept().unwrap();
});
let c_handle = tokio::spawn(async {
let client = CommandsUdsConnectionHandler::new_client(socket_path)
.await
.unwrap();
client.client().await.unwrap();

let c_handle = thread::spawn(move || {
barrier_client.wait();
let client = ClientHandler::new_client(socket_path).unwrap();
client.client("Running".into()).unwrap();
});
s_handle.await?;
c_handle.await?;
s_handle.join().unwrap();
c_handle.join().unwrap();
Ok(())
}
1 change: 1 addition & 0 deletions horust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ shellexpand = "~3.1"
anyhow = "~1.0"
thiserror = "~1.0"
bytefmt = "0.1.7"
horust-commands-lib = {path = "../commands"}

[features]
default = ["http-healthcheck"]
Expand Down
Loading

0 comments on commit 102f0e9

Please sign in to comment.