Skip to content

Commit

Permalink
fix(windows): use overlapped io for named pipes to avoid deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
vyfor committed Dec 6, 2024
1 parent 2e7d5d4 commit 2f73ce8
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 63 deletions.
9 changes: 6 additions & 3 deletions src/cord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,21 @@ impl Cord {
pub fn run(&mut self) -> crate::Result<()> {
self.start_rpc()?;
self.pipe.start()?;
self.start_event_loop();
self.start_event_loop()?;

Ok(())
}

fn start_event_loop(&mut self) {
fn start_event_loop(&mut self) -> crate::Result<()> {
for msg in self.rx.iter() {
msg.event.on_event(&EventContext {
client_id: msg.client_id,
pipe: &self.pipe,
rich_client: self.rich_client.clone(),
});
})?;
}

Ok(())
}

fn start_rpc(&self) -> crate::Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions src/ipc/discord/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::ipc::discord::utils;
use crate::json::Json;
use crate::presence::types::{Activity, Packet};
use std::io::{self, Read, Write};
use std::sync::atomic::AtomicBool;

pub struct RichClient {
pub client_id: u64,
Expand All @@ -11,6 +12,7 @@ pub struct RichClient {
pub pipe: Option<std::os::unix::net::UnixStream>,
pub last_activity: Option<Activity>,
pub pid: u32,
pub is_ready: AtomicBool,
}

pub trait Connection {
Expand Down
1 change: 1 addition & 0 deletions src/ipc/discord/platform/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl Connection for RichClient {
pipe: Some(pipe),
last_activity: None,
pid: std::process::id(),
is_ready: false.into(),
})
}
Err(e) => match e.kind() {
Expand Down
1 change: 1 addition & 0 deletions src/ipc/discord/platform/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ impl Connection for RichClient {
pipe: Some(pipe),
last_activity: None,
pid: std::process::id(),
is_ready: false.into(),
})
}
Err(e) => match e.kind() {
Expand Down
3 changes: 2 additions & 1 deletion src/ipc/pipe/platform/unix/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

use crate::ipc::pipe::{PipeClientImpl, PipeServerImpl};
use crate::local_event;
use crate::messages::events::local::ErrorEvent;
use crate::messages::message::Message;
use crate::{client_event, local_event};

use super::client::PipeClient;

Expand Down Expand Up @@ -62,6 +62,7 @@ impl PipeServerImpl for PipeServer {
Ok((stream, _)) => {
let client_id = next_client_id.fetch_add(1, Ordering::SeqCst);
let mut client = PipeClient::new(client_id, stream, tx.clone());
tx.send(client_event!(0, Connect)).ok();
client.start_read_thread().ok();
clients.lock().unwrap().insert(client_id, client);
}
Expand Down
108 changes: 89 additions & 19 deletions src/ipc/pipe/platform/windows/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::fs::File;
use std::io::{self, Read, Write};
use std::io::{self};
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::JoinHandle;

use super::{
CreateEventW, Overlapped, ReadFile, WaitForSingleObject, WriteFile, INFINITE, WAIT_OBJECT_0,
};
use crate::ipc::pipe::PipeClientImpl;
use crate::local_event;
use crate::messages::events::client::ClientEvent;
Expand Down Expand Up @@ -31,40 +35,106 @@ impl PipeClientImpl for PipeClient {
}

fn write(&mut self, data: &[u8]) -> io::Result<()> {
self.pipe.as_mut().map_or(
Err(io::Error::new(io::ErrorKind::NotFound, "Pipe not found")),
|pipe| pipe.write_all(data),
)
if let Some(pipe) = &self.pipe {
let handle = pipe.as_raw_handle();
unsafe {
let mut overlapped = Overlapped {
internal: 0,
internal_high: 0,
offset: 0,
offset_high: 0,
h_event: CreateEventW(std::ptr::null_mut(), 1, 0, std::ptr::null_mut()),
};

let mut bytes_written = 0;
let write_result = WriteFile(
handle,
data.as_ptr(),
data.len() as u32,
&mut bytes_written,
&mut overlapped,
);

if write_result == 0 {
let error = io::Error::last_os_error();
if error.raw_os_error() != Some(997) {
return Err(error);
}
}

if WaitForSingleObject(overlapped.h_event, INFINITE) != WAIT_OBJECT_0 {
return Err(io::Error::last_os_error());
}

Ok(())
}
} else {
Err(io::Error::new(io::ErrorKind::NotFound, "Pipe not found"))
}
}

fn start_read_thread(&mut self) -> io::Result<()> {
if let Some(mut pipe) = self.pipe.take() {
if let Some(pipe) = self.pipe.as_ref() {
let pipe = pipe.clone();
let tx = self.tx.clone();
let id = self.id;

let handle = std::thread::spawn(move || {
let mut buf = [0u8; 4096];
let handle = pipe.as_raw_handle();

loop {
match pipe.read(&mut buf) {
Ok(0) => {
tx.send(local_event!(id, ClientDisconnected)).ok();
break;
}
Ok(n) => {
if let Ok(message) =
ClientEvent::deserialize(&String::from_utf8_lossy(&buf[..n]))
{
tx.send(Message::new(id, Event::Client(message))).ok();
unsafe {
let mut overlapped = Overlapped {
internal: 0,
internal_high: 0,
offset: 0,
offset_high: 0,
h_event: CreateEventW(std::ptr::null_mut(), 1, 0, std::ptr::null_mut()),
};

let mut bytes_read = 0;
let read_result = ReadFile(
handle,
buf.as_mut_ptr(),
buf.len() as u32,
&mut bytes_read,
&mut overlapped,
);

if read_result == 0 {
let error = io::Error::last_os_error();
if error.raw_os_error() != Some(997) {
tx.send(local_event!(id, Error, ErrorEvent::new(Box::new(error))))
.ok();
break;
}
}
Err(e) => {
tx.send(local_event!(id, Error, ErrorEvent::new(Box::new(e))))
.ok();

if WaitForSingleObject(overlapped.h_event, INFINITE) != WAIT_OBJECT_0 {
tx.send(local_event!(
id,
Error,
ErrorEvent::new(Box::new(io::Error::last_os_error()))
))
.ok();
break;
}

if bytes_read == 0 {
tx.send(local_event!(id, ClientDisconnected)).ok();
break;
}

if let Ok(message) = ClientEvent::deserialize(&String::from_utf8_lossy(
&buf[..bytes_read as usize],
)) {
tx.send(Message::new(id, Event::Client(message))).ok();
}
}
}
});

self.thread_handle = Some(handle);
Ok(())
} else {
Expand Down
66 changes: 66 additions & 0 deletions src/ipc/pipe/platform/windows/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,68 @@
#![allow(clippy::upper_case_acronyms)]

pub mod client;
pub mod server;

pub type HANDLE = *mut std::ffi::c_void;
pub type DWORD = u32;
pub type BOOL = i32;
pub type LPCWSTR = *const u16;
pub type LPVOID = *mut std::ffi::c_void;

pub const INVALID_HANDLE_VALUE: HANDLE = -1isize as HANDLE;
pub const ERROR_PIPE_CONNECTED: DWORD = 535;
pub const ERROR_IO_PENDING: DWORD = 997;
pub const PIPE_ACCESS_DUPLEX: DWORD = 0x00000003;
pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000;
pub const PIPE_TYPE_MESSAGE: DWORD = 0x00000004;
pub const PIPE_READMODE_MESSAGE: DWORD = 0x00000002;
pub const PIPE_UNLIMITED_INSTANCES: DWORD = 255;
pub const WAIT_OBJECT_0: DWORD = 0;
pub const INFINITE: DWORD = 0xFFFFFFFF;

#[repr(C)]
pub struct Overlapped {
pub internal: usize,
pub internal_high: usize,
pub offset: DWORD,
pub offset_high: DWORD,
pub h_event: HANDLE,
}

extern "system" {
pub fn CreateNamedPipeW(
lpName: LPCWSTR,
dwOpenMode: DWORD,
dwPipeMode: DWORD,
nMaxInstances: DWORD,
nOutBufferSize: DWORD,
nInBufferSize: DWORD,
nDefaultTimeOut: DWORD,
lpSecurityAttributes: LPVOID,
) -> HANDLE;

pub fn ConnectNamedPipe(hNamedPipe: HANDLE, lpOverlapped: LPVOID) -> BOOL;
pub fn GetLastError() -> DWORD;
pub fn CloseHandle(hObject: HANDLE) -> BOOL;
pub fn CreateEventW(
lpEventAttributes: LPVOID,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCWSTR,
) -> HANDLE;
pub fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
pub fn WriteFile(
hFile: HANDLE,
lpBuffer: *const u8,
nNumberOfBytesToWrite: DWORD,
lpNumberOfBytesWritten: *mut DWORD,
lpOverlapped: *mut Overlapped,
) -> BOOL;
pub fn ReadFile(
hFile: HANDLE,
lpBuffer: *mut u8,
nNumberOfBytesToRead: DWORD,
lpNumberOfBytesRead: *mut DWORD,
lpOverlapped: *mut Overlapped,
) -> BOOL;
}
Loading

0 comments on commit 2f73ce8

Please sign in to comment.