Skip to content

Commit

Permalink
sync: don't leak tracing spans in mutex guards (#5469)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn authored Feb 18, 2023
1 parent b921fe4 commit 24aac0a
Show file tree
Hide file tree
Showing 8 changed files with 492 additions and 359 deletions.
170 changes: 109 additions & 61 deletions tokio/src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use crate::util::trace;

use std::cell::UnsafeCell;
use std::error::Error;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, marker, mem};
use std::{fmt, mem};

/// An asynchronous `Mutex`-like type.
///
Expand Down Expand Up @@ -144,6 +145,8 @@ pub struct Mutex<T: ?Sized> {
#[clippy::has_significant_drop]
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct MutexGuard<'a, T: ?Sized> {
// When changing the fields in this struct, make sure to update the
// `skip_drop` method.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
lock: &'a Mutex<T>,
Expand Down Expand Up @@ -179,10 +182,29 @@ pub struct OwnedMutexGuard<T: ?Sized> {
#[clippy::has_significant_drop]
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct MappedMutexGuard<'a, T: ?Sized> {
// When changing the fields in this struct, make sure to update the
// `skip_drop` method.
s: &'a semaphore::Semaphore,
data: *mut T,
// Needed to tell the borrow checker that we are holding a `&mut T`
marker: marker::PhantomData<&'a mut T>,
marker: PhantomData<&'a mut T>,
}

/// A helper type used when taking apart a `MutexGuard` without running its
/// Drop implementation.
#[allow(dead_code)] // Unused fields are still used in Drop.
struct MutexGuardInner<'a, T: ?Sized> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
lock: &'a Mutex<T>,
}

/// A helper type used when taking apart a `MappedMutexGuard` without running
/// its Drop implementation.
#[allow(dead_code)] // Unused fields are still used in Drop.
struct MappedMutexGuardInner<'a, T: ?Sized> {
s: &'a semaphore::Semaphore,
data: *mut T,
}

// As long as T: Send, it's fine to send and share Mutex<T> between threads.
Expand Down Expand Up @@ -340,15 +362,27 @@ impl<T: ?Sized> Mutex<T> {
/// }
/// ```
pub async fn lock(&self) -> MutexGuard<'_, T> {
let acquire_fut = async {
self.acquire().await;

MutexGuard {
lock: self,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
};

#[cfg(all(tokio_unstable, feature = "tracing"))]
trace::async_op(
|| self.acquire(),
let acquire_fut = trace::async_op(
move || acquire_fut,
self.resource_span.clone(),
"Mutex::lock",
"poll",
false,
)
.await;
);

#[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
let guard = acquire_fut.await;

#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
Expand All @@ -358,14 +392,7 @@ impl<T: ?Sized> Mutex<T> {
);
});

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
self.acquire().await;

MutexGuard {
lock: self,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
guard
}

/// Blockingly locks this `Mutex`. When the lock has been acquired, function returns a
Expand Down Expand Up @@ -512,34 +539,39 @@ impl<T: ?Sized> Mutex<T> {
/// [`Arc`]: std::sync::Arc
pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
trace::async_op(
|| self.acquire(),
self.resource_span.clone(),
let resource_span = self.resource_span.clone();

let acquire_fut = async {
self.acquire().await;

OwnedMutexGuard {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
lock: self,
}
};

#[cfg(all(tokio_unstable, feature = "tracing"))]
let acquire_fut = trace::async_op(
move || acquire_fut,
resource_span,
"Mutex::lock_owned",
"poll",
false,
)
.await;
);

#[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
let guard = acquire_fut.await;

#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
guard.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = true,
);
});

#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = self.resource_span.clone();

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
self.acquire().await;

OwnedMutexGuard {
lock: self,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
guard
}

async fn acquire(&self) {
Expand Down Expand Up @@ -570,6 +602,12 @@ impl<T: ?Sized> Mutex<T> {
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
match self.s.try_acquire(1) {
Ok(_) => {
let guard = MutexGuard {
lock: self,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
};

#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
Expand All @@ -578,11 +616,7 @@ impl<T: ?Sized> Mutex<T> {
);
});

Ok(MutexGuard {
lock: self,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
})
Ok(guard)
}
Err(_) => Err(TryLockError(())),
}
Expand Down Expand Up @@ -639,22 +673,21 @@ impl<T: ?Sized> Mutex<T> {
pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> {
match self.s.try_acquire(1) {
Ok(_) => {
let guard = OwnedMutexGuard {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
lock: self,
};

#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
guard.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = true,
);
});

#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = self.resource_span.clone();

Ok(OwnedMutexGuard {
lock: self,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
})
Ok(guard)
}
Err(_) => Err(TryLockError(())),
}
Expand Down Expand Up @@ -714,6 +747,17 @@ where
// === impl MutexGuard ===

impl<'a, T: ?Sized> MutexGuard<'a, T> {
fn skip_drop(self) -> MutexGuardInner<'a, T> {
let me = mem::ManuallyDrop::new(self);
// SAFETY: This duplicates the `resource_span` and then forgets the
// original. In the end, we have not duplicated or forgotten any values.
MutexGuardInner {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: unsafe { std::ptr::read(&me.resource_span) },
lock: me.lock,
}
}

/// Makes a new [`MappedMutexGuard`] for a component of the locked data.
///
/// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex.
Expand Down Expand Up @@ -750,12 +794,11 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
F: FnOnce(&mut T) -> &mut U,
{
let data = f(&mut *this) as *mut U;
let s = &this.lock.s;
mem::forget(this);
let inner = this.skip_drop();
MappedMutexGuard {
s,
s: &inner.lock.s,
data,
marker: marker::PhantomData,
marker: PhantomData,
}
}

Expand Down Expand Up @@ -800,12 +843,11 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
Some(data) => data as *mut U,
None => return Err(this),
};
let s = &this.lock.s;
mem::forget(this);
let inner = this.skip_drop();
Ok(MappedMutexGuard {
s,
s: &inner.lock.s,
data,
marker: marker::PhantomData,
marker: PhantomData,
})
}

Expand Down Expand Up @@ -946,6 +988,14 @@ impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> {
// === impl MappedMutexGuard ===

impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
fn skip_drop(self) -> MappedMutexGuardInner<'a, T> {
let me = mem::ManuallyDrop::new(self);
MappedMutexGuardInner {
s: me.s,
data: me.data,
}
}

/// Makes a new [`MappedMutexGuard`] for a component of the locked data.
///
/// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex.
Expand All @@ -960,12 +1010,11 @@ impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
F: FnOnce(&mut T) -> &mut U,
{
let data = f(&mut *this) as *mut U;
let s = this.s;
mem::forget(this);
let inner = this.skip_drop();
MappedMutexGuard {
s,
s: inner.s,
data,
marker: marker::PhantomData,
marker: PhantomData,
}
}

Expand All @@ -987,12 +1036,11 @@ impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
Some(data) => data as *mut U,
None => return Err(this),
};
let s = this.s;
mem::forget(this);
let inner = this.skip_drop();
Ok(MappedMutexGuard {
s,
s: inner.s,
data,
marker: marker::PhantomData,
marker: PhantomData,
})
}
}
Expand Down
Loading

0 comments on commit 24aac0a

Please sign in to comment.