diff --git a/CHANGELOG.md b/CHANGELOG.md index f4dedcf2..b5994254 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - All deserialization methods now accept `Bytes` instead of `std::io::Cursor` because deserialization from `std::io::Read` requires a temporary buffer. `Bytes` already provide cursor-like functionality. The crate now re-exported under `bevy_replicon::bytes`. - Use varint for `RepliconTick` because `postcard` provides more efficient encoding for it. - Improve panic message for non-registered functions. +- Log bytes count on receive. ### Fixed @@ -29,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Update `ReplicatedClients` immediately to let users set visibility on `ClientConnected` trigger. +- Send and receive on packet split in the example backend. ## [0.30.0] - 2025-02-04 diff --git a/bevy_replicon_example_backend/src/tcp.rs b/bevy_replicon_example_backend/src/tcp.rs index 842641da..2ef2a68c 100644 --- a/bevy_replicon_example_backend/src/tcp.rs +++ b/bevy_replicon_example_backend/src/tcp.rs @@ -1,21 +1,28 @@ use std::{ error::Error, - io::{self, Read, Write}, + io::{self, IoSlice, Read, Write}, net::TcpStream, - slice, }; -pub(super) fn read_message(stream: &mut TcpStream) -> io::Result<(u8, Vec)> { - let mut channel_id = 0; - stream.read_exact(slice::from_mut(&mut channel_id))?; +use bevy_replicon::bytes::{Buf, Bytes}; - let mut size_bytes = [0; 2]; - stream.read_exact(&mut size_bytes)?; - let message_size = u16::from_le_bytes(size_bytes); +pub(super) fn read_message(stream: &mut TcpStream) -> io::Result<(u8, Bytes)> { + let mut header = [0; 3]; + match stream.peek(&mut header)? { + 0 => return Err(io::ErrorKind::UnexpectedEof.into()), // Socket was closed. + 1..3 => return Err(io::ErrorKind::WouldBlock.into()), // Wait for full header. + 3.. => (), + } - let mut message = vec![0; message_size as usize]; + let channel_id = header[0]; + let message_size = u16::from_le_bytes([header[1], header[2]]); + + let mut message = vec![0; header.len() + message_size as usize]; stream.read_exact(&mut message)?; + let mut message = Bytes::from(message); + message.advance(header.len()); + Ok((channel_id, message)) } @@ -25,9 +32,19 @@ pub(super) fn send_message( message: &[u8], ) -> Result<(), Box> { let message_size: u16 = message.len().try_into()?; - stream.write_all(&[channel_id])?; - stream.write_all(&message_size.to_le_bytes())?; - stream.write_all(message)?; + let channel_id = &[channel_id]; + let message_size = &message_size.to_le_bytes(); + let packet = [ + IoSlice::new(channel_id), + IoSlice::new(message_size), + IoSlice::new(message), + ]; + + // Write as a single message to avoid splitting between packets. + let len = stream.write_vectored(&packet)?; + if len != packet.iter().map(|s| s.len()).sum::() { + return Err(Box::new(io::Error::from(io::ErrorKind::UnexpectedEof))); + } Ok(()) } diff --git a/src/core/replicon_client.rs b/src/core/replicon_client.rs index 8ebb85fc..048447d6 100644 --- a/src/core/replicon_client.rs +++ b/src/core/replicon_client.rs @@ -77,8 +77,12 @@ impl RepliconClient { .unwrap_or_else(|| panic!("client should have a receive channel with id {channel_id}")); trace!( - "received {} message(s) from channel {channel_id}", - channel_messages.len() + "received {} message(s) totaling {} bytes from channel {channel_id}", + channel_messages.len(), + channel_messages + .iter() + .map(|bytes| bytes.len()) + .sum::() ); channel_messages.drain(..) diff --git a/src/core/replicon_server.rs b/src/core/replicon_server.rs index d07ab063..15b39d56 100644 --- a/src/core/replicon_server.rs +++ b/src/core/replicon_server.rs @@ -70,8 +70,12 @@ impl RepliconServer { .unwrap_or_else(|| panic!("server should have a receive channel with id {channel_id}")); trace!( - "received {} message(s) from channel {channel_id}", - channel_messages.len() + "received {} message(s) totaling {} bytes from channel {channel_id}", + channel_messages.len(), + channel_messages + .iter() + .map(|(_, bytes)| bytes.len()) + .sum::() ); channel_messages.drain(..)