Skip to content

Commit

Permalink
Add Pool::get_owned (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpdrsn authored Aug 11, 2021
1 parent 28e7b39 commit a34763c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
27 changes: 24 additions & 3 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::error;
use std::fmt;
use std::marker::PhantomData;
Expand Down Expand Up @@ -54,6 +55,14 @@ impl<M: ManageConnection> Pool<M> {
self.inner.get().await
}

/// Retrieves an owned connection from the pool
///
/// Using an owning `PooledConnection` makes it easier to leak the connection pool. Therefore, [`Pool::get`]
/// (which stores a lifetime-bound reference to the pool) should be preferred whenever possible.
pub async fn get_owned(&self) -> Result<PooledConnection<'static, M>, RunError<M::Error>> {
self.inner.get_owned().await
}

/// Get a new dedicated connection that will not be managed by the pool.
/// An application may want a persistent connection (e.g. to do a
/// postgres LISTEN) that will not be closed or repurposed by the pool.
Expand Down Expand Up @@ -285,7 +294,7 @@ pub struct PooledConnection<'a, M>
where
M: ManageConnection,
{
pool: &'a PoolInner<M>,
pool: Cow<'a, PoolInner<M>>,
conn: Option<Conn<M::Connection>>,
}

Expand All @@ -295,7 +304,7 @@ where
{
pub(crate) fn new(pool: &'a PoolInner<M>, conn: Conn<M::Connection>) -> Self {
Self {
pool,
pool: Cow::Borrowed(pool),
conn: Some(conn),
}
}
Expand All @@ -305,6 +314,18 @@ where
}
}

impl<M> PooledConnection<'static, M>
where
M: ManageConnection,
{
pub(crate) fn new_owned(pool: PoolInner<M>, conn: Conn<M::Connection>) -> Self {
Self {
pool: Cow::Owned(pool),
conn: Some(conn),
}
}
}

impl<'a, M> Deref for PooledConnection<'a, M>
where
M: ManageConnection,
Expand Down Expand Up @@ -340,7 +361,7 @@ where
M: ManageConnection,
{
fn drop(&mut self) {
self.pool.put_back(self.conn.take());
self.pool.as_ref().put_back(self.conn.take())
}
}

Expand Down
27 changes: 25 additions & 2 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,36 @@ where
}

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
self.make_pooled(|this, conn| PooledConnection::new(this, conn))
.await
}

pub(crate) async fn get_owned(
&self,
) -> Result<PooledConnection<'static, M>, RunError<M::Error>> {
self.make_pooled(|this, conn| {
let pool = PoolInner {
inner: Arc::clone(&this.inner),
};
PooledConnection::new_owned(pool, conn)
})
.await
}

pub(crate) async fn make_pooled<'a, 'b, F>(
&'a self,
make_pool: F,
) -> Result<PooledConnection<'b, M>, RunError<M::Error>>
where
F: Fn(&'a Self, Conn<M::Connection>) -> PooledConnection<'b, M>,
{
loop {
let mut conn = {
let mut locked = self.inner.internals.lock();
match locked.pop(&self.inner.statics) {
Some((conn, approvals)) => {
self.spawn_replenishing_approvals(approvals);
PooledConnection::new(self, conn)
make_pool(self, conn)
}
None => break,
}
Expand Down Expand Up @@ -117,7 +140,7 @@ where
};

match timeout(self.inner.statics.connection_timeout, rx).await {
Ok(Ok(mut guard)) => Ok(PooledConnection::new(self, guard.extract())),
Ok(Ok(mut guard)) => Ok(make_pool(self, guard.extract())),
_ => Err(RunError::TimedOut),
}
}
Expand Down

0 comments on commit a34763c

Please sign in to comment.