Skip to content

Commit f385450

Browse files
committed
fix(postgres): avoid recursively spawning tasks in PgListener::drop()
refactor(pool): deprecate `PoolConnection::release()`, provide renamed alts
1 parent c04f83b commit f385450

File tree

2 files changed

+67
-25
lines changed

2 files changed

+67
-25
lines changed

sqlx-core/src/pool/connection.rs

+62-25
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::database::Database;
1010
use crate::error::Error;
1111

1212
use super::inner::{DecrementSizeGuard, SharedPool};
13+
use std::future::Future;
1314

1415
/// A connection managed by a [`Pool`][crate::pool::Pool].
1516
///
@@ -60,43 +61,79 @@ impl<DB: Database> DerefMut for PoolConnection<DB> {
6061

6162
impl<DB: Database> PoolConnection<DB> {
6263
/// Explicitly release a connection from the pool
64+
#[deprecated = "renamed to `.detach()` for clarity"]
6365
pub fn release(mut self) -> DB::Connection {
66+
self.detach()
67+
}
68+
69+
/// Detach this connection from the pool, allowing it to open a replacement.
70+
///
71+
/// Note that if your application uses a single shared pool, this
72+
/// effectively lets the application exceed the `max_connections` setting.
73+
///
74+
/// If you want the pool to treat this connection as permanently checked-out,
75+
/// use [`.leak()`][Self::leak] instead.
76+
pub fn detach(mut self) -> DB::Connection {
6477
self.live
6578
.take()
6679
.expect("PoolConnection double-dropped")
6780
.float(&self.pool)
6881
.detach()
6982
}
83+
84+
/// Detach this connection from the pool, treating it as permanently checked-out.
85+
///
86+
/// This effectively will reduce the maximum capacity of the pool by 1 every time it is used.
87+
///
88+
/// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead.
89+
pub fn leak(mut self) -> DB::Connection {
90+
self.live.take().expect("PoolConnection double-dropped").raw
91+
}
92+
93+
/// Test the connection to make sure it is still live before returning it to the pool.
94+
///
95+
/// This effectively runs the drop handler eagerly instead of spawning a task to do it.
96+
pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
97+
// we want these to happen synchronously so the drop handler doesn't try to spawn a task anyway
98+
// this also makes the returned future `'static`
99+
let live = self.live.take();
100+
let pool = self.pool.clone();
101+
102+
async move {
103+
let mut floating = if let Some(live) = live {
104+
live.float(&pool)
105+
} else {
106+
return;
107+
};
108+
109+
// test the connection on-release to ensure it is still viable
110+
// if an Executor future/stream is dropped during an `.await` call, the connection
111+
// is likely to be left in an inconsistent state, in which case it should not be
112+
// returned to the pool; also of course, if it was dropped due to an error
113+
// this is simply a band-aid as SQLx-next (0.6) connections should be able
114+
// to recover from cancellations
115+
if let Err(e) = floating.raw.ping().await {
116+
log::warn!(
117+
"error occurred while testing the connection on-release: {}",
118+
e
119+
);
120+
121+
// we now consider the connection to be broken; just drop it to close
122+
// trying to close gracefully might cause something weird to happen
123+
drop(floating);
124+
} else {
125+
// if the connection is still viable, release it to the pool
126+
pool.release(floating);
127+
}
128+
}
129+
}
70130
}
71131

72132
/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
73133
impl<DB: Database> Drop for PoolConnection<DB> {
74134
fn drop(&mut self) {
75-
if let Some(live) = self.live.take() {
76-
let pool = self.pool.clone();
77-
sqlx_rt::spawn(async move {
78-
let mut floating = live.float(&pool);
79-
80-
// test the connection on-release to ensure it is still viable
81-
// if an Executor future/stream is dropped during an `.await` call, the connection
82-
// is likely to be left in an inconsistent state, in which case it should not be
83-
// returned to the pool; also of course, if it was dropped due to an error
84-
// this is simply a band-aid as SQLx-next (0.6) connections should be able
85-
// to recover from cancellations
86-
if let Err(e) = floating.raw.ping().await {
87-
log::warn!(
88-
"error occurred while testing the connection on-release: {}",
89-
e
90-
);
91-
92-
// we now consider the connection to be broken; just drop it to close
93-
// trying to close gracefully might cause something weird to happen
94-
drop(floating);
95-
} else {
96-
// if the connection is still viable, release it to th epool
97-
pool.release(floating);
98-
}
99-
});
135+
if self.live.is_some() {
136+
sqlx_rt::spawn(self.return_to_pool());
100137
}
101138
}
102139
}

sqlx-core/src/postgres/listener.rs

+5
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,11 @@ impl Drop for PgListener {
263263
// Unregister any listeners before returning the connection to the pool.
264264
sqlx_rt::spawn(async move {
265265
let _ = conn.execute("UNLISTEN *").await;
266+
267+
// inline the drop handler from `PoolConnection` so it doesn't try to spawn another task
268+
// otherwise, it may trigger a panic if this task is dropped because the runtime is going away:
269+
// https://github.com/launchbadge/sqlx/issues/1389
270+
conn.return_to_pool().await;
266271
});
267272
}
268273
}

0 commit comments

Comments
 (0)