Skip to content

Commit

Permalink
Add support for tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
cloneable committed Jan 26, 2023
1 parent fa7a5a7 commit 0fdecd5
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 18 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ socket2 = "0.4.2"
thiserror = "1.0.4"
tokio = { version = "1.0", features = ["io-util", "fs", "net", "time", "rt"] }
tokio-util = { version = "0.7.2", features = ["codec", "io"] }
tracing = { version = "0.1.37", default-features = false, features = ["attributes"], optional = true }
twox-hash = "1"
url = "2.1"

Expand Down Expand Up @@ -100,6 +101,7 @@ rustls-tls = [
"webpki-roots",
"rustls-pemfile",
]
tracing = ["dep:tracing"]
nightly = []

[lib]
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ as well as `native-tls`-based TLS support.
[dependencies]
mysql_async = { version = "*", default-features = false, features = ["rustls-tls"] }

* `tracing` – enables instrumentation via `tracing` package.
Primary operations (`query`, `prepare`, `exec`) are instrumented at `INFO` level.
Remaining operations, incl. `get_conn`, are instrumented at `DEBUG` level.
Also at `DEBUG`, the SQL queries and parameters are added to the `query`, `prepare`
and `exec` spans.

**Example:**

```toml
[dependencies]
mysql_async = { version = "*", features = ["tracing"] }
```

[myslqcommonfeatures]: https://github.com/blackbeam/rust_mysql_common#crate-features

## TLS/SSL Support
Expand Down
13 changes: 13 additions & 0 deletions src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ use std::{
};

use futures_core::ready;
#[cfg(feature = "tracing")]
use {
std::sync::Arc,
tracing::{debug_span, Span},
};

