diff --git a/src/cord.rs b/src/cord.rs index 5ca7a970..05d8e664 100644 --- a/src/cord.rs +++ b/src/cord.rs @@ -49,18 +49,21 @@ impl Cord { pub fn run(&mut self) -> crate::Result<()> { self.start_rpc()?; self.pipe.start()?; - self.start_event_loop(); + self.start_event_loop()?; Ok(()) } - fn start_event_loop(&mut self) { + fn start_event_loop(&mut self) -> crate::Result<()> { for msg in self.rx.iter() { msg.event.on_event(&EventContext { + client_id: msg.client_id, pipe: &self.pipe, rich_client: self.rich_client.clone(), - }); + })?; } + + Ok(()) } fn start_rpc(&self) -> crate::Result<()> { diff --git a/src/ipc/discord/client.rs b/src/ipc/discord/client.rs index 19805ae1..e2cd68db 100644 --- a/src/ipc/discord/client.rs +++ b/src/ipc/discord/client.rs @@ -2,6 +2,7 @@ use crate::ipc::discord::utils; use crate::json::Json; use crate::presence::types::{Activity, Packet}; use std::io::{self, Read, Write}; +use std::sync::atomic::AtomicBool; pub struct RichClient { pub client_id: u64, @@ -11,6 +12,7 @@ pub struct RichClient { pub pipe: Option, pub last_activity: Option, pub pid: u32, + pub is_ready: AtomicBool, } pub trait Connection { diff --git a/src/ipc/discord/platform/unix.rs b/src/ipc/discord/platform/unix.rs index 2d32db76..abd8e7c6 100644 --- a/src/ipc/discord/platform/unix.rs +++ b/src/ipc/discord/platform/unix.rs @@ -28,6 +28,7 @@ impl Connection for RichClient { pipe: Some(pipe), last_activity: None, pid: std::process::id(), + is_ready: false.into(), }) } Err(e) => match e.kind() { diff --git a/src/ipc/discord/platform/windows.rs b/src/ipc/discord/platform/windows.rs index 3cecc4f5..69f00243 100644 --- a/src/ipc/discord/platform/windows.rs +++ b/src/ipc/discord/platform/windows.rs @@ -19,6 +19,7 @@ impl Connection for RichClient { pipe: Some(pipe), last_activity: None, pid: std::process::id(), + is_ready: false.into(), }) } Err(e) => match e.kind() { diff --git a/src/ipc/pipe/platform/unix/server.rs b/src/ipc/pipe/platform/unix/server.rs index c1a9b6ce..342361f3 100644 --- a/src/ipc/pipe/platform/unix/server.rs +++ b/src/ipc/pipe/platform/unix/server.rs @@ -8,9 +8,9 @@ use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use crate::ipc::pipe::{PipeClientImpl, PipeServerImpl}; -use crate::local_event; use crate::messages::events::local::ErrorEvent; use crate::messages::message::Message; +use crate::{client_event, local_event}; use super::client::PipeClient; @@ -62,6 +62,7 @@ impl PipeServerImpl for PipeServer { Ok((stream, _)) => { let client_id = next_client_id.fetch_add(1, Ordering::SeqCst); let mut client = PipeClient::new(client_id, stream, tx.clone()); + tx.send(client_event!(0, Connect)).ok(); client.start_read_thread().ok(); clients.lock().unwrap().insert(client_id, client); } diff --git a/src/ipc/pipe/platform/windows/client.rs b/src/ipc/pipe/platform/windows/client.rs index 96532cc4..7e2bbb24 100644 --- a/src/ipc/pipe/platform/windows/client.rs +++ b/src/ipc/pipe/platform/windows/client.rs @@ -1,9 +1,13 @@ use std::fs::File; -use std::io::{self, Read, Write}; +use std::io::{self}; +use std::os::windows::io::AsRawHandle; use std::sync::mpsc::Sender; use std::sync::Arc; use std::thread::JoinHandle; +use super::{ + CreateEventW, Overlapped, ReadFile, WaitForSingleObject, WriteFile, INFINITE, WAIT_OBJECT_0, +}; use crate::ipc::pipe::PipeClientImpl; use crate::local_event; use crate::messages::events::client::ClientEvent; @@ -31,40 +35,106 @@ impl PipeClientImpl for PipeClient { } fn write(&mut self, data: &[u8]) -> io::Result<()> { - self.pipe.as_mut().map_or( - Err(io::Error::new(io::ErrorKind::NotFound, "Pipe not found")), - |pipe| pipe.write_all(data), - ) + if let Some(pipe) = &self.pipe { + let handle = pipe.as_raw_handle(); + unsafe { + let mut overlapped = Overlapped { + internal: 0, + internal_high: 0, + offset: 0, + offset_high: 0, + h_event: CreateEventW(std::ptr::null_mut(), 1, 0, std::ptr::null_mut()), + }; + + let mut bytes_written = 0; + let write_result = WriteFile( + handle, + data.as_ptr(), + data.len() as u32, + &mut bytes_written, + &mut overlapped, + ); + + if write_result == 0 { + let error = io::Error::last_os_error(); + if error.raw_os_error() != Some(997) { + return Err(error); + } + } + + if WaitForSingleObject(overlapped.h_event, INFINITE) != WAIT_OBJECT_0 { + return Err(io::Error::last_os_error()); + } + + Ok(()) + } + } else { + Err(io::Error::new(io::ErrorKind::NotFound, "Pipe not found")) + } } fn start_read_thread(&mut self) -> io::Result<()> { - if let Some(mut pipe) = self.pipe.take() { + if let Some(pipe) = self.pipe.as_ref() { + let pipe = pipe.clone(); let tx = self.tx.clone(); let id = self.id; let handle = std::thread::spawn(move || { let mut buf = [0u8; 4096]; + let handle = pipe.as_raw_handle(); + loop { - match pipe.read(&mut buf) { - Ok(0) => { - tx.send(local_event!(id, ClientDisconnected)).ok(); - break; - } - Ok(n) => { - if let Ok(message) = - ClientEvent::deserialize(&String::from_utf8_lossy(&buf[..n])) - { - tx.send(Message::new(id, Event::Client(message))).ok(); + unsafe { + let mut overlapped = Overlapped { + internal: 0, + internal_high: 0, + offset: 0, + offset_high: 0, + h_event: CreateEventW(std::ptr::null_mut(), 1, 0, std::ptr::null_mut()), + }; + + let mut bytes_read = 0; + let read_result = ReadFile( + handle, + buf.as_mut_ptr(), + buf.len() as u32, + &mut bytes_read, + &mut overlapped, + ); + + if read_result == 0 { + let error = io::Error::last_os_error(); + if error.raw_os_error() != Some(997) { + tx.send(local_event!(id, Error, ErrorEvent::new(Box::new(error)))) + .ok(); + break; } } - Err(e) => { - tx.send(local_event!(id, Error, ErrorEvent::new(Box::new(e)))) - .ok(); + + if WaitForSingleObject(overlapped.h_event, INFINITE) != WAIT_OBJECT_0 { + tx.send(local_event!( + id, + Error, + ErrorEvent::new(Box::new(io::Error::last_os_error())) + )) + .ok(); + break; + } + + if bytes_read == 0 { + tx.send(local_event!(id, ClientDisconnected)).ok(); break; } + + if let Ok(message) = ClientEvent::deserialize(&String::from_utf8_lossy( + &buf[..bytes_read as usize], + )) { + tx.send(Message::new(id, Event::Client(message))).ok(); + } } } }); + self.thread_handle = Some(handle); Ok(()) } else { diff --git a/src/ipc/pipe/platform/windows/mod.rs b/src/ipc/pipe/platform/windows/mod.rs index c07f47e0..da0a4004 100644 --- a/src/ipc/pipe/platform/windows/mod.rs +++ b/src/ipc/pipe/platform/windows/mod.rs @@ -1,2 +1,68 @@ +#![allow(clippy::upper_case_acronyms)] + pub mod client; pub mod server; + +pub type HANDLE = *mut std::ffi::c_void; +pub type DWORD = u32; +pub type BOOL = i32; +pub type LPCWSTR = *const u16; +pub type LPVOID = *mut std::ffi::c_void; + +pub const INVALID_HANDLE_VALUE: HANDLE = -1isize as HANDLE; +pub const ERROR_PIPE_CONNECTED: DWORD = 535; +pub const ERROR_IO_PENDING: DWORD = 997; +pub const PIPE_ACCESS_DUPLEX: DWORD = 0x00000003; +pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000; +pub const PIPE_TYPE_MESSAGE: DWORD = 0x00000004; +pub const PIPE_READMODE_MESSAGE: DWORD = 0x00000002; +pub const PIPE_UNLIMITED_INSTANCES: DWORD = 255; +pub const WAIT_OBJECT_0: DWORD = 0; +pub const INFINITE: DWORD = 0xFFFFFFFF; + +#[repr(C)] +pub struct Overlapped { + pub internal: usize, + pub internal_high: usize, + pub offset: DWORD, + pub offset_high: DWORD, + pub h_event: HANDLE, +} + +extern "system" { + pub fn CreateNamedPipeW( + lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPVOID, + ) -> HANDLE; + + pub fn ConnectNamedPipe(hNamedPipe: HANDLE, lpOverlapped: LPVOID) -> BOOL; + pub fn GetLastError() -> DWORD; + pub fn CloseHandle(hObject: HANDLE) -> BOOL; + pub fn CreateEventW( + lpEventAttributes: LPVOID, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR, + ) -> HANDLE; + pub fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; + pub fn WriteFile( + hFile: HANDLE, + lpBuffer: *const u8, + nNumberOfBytesToWrite: DWORD, + lpNumberOfBytesWritten: *mut DWORD, + lpOverlapped: *mut Overlapped, + ) -> BOOL; + pub fn ReadFile( + hFile: HANDLE, + lpBuffer: *mut u8, + nNumberOfBytesToRead: DWORD, + lpNumberOfBytesRead: *mut DWORD, + lpOverlapped: *mut Overlapped, + ) -> BOOL; +} diff --git a/src/ipc/pipe/platform/windows/server.rs b/src/ipc/pipe/platform/windows/server.rs index 67ae8368..ee4e572d 100644 --- a/src/ipc/pipe/platform/windows/server.rs +++ b/src/ipc/pipe/platform/windows/server.rs @@ -9,43 +9,19 @@ use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; +use super::{ + client::PipeClient, CreateEventW, CreateNamedPipeW, Overlapped, + FILE_FLAG_OVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, LPVOID, PIPE_ACCESS_DUPLEX, + PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, WAIT_OBJECT_0, +}; +use super::{ + CloseHandle, ConnectNamedPipe, GetLastError, WaitForSingleObject, ERROR_IO_PENDING, + ERROR_PIPE_CONNECTED, INFINITE, +}; use crate::ipc::pipe::{PipeClientImpl, PipeServerImpl}; -use crate::local_event; use crate::messages::events::local::ErrorEvent; use crate::messages::message::Message; - -use super::client::PipeClient; - -type HANDLE = *mut std::ffi::c_void; -type DWORD = u32; -type BOOL = i32; -type LPCWSTR = *const u16; -type LPVOID = *mut std::ffi::c_void; - -const INVALID_HANDLE_VALUE: HANDLE = -1isize as HANDLE; -const ERROR_PIPE_CONNECTED: DWORD = 535; -const PIPE_ACCESS_DUPLEX: DWORD = 0x00000003; -const PIPE_TYPE_MESSAGE: DWORD = 0x00000004; -const PIPE_READMODE_MESSAGE: DWORD = 0x00000002; -const PIPE_WAIT: DWORD = 0x00000000; -const PIPE_UNLIMITED_INSTANCES: DWORD = 255; - -extern "system" { - fn CreateNamedPipeW( - lpName: LPCWSTR, - dwOpenMode: DWORD, - dwPipeMode: DWORD, - nMaxInstances: DWORD, - nOutBufferSize: DWORD, - nInBufferSize: DWORD, - nDefaultTimeOut: DWORD, - lpSecurityAttributes: LPVOID, - ) -> HANDLE; - - fn ConnectNamedPipe(hNamedPipe: HANDLE, lpOverlapped: LPVOID) -> BOOL; - fn GetLastError() -> DWORD; - fn CloseHandle(hObject: HANDLE) -> BOOL; -} +use crate::{client_event, local_event}; pub struct PipeServer { pipe_name: String, @@ -83,10 +59,34 @@ impl PipeServerImpl for PipeServer { while running.load(Ordering::SeqCst) { if let Ok(handle) = PipeServer::create_pipe_instance(&pipe_name) { unsafe { - if ConnectNamedPipe(handle, std::ptr::null_mut()) == 0 { + let h_event = + CreateEventW(std::ptr::null_mut(), 1, 0, std::ptr::null_mut()); + if h_event.is_null() { + CloseHandle(handle); + tx.send(local_event!( + 0, + Error, + ErrorEvent::new(Box::new(io::Error::last_os_error())) + )) + .ok(); + continue; + } + + let mut overlapped = Overlapped { + internal: 0, + internal_high: 0, + offset: 0, + offset_high: 0, + h_event, + }; + + let connect_result = + ConnectNamedPipe(handle, &mut overlapped as *mut _ as LPVOID); + if connect_result == 0 { let error = GetLastError(); - if error != ERROR_PIPE_CONNECTED { + if error != ERROR_IO_PENDING && error != ERROR_PIPE_CONNECTED { CloseHandle(handle); + CloseHandle(h_event); tx.send(local_event!( 0, Error, @@ -99,6 +99,12 @@ impl PipeServerImpl for PipeServer { } } + if WaitForSingleObject(overlapped.h_event, INFINITE) != WAIT_OBJECT_0 { + CloseHandle(handle); + CloseHandle(h_event); + continue; + } + let client_id = next_client_id.fetch_add(1, Ordering::SeqCst); let mut client = PipeClient::new( client_id, @@ -106,7 +112,10 @@ impl PipeServerImpl for PipeServer { tx.clone(), ); client.start_read_thread().ok(); + tx.send(client_event!(client_id, Connect)).ok(); clients.lock().unwrap().insert(client_id, client); + + CloseHandle(h_event); } } } @@ -165,8 +174,8 @@ impl PipeServer { let handle = unsafe { CreateNamedPipeW( wide_name.as_ptr(), - PIPE_ACCESS_DUPLEX, - PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, PIPE_UNLIMITED_INSTANCES, 1024 * 16, 1024 * 16, diff --git a/src/main.rs b/src/main.rs index e11f18df..4caeef14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,5 +17,7 @@ use error::Result; fn main() -> Result<()> { let client_id = args().nth(1).ok_or("Missing client ID")?.parse::()?; - Cord::new("cord-ipc", client_id)?.run().map_err(Into::into) + Cord::new("cord-ipc", client_id)?.run().unwrap(); + + Ok(()) } diff --git a/src/messages/events/event.rs b/src/messages/events/event.rs index c1bf1137..1ef011b8 100644 --- a/src/messages/events/event.rs +++ b/src/messages/events/event.rs @@ -12,6 +12,7 @@ pub enum Event { } pub struct EventContext<'a> { + pub client_id: u32, pub pipe: &'a PipeServer, pub rich_client: Arc, } diff --git a/src/messages/events/server/ready.rs b/src/messages/events/server/ready.rs index ff191ca2..dd01d637 100644 --- a/src/messages/events/server/ready.rs +++ b/src/messages/events/server/ready.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::Ordering; + use crate::ipc::pipe::PipeServerImpl; use crate::messages::events::event::EventContext; use crate::{ @@ -13,7 +15,9 @@ pub struct ReadyEvent; impl OnEvent for ReadyEvent { fn on_event(self, ctx: &EventContext) -> crate::Result<()> { - ctx.pipe.broadcast(Json::serialize(&self)?.as_bytes())?; + if !ctx.rich_client.is_ready.swap(true, Ordering::SeqCst) { + ctx.pipe.broadcast(Json::serialize(&self)?.as_bytes())?; + } Ok(()) }