Skip to content

Commit

Permalink
Add a way to opt-out of pooled connection reset
Browse files Browse the repository at this point in the history
  • Loading branch information
blackbeam committed Apr 14, 2023
1 parent 32c6f2a commit 7c6572d
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 7 deletions.
7 changes: 7 additions & 0 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ impl Conn {
self.inner.last_ok_packet.as_ref()
}

/// Turns on/off automatic connection reset (see [`crate::PoolOpts::with_reset_connection`]).
///
/// Only makes sense for pooled connections.
pub fn reset_connection(&mut self, reset_connection: bool) {
self.inner.reset_upon_returning_to_a_pool = reset_connection;
}

pub(crate) fn stream_mut(&mut self) -> Result<&mut Stream> {
self.inner.stream_mut()
}
Expand Down
55 changes: 52 additions & 3 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ impl Pool {

/// Async function that resolves to `Conn`.
pub fn get_conn(&self) -> GetConn {
GetConn::new(self, true)
let reset_connection = self.opts.pool_opts().reset_connection();
GetConn::new(self, reset_connection)
}

/// Starts a new transaction.
Expand Down Expand Up @@ -382,7 +383,6 @@ mod test {
future::{join_all, select, select_all, try_join_all},
try_join, FutureExt,
};
use mysql_common::row::Row;
use tokio::time::{sleep, timeout};

use std::{
Expand All @@ -396,7 +396,7 @@ mod test {
opts::PoolOpts,
prelude::*,
test_misc::get_opts,
PoolConstraints, TxOpts,
PoolConstraints, Row, TxOpts, Value,
};

macro_rules! conn_ex_field {
Expand All @@ -411,6 +411,55 @@ mod test {
};
}

#[tokio::test]
async fn should_opt_out_of_connection_reset() -> super::Result<()> {
let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
let opts = get_opts().pool_opts(pool_opts.clone());

let pool = Pool::new(opts.clone());

let mut conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
Value::NULL
);
conn.query_drop("SET @foo = 'foo'").await?;
assert_eq!(
conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
"foo",
);
drop(conn);

conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
Value::NULL
);
conn.query_drop("SET @foo = 'foo'").await?;
conn.reset_connection(false);
drop(conn);

conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
"foo",
);
drop(conn);
pool.disconnect().await.unwrap();

let pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false)));
conn = pool.get_conn().await.unwrap();
conn.query_drop("SET @foo = 'foo'").await?;
drop(conn);
conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
"foo",
);
drop(conn);
pool.disconnect().await
}

#[test]
fn should_not_hang() -> super::Result<()> {
pub struct Database {
Expand Down
2 changes: 1 addition & 1 deletion src/conn/routines/change_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::Conn;

use super::Routine;

/// A routine that performs `COM_RESET_CONNECTION`.
/// A routine that performs `COM_CHANGE_USER`.
#[derive(Debug, Copy, Clone)]
pub struct ChangeUser;

Expand Down
65 changes: 62 additions & 3 deletions src/opts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,15 @@ pub struct PoolOpts {
constraints: PoolConstraints,
inactive_connection_ttl: Duration,
ttl_check_interval: Duration,
reset_connection: bool,
}

impl PoolOpts {
/// Calls `Self::default`.
pub fn new() -> Self {
Self::default()
}

/// Creates the default [`PoolOpts`] with the given constraints.
pub fn with_constraints(mut self, constraints: PoolConstraints) -> Self {
self.constraints = constraints;
Expand All @@ -223,6 +229,50 @@ impl PoolOpts {
self.constraints
}

/// Sets whether to reset connection upon returning it to a pool (defaults to `true`).
///
/// Default behavior increases reliability but comes with cons:
///
/// * reset procedure removes all prepared statements, i.e. kills prepared statements cache
/// * connection reset is quite fast but requires additional client-server roundtrip
/// (might also requires requthentication for older servers)
///
/// The purpose of the reset procedure is to:
///
/// * rollback any opened transactions (`mysql_async` is able to do this without explicit reset)
/// * reset transaction isolation level
/// * reset session variables
/// * delete user variables
/// * remove temporary tables
/// * remove all PREPARE statement (this action kills prepared statements cache)
///
/// So to encrease overall performance you can safely opt-out of the default behavior
/// if you are not willing to change the session state in an unpleasant way.
///
/// It is also possible to selectively opt-in/out using [`Conn::reset_connection`].
///
/// # Connection URL
///
/// You can use `reset_connection` URL parameter to set this value. E.g.
///
/// ```
/// # use mysql_async::*;
/// # use std::time::Duration;
/// # fn main() -> Result<()> {
/// let opts = Opts::from_url("mysql://localhost/db?reset_connection=false")?;
/// assert_eq!(opts.pool_opts().reset_connection(), false);
/// # Ok(()) }
/// ```
pub fn with_reset_connection(mut self, reset_connection: bool) -> Self {
self.reset_connection = reset_connection;
self
}

/// Returns the `reset_connection` value (see [`PoolOpts::with_reset_connection`]).
pub fn reset_connection(&self) -> bool {
self.reset_connection
}

/// Pool will recycle inactive connection if it is outside of the lower bound of the pool
/// and if it is idling longer than this value (defaults to
/// [`DEFAULT_INACTIVE_CONNECTION_TTL`]).
Expand Down Expand Up @@ -309,6 +359,7 @@ impl Default for PoolOpts {
constraints: DEFAULT_POOL_CONSTRAINTS,
inactive_connection_ttl: DEFAULT_INACTIVE_CONNECTION_TTL,
ttl_check_interval: DEFAULT_TTL_CHECK_INTERVAL,
reset_connection: true,
}
}
}
Expand Down Expand Up @@ -1340,7 +1391,6 @@ fn mysqlopts_from_url(url: &Url) -> std::result::Result<MysqlOpts, UrlError> {
Ok(value) => {
opts.pool_opts = opts
.pool_opts
.clone()
.with_inactive_connection_ttl(Duration::from_secs(value))
}
_ => {
Expand All @@ -1355,7 +1405,6 @@ fn mysqlopts_from_url(url: &Url) -> std::result::Result<MysqlOpts, UrlError> {
Ok(value) => {
opts.pool_opts = opts
.pool_opts
.clone()
.with_ttl_check_interval(Duration::from_secs(value))
}
_ => {
Expand Down Expand Up @@ -1421,6 +1470,16 @@ fn mysqlopts_from_url(url: &Url) -> std::result::Result<MysqlOpts, UrlError> {
});
}
}
} else if key == "reset_connection" {
match bool::from_str(&*value) {
Ok(parsed) => opts.pool_opts = opts.pool_opts.with_reset_connection(parsed),
Err(_) => {
return Err(UrlError::InvalidParamValue {
param: key.to_string(),
value,
});
}
}
} else if key == "tcp_nodelay" {
match bool::from_str(&*value) {
Ok(value) => opts.tcp_nodelay = value,
Expand Down Expand Up @@ -1538,7 +1597,7 @@ fn mysqlopts_from_url(url: &Url) -> std::result::Result<MysqlOpts, UrlError> {
}

if let Some(pool_constraints) = PoolConstraints::new(pool_min, pool_max) {
opts.pool_opts = opts.pool_opts.clone().with_constraints(pool_constraints);
opts.pool_opts = opts.pool_opts.with_constraints(pool_constraints);
} else {
return Err(UrlError::InvalidPoolConstraints {
min: pool_min,
Expand Down

0 comments on commit 7c6572d

Please sign in to comment.