Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pool): don't leak connections if drop task doesn't run #1799

Merged
merged 1 commit into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub(super) struct Idle<DB: Database> {
}

/// RAII wrapper for connections being handled by functions that may drop them
pub(super) struct Floating<'p, C> {
pub(super) struct Floating<DB: Database, C> {
pub(super) inner: C,
pub(super) guard: DecrementSizeGuard<'p>,
pub(super) guard: DecrementSizeGuard<DB>,
}

const DEREF_ERR: &str = "(bug) connection already released to pool";
Expand Down Expand Up @@ -89,7 +89,7 @@ impl<DB: Database> PoolConnection<DB> {
self.live
.take()
.expect("PoolConnection double-dropped")
.float(&self.pool)
.float(self.pool.clone())
.detach()
}

Expand All @@ -106,19 +106,20 @@ impl<DB: Database> PoolConnection<DB> {
///
/// This effectively runs the drop handler eagerly instead of spawning a task to do it.
pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
// we want these to happen synchronously so the drop handler doesn't try to spawn a task anyway
// this also makes the returned future `'static`
let live = self.live.take();
let pool = self.pool.clone();
// float the connection in the pool before we move into the task
// in case the returned `Future` isn't executed, like if it's spawned into a dying runtime
// https://github.com/launchbadge/sqlx/issues/1396
let floating = self.live.take().map(|live| live.float(self.pool.clone()));

async move {
let mut floating = if let Some(live) = live {
live.float(&pool)
let mut floating = if let Some(floating) = floating {
floating
} else {
return;
};

// test the connection on-release to ensure it is still viable
// test the connection on-release to ensure it is still viable,
// and flush anything time-sensitive like transaction rollbacks
// if an Executor future/stream is dropped during an `.await` call, the connection
// is likely to be left in an inconsistent state, in which case it should not be
// returned to the pool; also of course, if it was dropped due to an error
Expand All @@ -135,7 +136,7 @@ impl<DB: Database> PoolConnection<DB> {
drop(floating);
} else {
// if the connection is still viable, release it to the pool
pool.release(floating);
floating.release();
}
}
}
Expand All @@ -157,7 +158,7 @@ impl<DB: Database> Drop for PoolConnection<DB> {
}

impl<DB: Database> Live<DB> {
pub fn float(self, pool: &SharedPool<DB>) -> Floating<'_, Self> {
pub fn float(self, pool: Arc<SharedPool<DB>>) -> Floating<DB, Self> {
Floating {
inner: self,
// create a new guard from a previously leaked permit
Expand Down Expand Up @@ -187,8 +188,8 @@ impl<DB: Database> DerefMut for Idle<DB> {
}
}

impl<'s, DB: Database> Floating<'s, Live<DB>> {
pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<'s>) -> Self {
impl<DB: Database> Floating<DB, Live<DB>> {
pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
Self {
inner: Live {
raw: conn,
Expand All @@ -213,6 +214,10 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
}
}

pub fn release(self) {
self.guard.pool.clone().release(self);
}

pub async fn close(self) -> Result<(), Error> {
// `guard` is dropped as intended
self.inner.raw.close().await
Expand All @@ -222,19 +227,19 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
self.inner.raw
}

pub fn into_idle(self) -> Floating<'s, Idle<DB>> {
pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
Floating {
inner: self.inner.into_idle(),
guard: self.guard,
}
}
}

impl<'s, DB: Database> Floating<'s, Idle<DB>> {
impl<DB: Database> Floating<DB, Idle<DB>> {
pub fn from_idle(
idle: Idle<DB>,
pool: &'s SharedPool<DB>,
permit: SemaphoreReleaser<'s>,
pool: Arc<SharedPool<DB>>,
permit: SemaphoreReleaser<'_>,
) -> Self {
Self {
inner: idle,
Expand All @@ -246,14 +251,14 @@ impl<'s, DB: Database> Floating<'s, Idle<DB>> {
self.live.raw.ping().await
}

pub fn into_live(self) -> Floating<'s, Live<DB>> {
pub fn into_live(self) -> Floating<DB, Live<DB>> {
Floating {
inner: self.inner.live,
guard: self.guard,
}
}

pub async fn close(self) -> DecrementSizeGuard<'s> {
pub async fn close(self) -> DecrementSizeGuard<DB> {
// `guard` is dropped as intended
if let Err(e) = self.inner.live.raw.close().await {
log::debug!("error occurred while closing the pool connection: {}", e);
Expand All @@ -262,15 +267,15 @@ impl<'s, DB: Database> Floating<'s, Idle<DB>> {
}
}

impl<C> Deref for Floating<'_, C> {
impl<DB: Database, C> Deref for Floating<DB, C> {
type Target = C;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<C> DerefMut for Floating<'_, C> {
impl<DB: Database, C> DerefMut for Floating<DB, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
Expand Down
69 changes: 32 additions & 37 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<DB: Database> SharedPool<DB> {
self.is_closed.load(Ordering::Acquire)
}

pub(super) async fn close(&self) {
pub(super) async fn close(self: &Arc<Self>) {
let already_closed = self.is_closed.swap(true, Ordering::AcqRel);

if !already_closed {
Expand All @@ -93,12 +93,12 @@ impl<DB: Database> SharedPool<DB> {
.await;

while let Some(idle) = self.idle_conns.pop() {
let _ = idle.live.float(self).close().await;
let _ = idle.live.float((*self).clone()).close().await;
}
}

#[inline]
pub(super) fn try_acquire(&self) -> Option<Floating<'_, Idle<DB>>> {
pub(super) fn try_acquire(self: &Arc<Self>) -> Option<Floating<DB, Idle<DB>>> {
if self.is_closed() {
return None;
}
Expand All @@ -108,17 +108,17 @@ impl<DB: Database> SharedPool<DB> {
}

fn pop_idle<'a>(
&'a self,
self: &'a Arc<Self>,
permit: SemaphoreReleaser<'a>,
) -> Result<Floating<'a, Idle<DB>>, SemaphoreReleaser<'a>> {
) -> Result<Floating<DB, Idle<DB>>, SemaphoreReleaser<'a>> {
if let Some(idle) = self.idle_conns.pop() {
Ok(Floating::from_idle(idle, self, permit))
Ok(Floating::from_idle(idle, (*self).clone(), permit))
} else {
Err(permit)
}
}

pub(super) fn release(&self, mut floating: Floating<'_, Live<DB>>) {
pub(super) fn release(&self, mut floating: Floating<DB, Live<DB>>) {
if let Some(test) = &self.options.after_release {
if !test(&mut floating.raw) {
// drop the connection and do not return it to the pool
Expand All @@ -141,24 +141,24 @@ impl<DB: Database> SharedPool<DB> {
///
/// Returns `None` if we are at max_connections or if the pool is closed.
pub(super) fn try_increment_size<'a>(
&'a self,
self: &'a Arc<Self>,
permit: SemaphoreReleaser<'a>,
) -> Result<DecrementSizeGuard<'a>, SemaphoreReleaser<'a>> {
) -> Result<DecrementSizeGuard<DB>, SemaphoreReleaser<'a>> {
match self
.size
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
size.checked_add(1)
.filter(|size| size <= &self.options.max_connections)
}) {
// we successfully incremented the size
Ok(_) => Ok(DecrementSizeGuard::from_permit(self, permit)),
Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
// the pool is at max capacity
Err(_) => Err(permit),
}
}

#[allow(clippy::needless_lifetimes)]
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> {
pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
Expand Down Expand Up @@ -206,11 +206,11 @@ impl<DB: Database> SharedPool<DB> {
.map_err(|_| Error::PoolTimedOut)?
}

pub(super) async fn connection<'s>(
&'s self,
pub(super) async fn connection(
self: &Arc<Self>,
deadline: Instant,
guard: DecrementSizeGuard<'s>,
) -> Result<Floating<'s, Live<DB>>, Error> {
guard: DecrementSizeGuard<DB>,
) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
Expand Down Expand Up @@ -275,10 +275,10 @@ fn is_beyond_idle<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<DB>) -> b
.map_or(false, |timeout| idle.since.elapsed() > timeout)
}

async fn check_conn<'s: 'p, 'p, DB: Database>(
mut conn: Floating<'s, Idle<DB>>,
options: &'p PoolOptions<DB>,
) -> Result<Floating<'s, Live<DB>>, DecrementSizeGuard<'s>> {
async fn check_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_lifetime(&conn, options) {
Expand Down Expand Up @@ -337,7 +337,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
});
}

async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
async fn do_reap<DB: Database>(pool: &Arc<SharedPool<DB>>) {
// reap at most the current size minus the minimum idle
let max_reaped = pool.size().saturating_sub(pool.options.min_connections);

Expand All @@ -363,39 +363,34 @@ async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
/// (where the pool thinks it has more connections than it does).
pub(in crate::pool) struct DecrementSizeGuard<'a> {
size: &'a AtomicU32,
semaphore: &'a Semaphore,
pub(in crate::pool) struct DecrementSizeGuard<DB: Database> {
pub(crate) pool: Arc<SharedPool<DB>>,
dropped: bool,
}

impl<'a> DecrementSizeGuard<'a> {
impl<DB: Database> DecrementSizeGuard<DB> {
/// Create a new guard that will release a semaphore permit on-drop.
pub fn new_permit<DB: Database>(pool: &'a SharedPool<DB>) -> Self {
pub fn new_permit(pool: Arc<SharedPool<DB>>) -> Self {
Self {
size: &pool.size,
semaphore: &pool.semaphore,
pool,
dropped: false,
}
}

pub fn from_permit<DB: Database>(
pool: &'a SharedPool<DB>,
mut permit: SemaphoreReleaser<'a>,
) -> Self {
pub fn from_permit(pool: Arc<SharedPool<DB>>, mut permit: SemaphoreReleaser<'_>) -> Self {
// here we effectively take ownership of the permit
permit.disarm();
Self::new_permit(pool)
}

/// Return `true` if the internal references point to the same fields in `SharedPool`.
pub fn same_pool<DB: Database>(&self, pool: &'a SharedPool<DB>) -> bool {
ptr::eq(self.size, &pool.size)
pub fn same_pool(&self, pool: &SharedPool<DB>) -> bool {
ptr::eq(&*self.pool, pool)
}

/// Release the semaphore permit without decreasing the pool size.
fn release_permit(self) {
self.semaphore.release(1);
self.pool.semaphore.release(1);
self.cancel();
}

Expand All @@ -404,13 +399,13 @@ impl<'a> DecrementSizeGuard<'a> {
}
}

impl Drop for DecrementSizeGuard<'_> {
impl<DB: Database> Drop for DecrementSizeGuard<DB> {
fn drop(&mut self) {
assert!(!self.dropped, "double-dropped!");
self.dropped = true;
self.size.fetch_sub(1, Ordering::SeqCst);
self.pool.size.fetch_sub(1, Ordering::SeqCst);

// and here we release the permit we got on construction
self.semaphore.release(1);
self.pool.semaphore.release(1);
}
}
2 changes: 1 addition & 1 deletion sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl<DB: Database> PoolOptions<DB> {
}
}

async fn init_min_connections<DB: Database>(pool: &SharedPool<DB>) -> Result<(), Error> {
async fn init_min_connections<DB: Database>(pool: &Arc<SharedPool<DB>>) -> Result<(), Error> {
for _ in 0..cmp::max(pool.options.min_connections, 1) {
let deadline = Instant::now() + pool.options.connect_timeout;
let permit = pool.semaphore.acquire(1).await;
Expand Down