Skip to content

Commit

Permalink
Fix send and receive on packet split in the example backend (#416)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shatur authored Feb 13, 2025
1 parent 18bc874 commit 098f54e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- All deserialization methods now accept `Bytes` instead of `std::io::Cursor` because deserialization from `std::io::Read` requires a temporary buffer. `Bytes` already provide cursor-like functionality. The crate now re-exported under `bevy_replicon::bytes`.
- Use varint for `RepliconTick` because `postcard` provides more efficient encoding for it.
- Improve panic message for non-registered functions.
- Log bytes count on receive.

### Fixed

Expand All @@ -29,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Update `ReplicatedClients` immediately to let users set visibility on `ClientConnected` trigger.
- Send and receive on packet split in the example backend.

## [0.30.0] - 2025-02-04

Expand Down
41 changes: 29 additions & 12 deletions bevy_replicon_example_backend/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
use std::{
error::Error,
io::{self, Read, Write},
io::{self, IoSlice, Read, Write},
net::TcpStream,
slice,
};

pub(super) fn read_message(stream: &mut TcpStream) -> io::Result<(u8, Vec<u8>)> {
let mut channel_id = 0;
stream.read_exact(slice::from_mut(&mut channel_id))?;
use bevy_replicon::bytes::{Buf, Bytes};

let mut size_bytes = [0; 2];
stream.read_exact(&mut size_bytes)?;
let message_size = u16::from_le_bytes(size_bytes);
pub(super) fn read_message(stream: &mut TcpStream) -> io::Result<(u8, Bytes)> {
let mut header = [0; 3];
match stream.peek(&mut header)? {
0 => return Err(io::ErrorKind::UnexpectedEof.into()), // Socket was closed.
1..3 => return Err(io::ErrorKind::WouldBlock.into()), // Wait for full header.
3.. => (),
}

let mut message = vec![0; message_size as usize];
let channel_id = header[0];
let message_size = u16::from_le_bytes([header[1], header[2]]);

let mut message = vec![0; header.len() + message_size as usize];
stream.read_exact(&mut message)?;

let mut message = Bytes::from(message);
message.advance(header.len());

Ok((channel_id, message))
}

Expand All @@ -25,9 +32,19 @@ pub(super) fn send_message(
message: &[u8],
) -> Result<(), Box<dyn Error + Send + Sync>> {
let message_size: u16 = message.len().try_into()?;
stream.write_all(&[channel_id])?;
stream.write_all(&message_size.to_le_bytes())?;
stream.write_all(message)?;
let channel_id = &[channel_id];
let message_size = &message_size.to_le_bytes();
let packet = [
IoSlice::new(channel_id),
IoSlice::new(message_size),
IoSlice::new(message),
];

// Write as a single message to avoid splitting between packets.
let len = stream.write_vectored(&packet)?;
if len != packet.iter().map(|s| s.len()).sum::<usize>() {
return Err(Box::new(io::Error::from(io::ErrorKind::UnexpectedEof)));
}

Ok(())
}
8 changes: 6 additions & 2 deletions src/core/replicon_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ impl RepliconClient {
.unwrap_or_else(|| panic!("client should have a receive channel with id {channel_id}"));

trace!(
"received {} message(s) from channel {channel_id}",
channel_messages.len()
"received {} message(s) totaling {} bytes from channel {channel_id}",
channel_messages.len(),
channel_messages
.iter()
.map(|bytes| bytes.len())
.sum::<usize>()
);

channel_messages.drain(..)
Expand Down
8 changes: 6 additions & 2 deletions src/core/replicon_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ impl RepliconServer {
.unwrap_or_else(|| panic!("server should have a receive channel with id {channel_id}"));

trace!(
"received {} message(s) from channel {channel_id}",
channel_messages.len()
"received {} message(s) totaling {} bytes from channel {channel_id}",
channel_messages.len(),
channel_messages
.iter()
.map(|(_, bytes)| bytes.len())
.sum::<usize>()
);

channel_messages.drain(..)
Expand Down

0 comments on commit 098f54e

Please sign in to comment.