From e0f7517b6ac38fb516caa46c0d00f4dfb01c0d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 16 Nov 2023 12:50:11 -0100 Subject: [PATCH 1/3] Refactor pool creation in tests --- src/conn/pool/mod.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index df3ac251..3949ef03 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -423,6 +423,12 @@ mod test { }; } + fn pool_with_one_connection() -> Pool { + let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap()); + let opts = get_opts().pool_opts(pool_opts.clone()); + Pool::new(opts) + } + #[tokio::test] async fn should_opt_out_of_connection_reset() -> super::Result<()> { let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap()); @@ -571,10 +577,7 @@ mod test { #[tokio::test] async fn should_reuse_connections() -> super::Result<()> { - let constraints = PoolConstraints::new(1, 1).unwrap(); - let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints)); - - let pool = Pool::new(opts); + let pool = pool_with_one_connection(); let mut conn = pool.get_conn().await?; let server_version = conn.server_version(); @@ -613,10 +616,7 @@ mod test { #[tokio::test] async fn should_start_transaction() -> super::Result<()> { - let constraints = PoolConstraints::new(1, 1).unwrap(); - let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints)); - - let pool = Pool::new(opts); + let pool = pool_with_one_connection(); "CREATE TABLE IF NOT EXISTS mysql.tmp(id int)" .ignore(&pool) @@ -909,10 +909,7 @@ mod test { #[tokio::test] async fn should_ignore_non_fatal_errors_while_returning_to_a_pool() -> super::Result<()> { - let pool_constraints = PoolConstraints::new(1, 1).unwrap(); - let pool_opts = PoolOpts::default().with_constraints(pool_constraints); - - let pool = Pool::new(get_opts().pool_opts(pool_opts)); + let pool = pool_with_one_connection(); let id = pool.get_conn().await?.id(); // non-fatal errors are ignored @@ -927,10 +924,7 @@ mod test { #[tokio::test] async fn should_remove_waker_of_cancelled_task() { - let pool_constraints = PoolConstraints::new(1, 1).unwrap(); - let pool_opts = PoolOpts::default().with_constraints(pool_constraints); - - let pool = Pool::new(get_opts().pool_opts(pool_opts)); + let pool = pool_with_one_connection(); let only_conn = pool.get_conn().await.unwrap(); let join_handle = tokio::spawn(timeout(Duration::from_secs(1), pool.get_conn())); From 906c453393c5b20220cf24a2274b2d4468ad8a33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 14 Dec 2023 15:04:37 -0100 Subject: [PATCH 2/3] Add a failing test The next patch fixes it, but by adding a failing test we get a documentation of what the bug was. --- src/conn/pool/mod.rs | 68 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 3949ef03..59a67ea3 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -392,14 +392,14 @@ impl Drop for Conn { #[cfg(test)] mod test { use futures_util::{ - future::{join_all, select, select_all, try_join_all}, - try_join, FutureExt, + future::{join_all, select, select_all, try_join_all, Either}, + poll, try_join, FutureExt, }; use tokio::time::{sleep, timeout}; use std::{ cmp::Reverse, - task::{RawWaker, RawWakerVTable, Waker}, + task::{Poll, RawWaker, RawWakerVTable, Waker}, time::Duration, }; @@ -1053,6 +1053,68 @@ mod test { Ok(()) } + #[tokio::test] + async fn check_priorities() -> super::Result<()> { + let pool = pool_with_one_connection(); + + let queue_len = || { + let exchange = pool.inner.exchange.lock().unwrap(); + exchange.waiting.queue.len() + }; + + // Get a connection, so we know the next futures will be + // queued. + let conn = pool.get_conn().await.unwrap(); + + let get_pending = || async { + let fut = async { + pool.get_conn().await.unwrap(); + } + .shared(); + let p = poll!(fut.clone()); + assert!(matches!(p, Poll::Pending)); + fut + }; + + let fut1 = get_pending().await; + let fut2 = get_pending().await; + + // Both futures are queued + assert_eq!(queue_len(), 2); + + drop(conn); // This will pop fut1 from the queue, making it [2] + while queue_len() != 1 { + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // We called wake on fut1, but with the select fut2 will + // resolve first + let Either::Left((_, fut1)) = select(fut2, fut1).await else { + panic!("wrong future"); + }; + + // We dropped the connection of fut2, but very likely hasn't + // made it through the recycler yet. + assert_eq!(queue_len(), 1); + + let p = poll!(fut1.clone()); + assert!(matches!(p, Poll::Pending)); + assert_eq!(queue_len(), 2); // Now fut1 is queued again + + // The connection will pass by the recycler and unblock fut1 + // and pop it from the queue. + fut1.await; + assert_eq!(queue_len(), 1); + + // Since the queue is not empty, a new future will be pending + let fut3 = get_pending().await; + assert_eq!(queue_len(), 2); + + println!("we get here"); + fut3.await; + panic!("we never get here"); + } + #[cfg(feature = "nightly")] mod bench { use futures_util::future::{FutureExt, TryFutureExt}; From be693e0798f9b72d1189fd69d494922fcce5bcf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 16 Nov 2023 12:19:41 -0100 Subject: [PATCH 3/3] Use an explicit priority check This fixes the case where wake is called for one future, but another future gets the connection. --- src/conn/pool/futures/get_conn.rs | 4 +-- src/conn/pool/mod.rs | 45 ++++++++++++++++++------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 8b21e685..22543fad 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -112,10 +112,8 @@ impl Future for GetConn { loop { match self.inner { GetConnInner::New => { - let queued = self.queue_id.is_some(); let queue_id = *self.queue_id.get_or_insert_with(QueueId::next); - let next = - ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?; + let next = ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queue_id))?; match next { GetConnInner::Connecting(conn_fut) => { self.inner = GetConnInner::Connecting(conn_fut); diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 59a67ea3..f13a5a2c 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -133,8 +133,8 @@ impl Waitlist { self.queue.remove(&tmp); } - fn is_empty(&self) -> bool { - self.queue.is_empty() + fn peek_id(&mut self) -> Option { + self.queue.peek().map(|(qw, _)| qw.queue_id) } } @@ -303,16 +303,14 @@ impl Pool { fn poll_new_conn( self: Pin<&mut Self>, cx: &mut Context<'_>, - queued: bool, queue_id: QueueId, ) -> Poll> { - self.poll_new_conn_inner(cx, queued, queue_id) + self.poll_new_conn_inner(cx, queue_id) } fn poll_new_conn_inner( self: Pin<&mut Self>, cx: &mut Context<'_>, - queued: bool, queue_id: QueueId, ) -> Poll> { let mut exchange = self.inner.exchange.lock().unwrap(); @@ -326,8 +324,15 @@ impl Pool { exchange.spawn_futures_if_needed(&self.inner); - // Check if others are waiting and we're not queued. - if !exchange.waiting.is_empty() && !queued { + // Check if we are higher priority than anything current + let highest = if let Some(cur) = exchange.waiting.peek_id() { + queue_id > cur + } else { + true + }; + + // If we are not, just queue + if !highest { exchange.waiting.push(cx.waker().clone(), queue_id); return Poll::Pending; } @@ -1087,32 +1092,34 @@ mod test { tokio::time::sleep(Duration::from_millis(100)).await; } - // We called wake on fut1, but with the select fut2 will + // We called wake on fut1, and even with the select fut1 will // resolve first - let Either::Left((_, fut1)) = select(fut2, fut1).await else { + let Either::Right((_, fut2)) = select(fut2, fut1).await else { panic!("wrong future"); }; - // We dropped the connection of fut2, but very likely hasn't + // We dropped the connection of fut1, but very likely hasn't // made it through the recycler yet. assert_eq!(queue_len(), 1); - let p = poll!(fut1.clone()); + let p = poll!(fut2.clone()); assert!(matches!(p, Poll::Pending)); - assert_eq!(queue_len(), 2); // Now fut1 is queued again + assert_eq!(queue_len(), 1); // The queue still has fut2 - // The connection will pass by the recycler and unblock fut1 + // The connection will pass by the recycler and unblock fut2 // and pop it from the queue. - fut1.await; - assert_eq!(queue_len(), 1); + fut2.await; + assert_eq!(queue_len(), 0); - // Since the queue is not empty, a new future will be pending + // The recycler is probably not done, so a new future will be + // pending. let fut3 = get_pending().await; - assert_eq!(queue_len(), 2); + assert_eq!(queue_len(), 1); - println!("we get here"); + // It is OK to await it. fut3.await; - panic!("we never get here"); + + Ok(()) } #[cfg(feature = "nightly")]