Skip to content

Commit

Permalink
Fix #3156
Browse files Browse the repository at this point in the history
Our `R2D2Connection::is_broken` implementations where broken in such a
way that they marked any valid connection as broken as soon as it was
checked into the pool again. This resulted in the pool opening new
connections everytime a connection was checked out of the pool, which
obviously removes the possibility of reusing the same connection again
and again. This commit fixes that issue and adds some tests to ensure
that we do not break this again in the future.
  • Loading branch information
weiznich committed May 2, 2022
1 parent 3228fec commit fb70fd7
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 22 deletions.
2 changes: 1 addition & 1 deletion diesel/src/mysql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl crate::r2d2::R2D2Connection for MysqlConnection {
self.transaction_state
.status
.transaction_depth()
.map(|d| d.is_none())
.map(|d| d.is_some())
.unwrap_or(true)
}
}
Expand Down
2 changes: 1 addition & 1 deletion diesel/src/pg/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl crate::r2d2::R2D2Connection for PgConnection {
self.transaction_state
.status
.transaction_depth()
.map(|d| d.is_none())
.map(|d| d.is_some())
.unwrap_or(true)
}
}
Expand Down
144 changes: 125 additions & 19 deletions diesel/src/r2d2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,28 @@ where
}
}

#[derive(QueryId)]
pub(crate) struct CheckConnectionQuery;

impl<DB> QueryFragment<DB> for CheckConnectionQuery
where
DB: Backend,
{
fn walk_ast<'b>(
&'b self,
mut pass: crate::query_builder::AstPass<'_, 'b, DB>,
) -> QueryResult<()> {
pass.push_sql("SELECT 1");
Ok(())
}
}

impl Query for CheckConnectionQuery {
type SqlType = crate::sql_types::Integer;
}

impl<C> RunQueryDsl<C> for CheckConnectionQuery {}

#[cfg(test)]
mod tests {
use std::sync::mpsc;
Expand Down Expand Up @@ -394,26 +416,110 @@ mod tests {
let query = select("foo".into_sql::<Text>());
assert_eq!("foo", query.get_result::<String>(&mut conn).unwrap());
}
}

#[derive(QueryId)]
pub(crate) struct CheckConnectionQuery;
#[test]
fn check_pool_does_actually_hold_connections() {
use std::sync::atomic::{AtomicU32, Ordering};

#[derive(Debug)]
struct TestEventHandler {
acquire_count: Arc<AtomicU32>,
release_count: Arc<AtomicU32>,
checkin_count: Arc<AtomicU32>,
checkout_count: Arc<AtomicU32>,
}

impl<DB> QueryFragment<DB> for CheckConnectionQuery
where
DB: Backend,
{
fn walk_ast<'b>(
&'b self,
mut pass: crate::query_builder::AstPass<'_, 'b, DB>,
) -> QueryResult<()> {
pass.push_sql("SELECT 1");
Ok(())
}
}
impl r2d2::HandleEvent for TestEventHandler {
fn handle_acquire(&self, _event: r2d2::event::AcquireEvent) {
self.acquire_count.fetch_add(1, Ordering::Relaxed);
}
fn handle_release(&self, _event: r2d2::event::ReleaseEvent) {
self.release_count.fetch_add(1, Ordering::Relaxed);
}
fn handle_checkout(&self, _event: r2d2::event::CheckoutEvent) {
self.checkout_count.fetch_add(1, Ordering::Relaxed);
}
fn handle_checkin(&self, _event: r2d2::event::CheckinEvent) {
self.checkin_count.fetch_add(1, Ordering::Relaxed);
}
}

impl Query for CheckConnectionQuery {
type SqlType = crate::sql_types::Integer;
}
let acquire_count = Arc::new(AtomicU32::new(0));
let release_count = Arc::new(AtomicU32::new(0));
let checkin_count = Arc::new(AtomicU32::new(0));
let checkout_count = Arc::new(AtomicU32::new(0));

impl<C> RunQueryDsl<C> for CheckConnectionQuery {}
let handler = Box::new(TestEventHandler {
acquire_count: acquire_count.clone(),
release_count: release_count.clone(),
checkin_count: checkin_count.clone(),
checkout_count: checkout_count.clone(),
});

let manager = ConnectionManager::<TestConnection>::new(database_url());
let pool = Pool::builder()
.max_size(1)
.test_on_check_out(true)
.event_handler(handler)
.build(manager)
.unwrap();

assert_eq!(acquire_count.load(Ordering::Relaxed), 1);
assert_eq!(release_count.load(Ordering::Relaxed), 0);
assert_eq!(checkin_count.load(Ordering::Relaxed), 0);
assert_eq!(checkout_count.load(Ordering::Relaxed), 0);

// check that we reuse connections with the pool
{
let conn = pool.get().unwrap();

assert_eq!(acquire_count.load(Ordering::Relaxed), 1);
assert_eq!(release_count.load(Ordering::Relaxed), 0);
assert_eq!(checkin_count.load(Ordering::Relaxed), 0);
assert_eq!(checkout_count.load(Ordering::Relaxed), 1);
std::mem::drop(conn);
}

assert_eq!(acquire_count.load(Ordering::Relaxed), 1);
assert_eq!(release_count.load(Ordering::Relaxed), 0);
assert_eq!(checkin_count.load(Ordering::Relaxed), 1);
assert_eq!(checkout_count.load(Ordering::Relaxed), 1);

// check that we remove a connection with open transactions from the pool
{
let mut conn = pool.get().unwrap();

assert_eq!(acquire_count.load(Ordering::Relaxed), 1);
assert_eq!(release_count.load(Ordering::Relaxed), 0);
assert_eq!(checkin_count.load(Ordering::Relaxed), 1);
assert_eq!(checkout_count.load(Ordering::Relaxed), 2);

<TestConnection as Connection>::TransactionManager::begin_transaction(&mut *conn)
.unwrap();
}

assert_eq!(acquire_count.load(Ordering::Relaxed), 1);
assert_eq!(release_count.load(Ordering::Relaxed), 1);
assert_eq!(checkin_count.load(Ordering::Relaxed), 2);
assert_eq!(checkout_count.load(Ordering::Relaxed), 2);

// check that we remove a connection from the pool that was
// open during panicing
#[allow(unreachable_code, unused_variables)]
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let conn = pool.get();
assert_eq!(acquire_count.load(Ordering::Relaxed), 2);
assert_eq!(release_count.load(Ordering::Relaxed), 1);
assert_eq!(checkin_count.load(Ordering::Relaxed), 2);
assert_eq!(checkout_count.load(Ordering::Relaxed), 3);
panic!();
std::mem::drop(conn);
}))
.unwrap_err();

assert_eq!(acquire_count.load(Ordering::Relaxed), 2);
assert_eq!(release_count.load(Ordering::Relaxed), 2);
assert_eq!(checkin_count.load(Ordering::Relaxed), 3);
assert_eq!(checkout_count.load(Ordering::Relaxed), 3);
}
}
2 changes: 1 addition & 1 deletion diesel/src/sqlite/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl crate::r2d2::R2D2Connection for crate::sqlite::SqliteConnection {
self.transaction_state
.status
.transaction_depth()
.map(|d| d.is_none())
.map(|d| d.is_some())
.unwrap_or(true)
}
}
Expand Down

0 comments on commit fb70fd7

Please sign in to comment.