Skip to content

Commit

Permalink
TcpListener: drain all pending accepts
Browse files Browse the repository at this point in the history
This commit makes `TcpListener::accept` try to accept the next
connection without waiting for a notify, if the previous connection
couldn't be established because the peer already went away.

The change is meant to fix a bug where a pending connection sometimes
wouldn't be accepted. Consider this series of events:

 * Server runs `TcpListener::accept`. There are no connections pending,
   so it starts waiting on the `notify`.
 * Client 1 sends a SYN, which gets pushed to `ServerSocket::deque` and
   `ServerSocket::notify` is notified.
 * Client 2 sends a SYN, which gets pushed to `ServerSocket::deque` and
   `ServerSocket::notify` is notified.
 * Client 1 goes away.
 * Server wakes up because the `notify` was notified.
 * Server tries to accept the connection from client 1, which fails
   because client 1 went away.
 * Server starts waiting on the `notify` again.

We end up in a situation where the connection from client 2 is still
pending but the server doesn't accept it because it's waiting for a
notification that was already sent.
  • Loading branch information
teskje committed Mar 1, 2025
1 parent 7964685 commit 652ed46
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 8 deletions.
26 changes: 18 additions & 8 deletions src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ impl TcpListener {
/// established, the corresponding [`TcpStream`] and the remote peer’s
/// address will be returned.
pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
enum Retry {
Now,
Wait,
}

loop {
let maybe_accept = World::current(|world| {
let accept_result = World::current(|world| {
let host = world.current_host_mut();
let (syn, origin) = host.tcp.accept(self.local_addr)?;
let (syn, origin) = host.tcp.accept(self.local_addr).ok_or(Retry::Wait)?;

tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv");

Expand All @@ -77,7 +82,7 @@ impl TcpListener {
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, protocol = %"TCP SYN-ACK", "Send");

if ack.is_err() {
return None;
return Err(Retry::Now);
}

let mut my_addr = self.local_addr;
Expand All @@ -91,14 +96,19 @@ impl TcpListener {
let pair = SocketPair::new(my_addr, origin);
let rx = host.tcp.new_stream(pair);

Some((TcpStream::new(pair, rx), origin))
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, "Accepted");
Ok((TcpStream::new(pair, rx), origin))
});

if let Some(accepted) = maybe_accept {
return Ok(accepted);
match accept_result {
Ok(accepted) => return Ok(accepted),
Err(Retry::Now) => (),
Err(Retry::Wait) => {
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, "Waiting for notify");
self.notify.notified().await;
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, "Received notify");
}
}

self.notify.notified().await;
}
}

Expand Down
42 changes: 42 additions & 0 deletions tests/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,48 @@ fn accept_front_of_line_blocking() -> Result {
sim.run()
}

#[test]
fn accept_front_of_line_dropped() -> Result {
let wait = Rc::new(Notify::new());
let notify = wait.clone();

let mut sim = Builder::new()
.min_message_latency(Duration::ZERO)
.max_message_latency(Duration::ZERO)
.build();

sim.host("server", move || {
let wait = Rc::clone(&wait);
async move {
let listener = bind().await?;
wait.notified().await;

while let Ok((_, peer)) = listener.accept().await {
tracing::debug!("peer {}", peer);
}

Ok(())
}
});

sim.client("client", async move {
// Queue up a number of broken connections at the server.
for _ in 0..5 {
let connect = TcpStream::connect(("server", PORT));
assert!(timeout(Duration::from_secs(1), connect).await.is_err());
}

// After allowing the server to accept, the next connection attempt
// should succeed.
notify.notify_one();
let _ = TcpStream::connect(("server", PORT)).await?;

Ok(())
});

sim.run()
}

#[test]
fn send_upon_accept() -> Result {
let mut sim = Builder::new().build();
Expand Down

0 comments on commit 652ed46

Please sign in to comment.