Skip to content

Commit

Permalink
Synthetic latency
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed Jun 25, 2024
1 parent 2afddf4 commit e707a3f
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 4 deletions.
218 changes: 215 additions & 3 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! This could be background verification and 'anti-cheat' detection, error-reporting,
//! background beaver share generation, or other preproccessing actions.
use std::error::Error;
use std::{error::Error, time::Duration};

use futures::{SinkExt, StreamExt};
use futures_concurrency::future::Join;
Expand All @@ -35,7 +35,7 @@ use tokio_util::{
codec::{FramedRead, FramedWrite, LengthDelimitedCodec},
};

use crate::net::{Channel, RecvBytes, SendBytes, SplitChannel};
use crate::net::{connection::latency::Delayed, Channel, RecvBytes, SendBytes, SplitChannel};

pub struct Connection<R: AsyncRead, W: AsyncWrite> {
sender: Sending<W>,
Expand All @@ -54,7 +54,7 @@ pub enum ConnectionError {
Unknown(#[from] Box<dyn Error + Send + Sync + 'static>),
}

impl<R: AsyncRead, W: AsyncWrite> Connection<R, W> {
impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> Connection<R, W> {
/// Construct a new connection from a reader and writer
/// Messages are serialized with bincode and length delimated.
///
Expand All @@ -77,6 +77,20 @@ impl<R: AsyncRead, W: AsyncWrite> Connection<R, W> {
// it is really only unsent packages holding us back
(receiver.0.into_inner(), sender.0.into_inner())
}

pub fn delayed_read(self, delay: Duration) -> Connection<Delayed<R>, W> {
let r = self.receiver.0.into_inner();
let r = Delayed::new(r, delay);
let w = self.sender.0.into_inner();
Connection::new(r, w)
}

pub fn delayed_write(self, delay: Duration) -> Connection<R, Delayed<W>> {
let w = self.sender.0.into_inner();
let w = Delayed::new(w, delay);
let r = self.receiver.0.into_inner();
Connection::new(r, w)
}
}

pub struct Sending<W: AsyncWrite>(FramedWrite<W, LengthDelimitedCodec>);
Expand Down Expand Up @@ -168,6 +182,7 @@ impl TcpConnection {
///
/// * `stream`: TCP stream to use
pub fn from_tcp_stream(stream: TcpStream) -> Self {
let _ = stream.set_nodelay(true);
let (reader, writer) = stream.into_split();
Self::new(reader, writer)
}
Expand Down Expand Up @@ -216,6 +231,113 @@ impl DuplexConnection {
}
}

pub mod latency {
//! Primitive latency for arbitrary `AsyncWrite`/`AsyncRead` types
//! for similating network latency.
//!
//! When reading or flushing we run a timer in which we wait until it is done,
//! after which the
//!
//! Writing is probably the most realistic, given that it runs a timer after flushing,
//! which could be slow.
use std::{pin::Pin, time::Duration};

use futures::{pin_mut, FutureExt};
use tokio::{
io::{AsyncRead, AsyncWrite},
time::Sleep,
};

pub struct Delayed<T> {
inner: T,
delay: Duration,
timer: Pin<Box<Sleep>>,
}

impl<T> Delayed<T> {
pub fn new(t: T, delay: Duration) -> Self {
Self {
inner: t,
delay,
timer: Box::pin(tokio::time::sleep(delay)),
}
}

pub fn destroy(self) -> T {
self.inner
}
}

impl<T> AsyncWrite for Delayed<T>
where
T: AsyncWrite + Unpin,
{
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
let writer = &mut self.inner;
pin_mut!(writer);
writer.poll_write(cx, buf)
}

fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
let Delayed {
timer,
inner,
delay,
} = &mut *self;
match timer.poll_unpin(cx) {
std::task::Poll::Ready(()) => {
pin_mut!(inner);
timer.set(tokio::time::sleep(*delay));
inner.poll_flush(cx)
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}

fn poll_shutdown(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
let writer = &mut self.inner;
pin_mut!(writer);
writer.poll_shutdown(cx)
}
}

impl<T> AsyncRead for Delayed<T>
where
T: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let Delayed {
timer,
inner,
delay,
} = &mut *self;
match timer.poll_unpin(cx) {
std::task::Poll::Ready(()) => {
pin_mut!(inner);
timer.set(tokio::time::sleep(*delay));
inner.poll_read(cx, buf)
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
}

#[cfg(test)]
mod test {

Expand Down Expand Up @@ -283,4 +405,94 @@ mod test {

futures::join!(h1, h2);
}

#[tokio::test]
async fn in_memory_delayed_read() {
use std::time::Instant;
let (conn1, conn2) = DuplexConnection::in_memory();
let h1 = async move {
let mut conn = conn1.delayed_read(Duration::from_millis(50));
let t0 = Instant::now();
conn.send(&"Hello").await.unwrap();
let delta_t = Instant::now() - t0;
println!("[1] Message sent in {delta_t:#?}");

let t0 = Instant::now();
conn.send(&"Buddy").await.unwrap();
let delta_t = Instant::now() - t0;
println!("[1] Message sent in {delta_t:#?}");

let t0 = Instant::now();
let msg: Box<str> = conn.recv().await.unwrap();
let delta_t = Instant::now() - t0;
println!("[1] Message received in {delta_t:#?}");
assert_eq!(msg, "Greetings friend".into());
};
let h2 = async move {
let mut conn = conn2.delayed_read(Duration::from_millis(50));
let t0 = Instant::now();
let msg: Box<str> = conn.recv().await.unwrap();
let delta_t = Instant::now() - t0;
println!("[2] Message received in {delta_t:#?}");
assert_eq!(msg, "Hello".into());

let t0 = Instant::now();
let msg: Box<str> = conn.recv().await.unwrap();
let delta_t = Instant::now() - t0;
println!("[2] Message received in {delta_t:#?}");
assert_eq!(msg, "Buddy".into());

let t0 = Instant::now();
conn.send(&"Greetings friend").await.unwrap();
let delta_t = Instant::now() - t0;
println!("[2] Message sent in {delta_t:#?}");
};

futures::join!(h1, h2);
}

#[tokio::test]
async fn in_memory_delayed_write() {
use std::time::Instant;
let (conn1, conn2) = DuplexConnection::in_memory();
let h1 = async move {
let mut conn = conn1.delayed_write(Duration::from_millis(50));
let t0 = Instant::now();
conn.send(&"Hello").await.unwrap();
let delta_t = Instant::now() - t0;
println!("[1] Message sent in {delta_t:#?}");

let t0 = Instant::now();
conn.send(&"Buddy").await.unwrap();
let delta_t = Instant::now() - t0;
println!("[1] Message sent in {delta_t:#?}");

let t0 = Instant::now();
let msg: Box<str> = conn.recv().await.unwrap();
let delta_t = Instant::now() - t0;
println!("[1] Message received in {delta_t:#?}");
assert_eq!(msg, "Greetings friend".into());
};
let h2 = async move {
let mut conn = conn2.delayed_write(Duration::from_millis(50));
let t0 = Instant::now();
let msg: Box<str> = conn.recv().await.unwrap();
let delta_t = Instant::now() - t0;
println!("[2] Message received in {delta_t:#?}");
assert_eq!(msg, "Hello".into());

let t0 = Instant::now();
let msg: Box<str> = conn.recv().await.unwrap();
let delta_t = Instant::now() - t0;
println!("[2] Message received in {delta_t:#?}");
assert_eq!(msg, "Buddy".into());

let t0 = Instant::now();
conn.send(&"Greetings friend").await.unwrap();
let delta_t = Instant::now() - t0;
println!("[2] Message sent in {delta_t:#?}");
};

futures::join!(h1, h2);
}
}
76 changes: 75 additions & 1 deletion src/net/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ impl<C: SplitChannel> Tuneable for Network<C> {
}
}

impl<C: SplitChannel> Communicate for Network<C> {}

/// Network containing only duplex connections.
/// Used for local testing.
pub type InMemoryNetwork = Network<DuplexConnection>;
Expand Down Expand Up @@ -494,6 +496,30 @@ impl InMemoryNetwork {
.map(|conn| async move { conn.shutdown().await });
join_all(futs).await.into_iter().map_ok(|_| {}).collect()
}

pub fn add_read_latency(self, delay: Duration) -> Network<impl SplitChannel> {
let connections = self
.connections
.into_iter()
.map(|c| c.delayed_read(delay))
.collect();
Network {
connections,
index: self.index,
}
}

pub fn add_write_latency(self, delay: Duration) -> Network<impl SplitChannel> {
let connections = self
.connections
.into_iter()
.map(|c| c.delayed_write(delay))
.collect();
Network {
connections,
index: self.index,
}
}
}

/// TCP Network based on TCP Streams.
Expand Down Expand Up @@ -563,7 +589,55 @@ impl TcpNetwork {
}
}

impl<C: SplitChannel> Communicate for Network<C> {}
mod builder {
use std::{net::SocketAddr, time::Duration};

use crate::net::{
connection::TcpConnection,
network::{NetResult, TcpNetwork},
};

pub struct NetworkBuilder {
delay: Option<Duration>,
}

pub struct TcpNetworkBuilder {
parent: NetworkBuilder,
addr: SocketAddr,
parties: Vec<SocketAddr>,
}

impl NetworkBuilder {
pub fn add_delay(mut self, lag: Duration) -> Self {
self.delay = Some(lag);
self
}

pub fn tcp(self, addr: SocketAddr) -> TcpNetworkBuilder {
TcpNetworkBuilder {
parent: self,
addr,
parties: vec![],
}
}
}

impl TcpNetworkBuilder {
pub fn add_party(mut self, addr: SocketAddr) -> Self {
self.parties.push(addr);
self
}

pub fn add_parties(mut self, addrs: &[SocketAddr]) -> Self {
self.parties.extend_from_slice(addrs);
self
}

pub async fn connect(self) -> NetResult<TcpNetwork, TcpConnection> {
TcpNetwork::connect(self.addr, &self.parties).await
}
}
}

#[cfg(test)]
mod test {
Expand Down

0 comments on commit e707a3f

Please sign in to comment.