Skip to content

Commit

Permalink
Use an explicit priority check
Browse files Browse the repository at this point in the history
This fixes the case where wake is called for one future, but another
future gets the connection.
  • Loading branch information
espindola committed Dec 14, 2023
1 parent 906c453 commit be693e0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
4 changes: 1 addition & 3 deletions src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,8 @@ impl Future for GetConn {
loop {
match self.inner {
GetConnInner::New => {
let queued = self.queue_id.is_some();
let queue_id = *self.queue_id.get_or_insert_with(QueueId::next);
let next =
ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?;
let next = ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queue_id))?;
match next {
GetConnInner::Connecting(conn_fut) => {
self.inner = GetConnInner::Connecting(conn_fut);
Expand Down
45 changes: 26 additions & 19 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ impl Waitlist {
self.queue.remove(&tmp);
}

fn is_empty(&self) -> bool {
self.queue.is_empty()
fn peek_id(&mut self) -> Option<QueueId> {
self.queue.peek().map(|(qw, _)| qw.queue_id)
}
}

Expand Down Expand Up @@ -303,16 +303,14 @@ impl Pool {
fn poll_new_conn(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
queued: bool,
queue_id: QueueId,
) -> Poll<Result<GetConnInner>> {
self.poll_new_conn_inner(cx, queued, queue_id)
self.poll_new_conn_inner(cx, queue_id)
}

fn poll_new_conn_inner(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
queued: bool,
queue_id: QueueId,
) -> Poll<Result<GetConnInner>> {
let mut exchange = self.inner.exchange.lock().unwrap();
Expand All @@ -326,8 +324,15 @@ impl Pool {

exchange.spawn_futures_if_needed(&self.inner);

// Check if others are waiting and we're not queued.
if !exchange.waiting.is_empty() && !queued {
// Check if we are higher priority than anything current
let highest = if let Some(cur) = exchange.waiting.peek_id() {
queue_id > cur
} else {
true
};

// If we are not, just queue
if !highest {
exchange.waiting.push(cx.waker().clone(), queue_id);
return Poll::Pending;
}
Expand Down Expand Up @@ -1087,32 +1092,34 @@ mod test {
tokio::time::sleep(Duration::from_millis(100)).await;
}

// We called wake on fut1, but with the select fut2 will
// We called wake on fut1, and even with the select fut1 will
// resolve first
let Either::Left((_, fut1)) = select(fut2, fut1).await else {
let Either::Right((_, fut2)) = select(fut2, fut1).await else {
panic!("wrong future");
};

// We dropped the connection of fut2, but very likely hasn't
// We dropped the connection of fut1, but very likely hasn't
// made it through the recycler yet.
assert_eq!(queue_len(), 1);

let p = poll!(fut1.clone());
let p = poll!(fut2.clone());
assert!(matches!(p, Poll::Pending));
assert_eq!(queue_len(), 2); // Now fut1 is queued again
assert_eq!(queue_len(), 1); // The queue still has fut2

// The connection will pass by the recycler and unblock fut1
// The connection will pass by the recycler and unblock fut2
// and pop it from the queue.
fut1.await;
assert_eq!(queue_len(), 1);
fut2.await;
assert_eq!(queue_len(), 0);

// Since the queue is not empty, a new future will be pending
// The recycler is probably not done, so a new future will be
// pending.
let fut3 = get_pending().await;
assert_eq!(queue_len(), 2);
assert_eq!(queue_len(), 1);

println!("we get here");
// It is OK to await it.
fut3.await;
panic!("we never get here");

Ok(())
}

#[cfg(feature = "nightly")]
Expand Down

0 comments on commit be693e0

Please sign in to comment.