use crate::{
conn::{
Expand Down Expand Up @@ -64,6 +69,8 @@ pub struct GetConn {
pub(crate) queue_id: Option<QueueId>,
pub(crate) pool: Option<Pool>,
pub(crate) inner: GetConnInner,
#[cfg(feature = "tracing")]
span: Arc<Span>,
}

impl GetConn {
Expand All @@ -72,6 +79,8 @@ impl GetConn {
queue_id: None,
pool: Some(pool.clone()),
inner: GetConnInner::New,
#[cfg(feature = "tracing")]
span: Arc::new(debug_span!("mysql_async::get_conn")),
}
}

Expand All @@ -94,6 +103,10 @@ impl Future for GetConn {
type Output = Result<Conn>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(feature = "tracing")]
let span = self.span.clone();
#[cfg(feature = "tracing")]
let _span_guard = span.enter();
loop {
match self.inner {
GetConnInner::New => {
Expand Down
35 changes: 32 additions & 3 deletions src/conn/routines/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::mem;
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use mysql_common::{packets::ComStmtExecuteRequestBuilder, params::Params};
#[cfg(feature = "tracing")]
use tracing::{field, info_span, Instrument, Level, Span};

use crate::{BinaryProtocol, Conn, DriverError, Statement};

Expand All @@ -23,10 +25,33 @@ impl<'a> ExecRoutine<'a> {

impl Routine<()> for ExecRoutine<'_> {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
async move {
#[cfg(feature = "tracing")]
let span = info_span!(
"mysql_async::exec",
mysql_async.connection.id = conn.id(),
mysql_async.statement.id = self.stmt.id(),
mysql_async.query.params = field::Empty,
);

let fut = async move {
loop {
match self.params {
Params::Positional(ref params) => {
#[cfg(feature = "tracing")]
if tracing::span_enabled!(Level::DEBUG) {
// The params may contain sensitive data. Restrict to DEBUG.
// TODO: make more efficient
// TODO: use intersperse() once stable
let sep = std::iter::repeat(", ");
let ps = params
.iter()
.map(|p| p.as_sql(true))
.zip(sep)
.map(|(val, sep)| val + sep)
.collect::<String>();
Span::current().record("mysql_async.query.params", ps);
}

if self.stmt.num_params() as usize != params.len() {
Err(DriverError::StmtParamsMismatch {
required: self.stmt.num_params(),
Expand Down Expand Up @@ -76,7 +101,11 @@ impl Routine<()> for ExecRoutine<'_> {
}
}
Ok(())
}
.boxed()
};

#[cfg(feature = "tracing")]
let fut = fut.instrument(span);

fut.boxed()
}
}
17 changes: 14 additions & 3 deletions src/conn/routines/next_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::marker::PhantomData;

use futures_core::future::BoxFuture;
use futures_util::FutureExt;
#[cfg(feature = "tracing")]
use tracing::{debug_span, Instrument};

use crate::{queryable::Protocol, Conn};

Expand All @@ -22,11 +24,20 @@ where
P: Protocol,
{
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
#[cfg(feature = "tracing")]
let span = debug_span!(
"mysql_async::next_set",
mysql_async.connection.id = conn.id()
);
conn.sync_seq_id();
async move {
let fut = async move {
conn.read_result_set::<P>(false).await?;
Ok(())
}
.boxed()
};

#[cfg(feature = "tracing")]
let fut = fut.instrument(span);

fut.boxed()
}
}
15 changes: 12 additions & 3 deletions src/conn/routines/ping.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use mysql_common::constants::Command;
#[cfg(feature = "tracing")]
use tracing::{debug_span, Instrument};

use crate::Conn;

Expand All @@ -12,11 +14,18 @@ pub struct PingRoutine;

impl Routine<()> for PingRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
async move {
#[cfg(feature = "tracing")]
let span = debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id());

let fut = async move {
conn.write_command_data(Command::COM_PING, &[]).await?;
conn.read_packet().await?;
Ok(())
}
.boxed()
};

#[cfg(feature = "tracing")]
let fut = fut.instrument(span);

fut.boxed()
}
}
30 changes: 27 additions & 3 deletions src/conn/routines/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::{borrow::Cow, sync::Arc};
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use mysql_common::constants::Command;
#[cfg(feature = "tracing")]
use tracing::{field, info_span, Instrument, Level, Span};

use crate::{queryable::stmt::StmtInner, Conn};

Expand All @@ -24,12 +26,30 @@ impl PrepareRoutine {

impl Routine<Arc<StmtInner>> for PrepareRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<Arc<StmtInner>>> {
async move {
#[cfg(feature = "tracing")]
let span = info_span!(
"mysql_async::prepare",
mysql_async.connection.id = conn.id(),
mysql_async.statement.id = field::Empty,
mysql_async.query.sql = field::Empty,
);
#[cfg(feature = "tracing")]
if tracing::span_enabled!(Level::DEBUG) {
// The statement may contain sensitive data. Restrict to DEBUG.
span.record(
"mysql_async.query.sql",
String::from_utf8_lossy(&*self.query).as_ref(),
);
}

let fut = async move {
conn.write_command_data(Command::COM_STMT_PREPARE, &self.query)
.await?;

let packet = conn.read_packet().await?;
let mut inner_stmt = StmtInner::from_payload(&*packet, conn.id(), self.query.clone())?;
#[cfg(feature = "tracing")]
Span::current().record("mysql_async.statement.id", inner_stmt.id());

if inner_stmt.num_params() > 0 {
let params = conn.read_column_defs(inner_stmt.num_params()).await?;
Expand All @@ -42,7 +62,11 @@ impl Routine<Arc<StmtInner>> for PrepareRoutine {
}

Ok(Arc::new(inner_stmt))
}
.boxed()
};

#[cfg(feature = "tracing")]
let fut = fut.instrument(span);

fut.boxed()
}
}
27 changes: 24 additions & 3 deletions src/conn/routines/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use mysql_common::constants::Command;
#[cfg(feature = "tracing")]
use tracing::{field, info_span, Instrument, Level};

use crate::{Conn, TextProtocol};

Expand All @@ -20,12 +22,31 @@ impl<'a> QueryRoutine<'a> {

impl Routine<()> for QueryRoutine<'_> {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
async move {
#[cfg(feature = "tracing")]
let span = info_span!(
"mysql_async::query",
mysql_async.connection.id = conn.id(),
mysql_async.query.sql = field::Empty
);
#[cfg(feature = "tracing")]
if tracing::span_enabled!(Level::DEBUG) {
// The statement may contain sensitive data. Restrict to DEBUG.
span.record(
"mysql_async.query.sql",
String::from_utf8_lossy(self.data).as_ref(),
);
}

let fut = async move {
conn.write_command_data(Command::COM_QUERY, self.data)
.await?;
conn.read_result_set::<TextProtocol>(true).await?;
Ok(())
}
.boxed()
};

#[cfg(feature = "tracing")]
let fut = fut.instrument(span);

fut.boxed()
}
}
15 changes: 12 additions & 3 deletions src/conn/routines/reset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use mysql_common::constants::Command;
#[cfg(feature = "tracing")]
use tracing::{debug_span, Instrument};

use crate::Conn;

Expand All @@ -12,12 +14,19 @@ pub struct ResetRoutine;

impl Routine<()> for ResetRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
async move {
#[cfg(feature = "tracing")]
let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id());

let fut = async move {
conn.write_command_data(Command::COM_RESET_CONNECTION, &[])
.await?;
conn.read_packet().await?;
Ok(())
}
.boxed()
};

#[cfg(feature = "tracing")]
let fut = fut.instrument(span);

fut.boxed()
}
}

0 comments on commit 0fdecd5

Please sign in to comment.