Skip to content

Commit

Permalink
feat(ipc): continuously read data from Discord pipe to detect sudden …
Browse files Browse the repository at this point in the history
…disconnects
  • Loading branch information
vyfor committed Dec 30, 2024
1 parent 2b0fc96 commit 807291f
Show file tree
Hide file tree
Showing 15 changed files with 624 additions and 187 deletions.
28 changes: 4 additions & 24 deletions src/cord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ use crate::ipc::discord::client::{Connection, RichClient};
use crate::ipc::pipe::platform::server::PipeServer;
use crate::ipc::pipe::PipeServerImpl;
use crate::messages::events::event::{EventContext, OnEvent};
use crate::messages::events::local::ErrorEvent;
use crate::messages::events::server::LogEvent;
use crate::messages::message::Message;
use crate::protocol::msgpack::MsgPack;
use crate::session::SessionManager;
use crate::util::lockfile::ServerLock;
use crate::util::logger::{LogLevel, Logger};
use crate::{client_event, local_event, server_event};

/// Core application managing configuration, sessions, IPC with Discord, and logging.
///
Expand Down Expand Up @@ -115,31 +113,13 @@ impl Cord {
}

/// Starts RPC with Discord.
pub fn start_rpc(&self) -> crate::Result<()> {
pub fn start_rpc(&mut self) -> crate::Result<()> {
self.rich_client.handshake()?;
let rich_client = self.rich_client.clone();
let tx = self.tx.clone();
let logger = self.logger.clone();
std::thread::spawn(move || match rich_client.read() {
Ok(msg) => {
let msg = String::from_utf8_lossy(&msg);

if msg.contains("Invalid Client ID") {
logger.log(
LogLevel::Error,
format!("Invalid client ID: {}", msg).into(),
0,
);
tx.send(client_event!(0, Shutdown)).ok();
} else {
tx.send(server_event!(0, Ready)).ok();
}
}
Err(e) => {
tx.send(local_event!(0, Error, ErrorEvent::new(e.into())))
.ok();
}
});
Arc::get_mut(&mut self.rich_client)
.expect("Failed to start read thread")
.start_read_thread(tx.clone())?;

Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::string::ParseError;
use std::{fmt, io};

use crate::cli::error::CliError;
use crate::ipc::discord::error::DiscordError;
use crate::protocol::error::ProtocolError;

/// Enumerates error types: IO, parsing, protocol, CLI, and others.
Expand All @@ -17,6 +18,8 @@ pub enum CordErrorKind {
Protocol,
/// Errors related to CLI operations.
Cli,
/// Errors related to Discord operations.
Discord,
/// Other unspecified errors.
Other,
}
Expand Down Expand Up @@ -92,6 +95,12 @@ impl From<CliError> for CordError {
}
}

impl From<DiscordError> for CordError {
fn from(err: DiscordError) -> Self {
Self::new(CordErrorKind::Discord, err)
}
}

impl From<String> for CordError {
fn from(err: String) -> Self {
Self::new(
Expand Down
116 changes: 116 additions & 0 deletions src/ipc/bindings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#![allow(clippy::upper_case_acronyms)]

#[cfg(target_os = "windows")]
mod windows {
pub type HANDLE = *mut std::ffi::c_void;
pub type DWORD = u32;
pub type BOOL = i32;
pub type LPCWSTR = *const u16;
pub type LPVOID = *mut std::ffi::c_void;

pub const GENERIC_READ: DWORD = 0x80000000;
pub const GENERIC_WRITE: DWORD = 0x40000000;
pub const OPEN_EXISTING: DWORD = 3;
pub const INVALID_HANDLE_VALUE: HANDLE = -1isize as HANDLE;
pub const ERROR_PIPE_CONNECTED: DWORD = 535;
pub const ERROR_IO_PENDING: DWORD = 997;
pub const PIPE_ACCESS_DUPLEX: DWORD = 0x00000003;
pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000;
pub const PIPE_TYPE_MESSAGE: DWORD = 0x00000004;
pub const PIPE_READMODE_MESSAGE: DWORD = 0x00000002;
pub const PIPE_WAIT: DWORD = 0x00000000;
pub const PIPE_UNLIMITED_INSTANCES: DWORD = 255;

#[repr(C)]
pub struct Overlapped {
pub internal: usize,
pub internal_high: usize,
pub offset: DWORD,
pub offset_high: DWORD,
pub h_event: HANDLE,
}

impl Default for Overlapped {
fn default() -> Self {
Self {
internal: 0,
internal_high: 0,
offset: 0,
offset_high: 0,
h_event: unsafe {
CreateEventW(
std::ptr::null_mut(),
1,
0,
std::ptr::null_mut(),
)
},
}
}
}

extern "system" {
pub fn CreateFileW(
lfFileName: LPCWSTR,
dwDesiredAccess: DWORD,
dwShareMode: DWORD,
lpSecurityAttributes: LPVOID,
dwCreationDisposition: DWORD,
dwFlagsAndAttributes: DWORD,
hTemplateFile: HANDLE,
) -> HANDLE;

pub fn CreateNamedPipeW(
lpName: LPCWSTR,
dwOpenMode: DWORD,
dwPipeMode: DWORD,
nMaxInstances: DWORD,
nOutBufferSize: DWORD,
nInBufferSize: DWORD,
nDefaultTimeOut: DWORD,
lpSecurityAttributes: LPVOID,
) -> HANDLE;

pub fn ConnectNamedPipe(
hNamedPipe: HANDLE,
lpOverlapped: *mut Overlapped,
) -> BOOL;

pub fn GetLastError() -> DWORD;

pub fn CloseHandle(hObject: HANDLE) -> BOOL;

pub fn CreateEventW(
lpEventAttributes: LPVOID,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCWSTR,
) -> HANDLE;

pub fn WriteFile(
hFile: HANDLE,
lpBuffer: *const u8,
nNumberOfBytesToWrite: DWORD,
lpNumberOfBytesWritten: *mut DWORD,
lpOverlapped: *mut Overlapped,
) -> BOOL;

pub fn ReadFile(
hFile: HANDLE,
lpBuffer: *mut u8,
nNumberOfBytesToRead: DWORD,
lpNumberOfBytesRead: *mut DWORD,
lpOverlapped: *mut Overlapped,
) -> BOOL;

pub fn GetOverlappedResult(
hFile: HANDLE,
lpOverlapped: *mut Overlapped,
lpNumberOfBytesTransferred: *mut DWORD,
bWait: BOOL,
) -> BOOL;
}
}

#[cfg(target_os = "windows")]
pub use windows::*;
51 changes: 13 additions & 38 deletions src/ipc/discord/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::io::{Read, Write};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::JoinHandle;

use crate::ipc::discord::utils;
use crate::messages::message::Message;
use crate::presence::packet::Packet;
use crate::protocol::json::Json;

Expand All @@ -15,11 +17,14 @@ use crate::protocol::json::Json;
pub struct RichClient {
pub client_id: u64,
#[cfg(target_os = "windows")]
pub pipe: Option<std::fs::File>,
pub pipe: Option<Arc<std::fs::File>>,
#[cfg(not(target_os = "windows"))]
pub pipe: Option<std::os::unix::net::UnixStream>,
pub read_pipe: Option<std::os::unix::net::UnixStream>,
#[cfg(not(target_os = "windows"))]
pub write_pipe: Option<std::os::unix::net::UnixStream>,
pub pid: u32,
pub is_ready: AtomicBool,
pub thread_handle: Option<JoinHandle<()>>,
}

/// Defines methods for connecting and closing the client.
Expand All @@ -28,43 +33,13 @@ pub trait Connection {
fn connect(client_id: u64) -> crate::Result<RichClient>;
/// Closes the connection to Discord.
fn close(&mut self);
/// Start reading from Discord in a separate thread
fn start_read_thread(&mut self, tx: Sender<Message>) -> crate::Result<()>;
/// Write data to Discord
fn write(&self, opcode: u32, data: Option<&[u8]>) -> crate::Result<()>;
}

impl RichClient {
/// Sends data to Discord.
pub fn write(&self, opcode: u32, data: Option<&[u8]>) -> crate::Result<()> {
self.pipe
.as_ref()
.map_or(Err("Pipe not found".into()), |mut pipe| {
let payload = match data {
Some(packet) => {
let mut payload =
utils::encode(opcode, packet.len() as u32);
payload.extend_from_slice(packet);
payload
}
None => utils::encode(opcode, 0),
};
pipe.write_all(&payload)?;

Ok(())
})
}

/// Receives data from Discord.
pub fn read(&self) -> crate::Result<Vec<u8>> {
self.pipe
.as_ref()
.map_or(Err("Pipe not found".into()), |mut pipe| {
let mut header = [0; 8];
pipe.read_exact(&mut header)?;
let size = utils::decode(&header) as usize;
let mut buffer = vec![0u8; size];
pipe.read_exact(&mut buffer)?;
Ok(buffer)
})
}

/// Establishes a connection with Discord.
pub fn handshake(&self) -> crate::Result<()> {
self.write(
Expand Down
48 changes: 48 additions & 0 deletions src/ipc/discord/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::{fmt, io};

#[derive(Debug)]
pub enum DiscordError {
Io(io::Error),
InvalidClientId(String),
ConnectionClosed,
PipeNotFound,
Custom(String),
}

impl fmt::Display for DiscordError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DiscordError::Io(err) => write!(f, "IO error: {}", err),
DiscordError::InvalidClientId(id) => {
write!(f, "'{}' is not a valid client ID", id)
}
DiscordError::ConnectionClosed => {
write!(f, "The connection was forcibly closed")
}
DiscordError::PipeNotFound => {
write!(f, "Discord IPC pipe not found")
}
DiscordError::Custom(msg) => write!(f, "{}", msg),
}
}
}

impl std::error::Error for DiscordError {}

impl From<io::Error> for DiscordError {
fn from(err: io::Error) -> Self {
DiscordError::Io(err)
}
}

impl From<&str> for DiscordError {
fn from(err: &str) -> Self {
DiscordError::Custom(err.to_string())
}
}

impl From<String> for DiscordError {
fn from(err: String) -> Self {
DiscordError::Custom(err)
}
}
2 changes: 2 additions & 0 deletions src/ipc/discord/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod client;
pub mod error;
pub mod opcodes;
pub mod platform;
mod utils;
28 changes: 28 additions & 0 deletions src/ipc/discord/opcodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/// Discord IPC opcodes
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Opcode {
Handshake = 0,
Frame = 1,
Close = 2,
Ping = 3,
Pong = 4,
}

impl From<u32> for Opcode {
fn from(code: u32) -> Self {
match code {
0 => Opcode::Handshake,
1 => Opcode::Frame,
2 => Opcode::Close,
3 => Opcode::Ping,
4 => Opcode::Pong,
_ => Opcode::Frame,
}
}
}

impl From<Opcode> for u32 {
fn from(op: Opcode) -> Self {
op as u32
}
}
Loading

0 comments on commit 807291f

Please sign in to comment.