diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 36ef265..e34b88d 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -64,10 +64,15 @@ impl TcpListener { /// established, the corresponding [`TcpStream`] and the remote peer’s /// address will be returned. pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> { + enum Retry { + Now, + Wait, + } + loop { - let maybe_accept = World::current(|world| { + let accept_result = World::current(|world| { let host = world.current_host_mut(); - let (syn, origin) = host.tcp.accept(self.local_addr)?; + let (syn, origin) = host.tcp.accept(self.local_addr).ok_or(Retry::Wait)?; tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv"); @@ -77,7 +82,7 @@ impl TcpListener { tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, protocol = %"TCP SYN-ACK", "Send"); if ack.is_err() { - return None; + return Err(Retry::Now); } let mut my_addr = self.local_addr; @@ -91,14 +96,19 @@ impl TcpListener { let pair = SocketPair::new(my_addr, origin); let rx = host.tcp.new_stream(pair); - Some((TcpStream::new(pair, rx), origin)) + tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, "Accepted"); + Ok((TcpStream::new(pair, rx), origin)) }); - if let Some(accepted) = maybe_accept { - return Ok(accepted); + match accept_result { + Ok(accepted) => return Ok(accepted), + Err(Retry::Now) => (), + Err(Retry::Wait) => { + tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, "Waiting for notify"); + self.notify.notified().await; + tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, "Received notify"); + } } - - self.notify.notified().await; } } diff --git a/tests/tcp.rs b/tests/tcp.rs index 8ce5aa6..81fc1de 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -292,6 +292,48 @@ fn accept_front_of_line_blocking() -> Result { sim.run() } +#[test] +fn accept_front_of_line_dropped() -> Result { + let wait = Rc::new(Notify::new()); + let notify = wait.clone(); + + let mut sim = Builder::new() + .min_message_latency(Duration::ZERO) + .max_message_latency(Duration::ZERO) + .build(); + + sim.host("server", move || { + let wait = Rc::clone(&wait); + async move { + let listener = bind().await?; + wait.notified().await; + + while let Ok((_, peer)) = listener.accept().await { + tracing::debug!("peer {}", peer); + } + + Ok(()) + } + }); + + sim.client("client", async move { + // Queue up a number of broken connections at the server. + for _ in 0..5 { + let connect = TcpStream::connect(("server", PORT)); + assert!(timeout(Duration::from_secs(1), connect).await.is_err()); + } + + // After allowing the server to accept, the next connection attempt + // should succeed. + notify.notify_one(); + let _ = TcpStream::connect(("server", PORT)).await?; + + Ok(()) + }); + + sim.run() +} + #[test] fn send_upon_accept() -> Result { let mut sim = Builder::new().build();