Skip to content

Commit

Permalink
feat: implement iterations - a maximum amount of time the server can …
Browse files Browse the repository at this point in the history
…idle without active connections
  • Loading branch information
vyfor committed Dec 6, 2024
1 parent 29938ed commit b6e8955
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 23 deletions.
24 changes: 18 additions & 6 deletions src/cord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ pub struct Cord {
pub pipe: PipeServer,
pub tx: Sender<Message>,
pub rx: Receiver<Message>,
pub iterations: u32,
pub max_iterations: u32,
}

impl Cord {
pub fn new(pipe_name: &str, client_id: u64) -> crate::Result<Self> {
pub fn new(pipe_name: &str, client_id: u64, max_iterations: u32) -> crate::Result<Self> {
let (tx, rx) = mpsc::channel::<Message>();
let rich_client = Arc::new(RichClient::connect(client_id)?);
let server = PipeServer::new(pipe_name, tx.clone());
Expand All @@ -40,6 +42,8 @@ impl Cord {
pipe: server,
tx,
rx,
iterations: 0,
max_iterations,
})
}

Expand All @@ -52,11 +56,19 @@ impl Cord {
}

fn start_event_loop(&mut self) -> crate::Result<()> {
while let Ok(msg) = self.rx.recv() {
msg.event.on_event(&mut EventContext {
cord: self,
client_id: msg.client_id,
})?;
loop {
if let Ok(msg) = self.rx.recv_timeout(std::time::Duration::from_millis(1000)) {
self.iterations = 0;
msg.event.on_event(&mut EventContext {
cord: self,
client_id: msg.client_id,
})?;
} else if self.pipe.clients.read().unwrap().is_empty() {
self.iterations += 1;
if self.iterations >= self.max_iterations {
break;
}
}
}

Ok(())
Expand Down
16 changes: 8 additions & 8 deletions src/ipc/pipe/platform/unix/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::os::unix::net::UnixListener;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;

use crate::ipc::pipe::{PipeClientImpl, PipeServerImpl};
Expand All @@ -15,9 +15,9 @@ use crate::{client_event, local_event};
use super::client::PipeClient;

pub struct PipeServer {
pub clients: Arc<RwLock<HashMap<u32, PipeClient>>>,
pipe_name: String,
tx: Sender<Message>,
clients: Arc<Mutex<HashMap<u32, PipeClient>>>,
next_client_id: Arc<AtomicU32>,
running: Arc<AtomicBool>,
listener: Option<UnixListener>,
Expand All @@ -27,9 +27,9 @@ pub struct PipeServer {
impl PipeServerImpl for PipeServer {
fn new(pipe_name: &str, tx: Sender<Message>) -> Self {
Self {
clients: Arc::new(RwLock::new(HashMap::new())),
pipe_name: pipe_name.to_string(),
tx,
clients: Arc::new(Mutex::new(HashMap::new())),
next_client_id: Arc::new(AtomicU32::new(1)),
running: Arc::new(AtomicBool::new(false)),
listener: None,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl PipeServerImpl for PipeServer {
let mut client = PipeClient::new(client_id, stream, tx.clone());
tx.send(client_event!(0, Connect)).ok();
client.start_read_thread().ok();
clients.lock().unwrap().insert(client_id, client);
clients.write().unwrap().insert(client_id, client);
}
Err(e) => {
tx.send(local_event!(0, Error, ErrorEvent::new(Box::new(e))))
Expand All @@ -88,11 +88,11 @@ impl PipeServerImpl for PipeServer {
if let Some(handle) = self.thread_handle.take() {
drop(handle);
}
self.clients.lock().unwrap().clear();
self.clients.write().unwrap().clear();
}

fn broadcast(&self, data: &[u8]) -> io::Result<()> {
let mut clients = self.clients.lock().unwrap();
let mut clients = self.clients.write().unwrap();
let mut failed_clients = Vec::new();

for (client_id, client) in clients.iter_mut() {
Expand All @@ -110,15 +110,15 @@ impl PipeServerImpl for PipeServer {

fn write_to(&self, client_id: u32, data: &[u8]) -> io::Result<()> {
self.clients
.lock()
.write()
.unwrap()
.get_mut(&client_id)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Client not found"))?
.write(data)
}

fn disconnect(&self, client_id: u32) -> io::Result<()> {
self.clients.lock().unwrap().remove(&client_id);
self.clients.write().unwrap().remove(&client_id);
Ok(())
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/ipc/pipe/platform/windows/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io;
use std::os::windows::io::FromRawHandle;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;

use super::{
Expand All @@ -24,9 +24,9 @@ use crate::messages::message::Message;
use crate::{client_event, local_event};

pub struct PipeServer {
pub clients: Arc<RwLock<HashMap<u32, PipeClient>>>,
pipe_name: String,
tx: Sender<Message>,
clients: Arc<Mutex<HashMap<u32, PipeClient>>>,
next_client_id: Arc<AtomicU32>,
running: Arc<AtomicBool>,
thread_handle: Option<JoinHandle<()>>,
Expand All @@ -35,9 +35,9 @@ pub struct PipeServer {
impl PipeServerImpl for PipeServer {
fn new(pipe_name: &str, tx: Sender<Message>) -> Self {
Self {
clients: Arc::new(RwLock::new(HashMap::new())),
pipe_name: pipe_name.to_string(),
tx,
clients: Arc::new(Mutex::new(HashMap::new())),
next_client_id: Arc::new(AtomicU32::new(1)),
running: Arc::new(AtomicBool::new(false)),
thread_handle: None,
Expand Down Expand Up @@ -113,7 +113,7 @@ impl PipeServerImpl for PipeServer {
);
client.start_read_thread().ok();
tx.send(client_event!(client_id, Connect)).ok();
clients.lock().unwrap().insert(client_id, client);
clients.write().unwrap().insert(client_id, client);

CloseHandle(h_event);
}
Expand All @@ -129,11 +129,11 @@ impl PipeServerImpl for PipeServer {
if let Some(handle) = self.thread_handle.take() {
drop(handle);
}
self.clients.lock().unwrap().clear();
self.clients.write().unwrap().clear();
}

fn broadcast(&self, data: &[u8]) -> io::Result<()> {
let mut clients = self.clients.lock().unwrap();
let mut clients = self.clients.write().unwrap();
let mut failed_clients = Vec::new();

for (client_id, client) in clients.iter_mut() {
Expand All @@ -151,15 +151,15 @@ impl PipeServerImpl for PipeServer {

fn write_to(&self, client_id: u32, data: &[u8]) -> io::Result<()> {
self.clients
.lock()
.write()
.unwrap()
.get_mut(&client_id)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Client not found"))?
.write(data)
}

fn disconnect(&self, client_id: u32) -> io::Result<()> {
self.clients.lock().unwrap().remove(&client_id);
self.clients.write().unwrap().remove(&client_id);
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ use error::Result;
fn main() -> Result<()> {
let client_id = args().nth(1).ok_or("Missing client ID")?.parse::<u64>()?;

Cord::new("cord-ipc", client_id)?.run()
Cord::new("cord-ipc", client_id, 30)?.run()
}

0 comments on commit b6e8955

Please sign in to comment.