Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TcpListener: drain all pending accepts #224

Merged
merged 1 commit into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,42 +64,48 @@ impl TcpListener {
/// established, the corresponding [`TcpStream`] and the remote peer’s
/// address will be returned.
pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
loop {
let origin = loop {
let maybe_accept = World::current(|world| {
let host = world.current_host_mut();
let (syn, origin) = host.tcp.accept(self.local_addr)?;

tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv");
host.tcp.accept(self.local_addr)
});

// Send SYN-ACK -> origin. If Ok we proceed (acts as the ACK),
// else we return early to avoid host mutations.
let ack = syn.ack.send(());
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, protocol = %"TCP SYN-ACK", "Send");
let Some((syn, origin)) = maybe_accept else {
// Wait for a new incoming connection, then retry.
self.notify.notified().await;
continue;
};

if ack.is_err() {
return None;
}
tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv");

let mut my_addr = self.local_addr;
if origin.ip().is_loopback() {
my_addr.set_ip(origin.ip());
}
if my_addr.ip().is_unspecified() {
my_addr.set_ip(host.addr);
}
// Send SYN-ACK -> origin. If Ok we proceed (acts as the ACK), else
// we retry.
let ack = syn.ack.send(());
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, protocol = %"TCP SYN-ACK", "Send");

let pair = SocketPair::new(my_addr, origin);
let rx = host.tcp.new_stream(pair);
if ack.is_ok() {
break origin;
}
};

Some((TcpStream::new(pair, rx), origin))
});
let stream = World::current(|world| {
let host = world.current_host_mut();

if let Some(accepted) = maybe_accept {
return Ok(accepted);
let mut my_addr = self.local_addr;
if origin.ip().is_loopback() {
my_addr.set_ip(origin.ip());
}
if my_addr.ip().is_unspecified() {
my_addr.set_ip(host.addr);
}

self.notify.notified().await;
}
let pair = SocketPair::new(my_addr, origin);
let rx = host.tcp.new_stream(pair);
TcpStream::new(pair, rx)
});

tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, "Accepted");
Ok((stream, origin))
}

/// Returns the local address that this listener is bound to.
Expand Down
42 changes: 42 additions & 0 deletions tests/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,48 @@ fn accept_front_of_line_blocking() -> Result {
sim.run()
}

#[test]
fn accept_front_of_line_dropped() -> Result {
let wait = Rc::new(Notify::new());
let notify = wait.clone();

let mut sim = Builder::new()
.min_message_latency(Duration::ZERO)
.max_message_latency(Duration::ZERO)
.build();

sim.host("server", move || {
let wait = Rc::clone(&wait);
async move {
let listener = bind().await?;
wait.notified().await;

while let Ok((_, peer)) = listener.accept().await {
tracing::debug!("peer {}", peer);
}

Ok(())
}
});

sim.client("client", async move {
// Queue up a number of broken connections at the server.
for _ in 0..5 {
let connect = TcpStream::connect(("server", PORT));
assert!(timeout(Duration::from_secs(1), connect).await.is_err());
}

// After allowing the server to accept, the next connection attempt
// should succeed.
notify.notify_one();
let _ = TcpStream::connect(("server", PORT)).await?;

Ok(())
});

sim.run()
}

#[test]
fn send_upon_accept() -> Result {
let mut sim = Builder::new().build();
Expand Down