Skip to content

Commit

Permalink
TcpListener: drain all pending accepts (#224)
Browse files Browse the repository at this point in the history
This commit makes `TcpListener::accept` try to accept the next
connection without waiting for a notify, if the previous connection
couldn't be established because the peer already went away.

The change is meant to fix a bug where a pending connection sometimes
wouldn't be accepted. Consider this series of events:

 * Server runs `TcpListener::accept`. There are no connections pending,
   so it starts waiting on the `notify`.
 * Client 1 sends a SYN, which gets pushed to `ServerSocket::deque` and
   `ServerSocket::notify` is notified.
 * Client 2 sends a SYN, which gets pushed to `ServerSocket::deque` and
   `ServerSocket::notify` is notified.
 * Client 1 goes away.
 * Server wakes up because the `notify` was notified.
 * Server tries to accept the connection from client 1, which fails
   because client 1 went away.
 * Server starts waiting on the `notify` again.

We end up in a situation where the connection from client 2 is still
pending but the server doesn't accept it because it's waiting for a
notification that was already sent.
  • Loading branch information
teskje authored Mar 4, 2025
1 parent 9a4729c commit 248b159
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 26 deletions.
58 changes: 32 additions & 26 deletions src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,42 +64,48 @@ impl TcpListener {
/// established, the corresponding [`TcpStream`] and the remote peer’s
/// address will be returned.
pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
loop {
let origin = loop {
let maybe_accept = World::current(|world| {
let host = world.current_host_mut();
let (syn, origin) = host.tcp.accept(self.local_addr)?;

tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv");
host.tcp.accept(self.local_addr)
});

// Send SYN-ACK -> origin. If Ok we proceed (acts as the ACK),
// else we return early to avoid host mutations.
let ack = syn.ack.send(());
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, protocol = %"TCP SYN-ACK", "Send");
let Some((syn, origin)) = maybe_accept else {
// Wait for a new incoming connection, then retry.
self.notify.notified().await;
continue;
};

if ack.is_err() {
return None;
}
tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv");

let mut my_addr = self.local_addr;
if origin.ip().is_loopback() {
my_addr.set_ip(origin.ip());
}
if my_addr.ip().is_unspecified() {
my_addr.set_ip(host.addr);
}
// Send SYN-ACK -> origin. If Ok we proceed (acts as the ACK), else
// we retry.
let ack = syn.ack.send(());
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, protocol = %"TCP SYN-ACK", "Send");

let pair = SocketPair::new(my_addr, origin);
let rx = host.tcp.new_stream(pair);
if ack.is_ok() {
break origin;
}
};

Some((TcpStream::new(pair, rx), origin))
});
let stream = World::current(|world| {
let host = world.current_host_mut();

if let Some(accepted) = maybe_accept {
return Ok(accepted);
let mut my_addr = self.local_addr;
if origin.ip().is_loopback() {
my_addr.set_ip(origin.ip());
}
if my_addr.ip().is_unspecified() {
my_addr.set_ip(host.addr);
}

self.notify.notified().await;
}
let pair = SocketPair::new(my_addr, origin);
let rx = host.tcp.new_stream(pair);
TcpStream::new(pair, rx)
});

tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, "Accepted");
Ok((stream, origin))
}

/// Returns the local address that this listener is bound to.
Expand Down
42 changes: 42 additions & 0 deletions tests/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,48 @@ fn accept_front_of_line_blocking() -> Result {
sim.run()
}

#[test]
fn accept_front_of_line_dropped() -> Result {
let wait = Rc::new(Notify::new());
let notify = wait.clone();

let mut sim = Builder::new()
.min_message_latency(Duration::ZERO)
.max_message_latency(Duration::ZERO)
.build();

sim.host("server", move || {
let wait = Rc::clone(&wait);
async move {
let listener = bind().await?;
wait.notified().await;

while let Ok((_, peer)) = listener.accept().await {
tracing::debug!("peer {}", peer);
}

Ok(())
}
});

sim.client("client", async move {
// Queue up a number of broken connections at the server.
for _ in 0..5 {
let connect = TcpStream::connect(("server", PORT));
assert!(timeout(Duration::from_secs(1), connect).await.is_err());
}

// After allowing the server to accept, the next connection attempt
// should succeed.
notify.notify_one();
let _ = TcpStream::connect(("server", PORT)).await?;

Ok(())
});

sim.run()
}

#[test]
fn send_upon_accept() -> Result {
let mut sim = Builder::new().build();
Expand Down

0 comments on commit 248b159

Please sign in to comment.