Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscriber: always ignore lock poisoning #1063

Merged
merged 4 commits into from
Oct 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tracing-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ default = ["env-filter", "smallvec", "fmt", "ansi", "chrono", "tracing-log", "js
env-filter = ["matchers", "regex", "lazy_static", "tracing"]
fmt = ["registry"]
ansi = ["fmt", "ansi_term"]
registry = ["sharded-slab", "thread_local"]
registry = ["sharded-slab"]
json = ["tracing-serde", "serde", "serde_json"]

[dependencies]
tracing-core = { path = "../tracing-core", version = "0.2" }
thread_local = { version = "1.0.1" }

# only required by the filter feature
tracing = { optional = true, path = "../tracing", version = "0.2" }
Expand All @@ -55,7 +56,6 @@ parking_lot = { version = ">= 0.7, <= 0.11", optional = true }

# registry
sharded-slab = { version = "0.1.0", optional = true }
thread_local = { version = "1.0.1", optional = true }

[dev-dependencies]
tracing = { path = "../tracing", version = "0.2" }
Expand Down
26 changes: 8 additions & 18 deletions tracing-subscriber/src/filter/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,7 @@ impl EnvFilter {
}

fn cares_about_span(&self, span: &span::Id) -> bool {
let spans = try_lock!(self.by_id.read(), else return false);
spans.contains_key(span)
self.by_id.read().contains_key(span)
}

fn base_interest(&self) -> Interest {
Expand All @@ -373,8 +372,7 @@ impl<S: Collect> Subscribe<S> for EnvFilter {
// dynamic filter that should be constructed for it. If so, it
// should always be enabled, since it influences filtering.
if let Some(matcher) = self.dynamics.matcher(metadata) {
let mut by_cs = try_lock!(self.by_cs.write(), else return self.base_interest());
by_cs.insert(metadata.callsite(), matcher);
self.by_cs.write().insert(metadata.callsite(), matcher);
return Interest::always();
}
}
Expand Down Expand Up @@ -409,12 +407,7 @@ impl<S: Collect> Subscribe<S> for EnvFilter {
if self.has_dynamics && self.dynamics.max_level >= *level {
if metadata.is_span() {
// If the metadata is a span, see if we care about its callsite.
let enabled_by_cs = self
.by_cs
.read()
.ok()
.map(|by_cs| by_cs.contains_key(&metadata.callsite()))
.unwrap_or(false);
let enabled_by_cs = self.by_cs.read().contains_key(&metadata.callsite());
if enabled_by_cs {
return true;
}
Expand Down Expand Up @@ -444,15 +437,14 @@ impl<S: Collect> Subscribe<S> for EnvFilter {
}

fn new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, _: Context<'_, S>) {
let by_cs = try_lock!(self.by_cs.read());
if let Some(cs) = by_cs.get(&attrs.metadata().callsite()) {
if let Some(cs) = self.by_cs.read().get(&attrs.metadata().callsite()) {
let span = cs.to_span_match(attrs);
try_lock!(self.by_id.write()).insert(id.clone(), span);
self.by_id.write().insert(id.clone(), span);
}
}

fn on_record(&self, id: &span::Id, values: &span::Record<'_>, _: Context<'_, S>) {
if let Some(span) = try_lock!(self.by_id.read()).get(id) {
if let Some(span) = self.by_id.read().get(id) {
span.record_update(values);
}
}
Expand All @@ -461,7 +453,7 @@ impl<S: Collect> Subscribe<S> for EnvFilter {
// XXX: This is where _we_ could push IDs to the stack instead, and use
// that to allow changing the filter while a span is already entered.
// But that might be much less efficient...
if let Some(span) = try_lock!(self.by_id.read()).get(id) {
if let Some(span) = self.by_id.read().get(id) {
SCOPE.with(|scope| scope.borrow_mut().push(span.level()));
}
}
Expand All @@ -477,9 +469,7 @@ impl<S: Collect> Subscribe<S> for EnvFilter {
if !self.cares_about_span(&id) {
return;
}

let mut spans = try_lock!(self.by_id.write());
spans.remove(&id);
self.by_id.write().remove(&id);
}
}

Expand Down
15 changes: 7 additions & 8 deletions tracing-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ pub mod registry;
pub mod reload;
pub mod subscribe;
pub(crate) mod sync;
pub(crate) mod thread;
pub mod util;

#[cfg(feature = "env-filter")]
Expand All @@ -134,18 +133,20 @@ cfg_feature!("registry", {
}
});

use std::cell::RefCell;
use std::default::Default;

/// Tracks the currently executing span on a per-thread basis.
#[derive(Debug)]
pub struct CurrentSpan {
current: thread::Local<Vec<Id>>,
current: thread_local::ThreadLocal<RefCell<Vec<Id>>>,
}

impl CurrentSpan {
/// Returns a new `CurrentSpan`.
pub fn new() -> Self {
Self {
current: thread::Local::new(),
current: thread_local::ThreadLocal::new(),
}
}

Expand All @@ -155,19 +156,17 @@ impl CurrentSpan {
///
/// [`Id`]: https://docs.rs/tracing/latest/tracing/span/struct.Id.html
pub fn id(&self) -> Option<Id> {
self.current.with(|current| current.last().cloned())?
self.current.get()?.borrow().last().cloned()
}

/// Records that the current thread has entered the span with the provided ID.
pub fn enter(&self, span: Id) {
self.current.with(|current| current.push(span));
self.current.get_or_default().borrow_mut().push(span);
}

/// Records that the current thread has exited a span.
pub fn exit(&self) {
self.current.with(|current| {
let _ = current.pop();
});
self.current.get_or_default().borrow_mut().pop();
}
}

Expand Down
15 changes: 0 additions & 15 deletions tracing-subscriber/src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,3 @@
macro_rules! try_lock {
($lock:expr) => {
try_lock!($lock, else return)
};
($lock:expr, else $els:expr) => {
if let Ok(l) = $lock {
l
} else if std::thread::panicking() {
$els
} else {
panic!("lock poisoned")
}
};
}

macro_rules! cfg_feature {
($name:literal, { $($item:item)* }) => {
$(
Expand Down
13 changes: 3 additions & 10 deletions tracing-subscriber/src/registry/sharded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,11 @@ impl<'a> SpanData<'a> for Data<'a> {
}

fn extensions(&self) -> Extensions<'_> {
Extensions::new(self.inner.extensions.read().expect("Mutex poisoned"))
Extensions::new(self.inner.extensions.read())
}

fn extensions_mut(&self) -> ExtensionsMut<'_> {
ExtensionsMut::new(self.inner.extensions.write().expect("Mutex poisoned"))
ExtensionsMut::new(self.inner.extensions.write())
}
}

Expand Down Expand Up @@ -428,14 +428,7 @@ impl Clear for DataInner {
}

// Clear (but do not deallocate!) the pooled `HashMap` for the span's extensions.
self.extensions
.get_mut()
.unwrap_or_else(|l| {
// This function can be called in a `Drop` impl, such as while
// panicking, so ignore lock poisoning.
l.into_inner()
})
.clear();
self.extensions.get_mut().clear();
}
}

Expand Down
69 changes: 29 additions & 40 deletions tracing-subscriber/src/reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,8 @@ pub struct Handle<S> {
}

/// Indicates that an error occurred when reloading a subscriber.
#[derive(Debug)]
pub struct Error {
kind: ErrorKind,
}

#[derive(Debug)]
enum ErrorKind {
CollectorGone,
Poisoned,
_p: (),
}

// ===== impl Collect =====
Expand All @@ -60,12 +53,12 @@ where
{
#[inline]
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
try_lock!(self.inner.read(), else return Interest::sometimes()).register_callsite(metadata)
self.inner.read().register_callsite(metadata)
}

#[inline]
fn enabled(&self, metadata: &Metadata<'_>, ctx: subscribe::Context<'_, C>) -> bool {
try_lock!(self.inner.read(), else return false).enabled(metadata, ctx)
self.inner.read().enabled(metadata, ctx)
}

#[inline]
Expand All @@ -75,7 +68,7 @@ where
id: &span::Id,
ctx: subscribe::Context<'_, C>,
) {
try_lock!(self.inner.read()).new_span(attrs, id, ctx)
self.inner.read().new_span(attrs, id, ctx)
}

#[inline]
Expand All @@ -85,37 +78,37 @@ where
values: &span::Record<'_>,
ctx: subscribe::Context<'_, C>,
) {
try_lock!(self.inner.read()).on_record(span, values, ctx)
self.inner.read().on_record(span, values, ctx)
}

#[inline]
fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_follows_from(span, follows, ctx)
self.inner.read().on_follows_from(span, follows, ctx)
}

#[inline]
fn on_event(&self, event: &Event<'_>, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_event(event, ctx)
self.inner.read().on_event(event, ctx)
}

#[inline]
fn on_enter(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_enter(id, ctx)
self.inner.read().on_enter(id, ctx)
}

#[inline]
fn on_exit(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_exit(id, ctx)
self.inner.read().on_exit(id, ctx)
}

#[inline]
fn on_close(&self, id: span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_close(id, ctx)
self.inner.read().on_close(id, ctx)
}

#[inline]
fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_id_change(old, new, ctx)
self.inner.read().on_id_change(old, new, ctx)
}
}

Expand Down Expand Up @@ -151,15 +144,11 @@ impl<S> Handle<S> {
/// Invokes a closure with a mutable reference to the current subscriber,
/// allowing it to be modified in place.
pub fn modify(&self, f: impl FnOnce(&mut S)) -> Result<(), Error> {
let inner = self.inner.upgrade().ok_or(Error {
kind: ErrorKind::CollectorGone,
})?;
let inner = self.inner.upgrade().ok_or_else(Error::new)?;

let mut lock = try_lock!(inner.write(), else return Err(Error::poisoned()));
f(&mut *lock);
// Release the lock before rebuilding the interest cache, as that
// function will lock the new subscriber.
drop(lock);
f(&mut *inner.write());

callsite::rebuild_interest_cache();
Ok(())
Expand All @@ -177,11 +166,9 @@ impl<S> Handle<S> {
/// Invokes a closure with a borrowed reference to the current subscriber,
/// returning the result (or an error if the collector no longer exists).
pub fn with_current<T>(&self, f: impl FnOnce(&S) -> T) -> Result<T, Error> {
let inner = self.inner.upgrade().ok_or(Error {
kind: ErrorKind::CollectorGone,
})?;
let inner = try_lock!(inner.read(), else return Err(Error::poisoned()));
Ok(f(&*inner))
let inner = self.inner.upgrade().ok_or_else(Error::new)?;
let lock = inner.read();
Ok(f(&*lock))
}
}

Expand All @@ -196,32 +183,34 @@ impl<S> Clone for Handle<S> {
// ===== impl Error =====

impl Error {
fn poisoned() -> Self {
Self {
kind: ErrorKind::Poisoned,
}
fn new() -> Self {
Self { _p: () }
}

/// Returns `true` if this error occurred because the subscriber was poisoned by
/// a panic on another thread.
pub fn is_poisoned(&self) -> bool {
matches!(self.kind, ErrorKind::Poisoned)
false
}

/// Returns `true` if this error occurred because the `Collector`
/// containing the reloadable subscriber was dropped.
pub fn is_dropped(&self) -> bool {
matches!(self.kind, ErrorKind::CollectorGone)
true
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let msg = match self.kind {
ErrorKind::CollectorGone => "subscriber no longer exists",
ErrorKind::Poisoned => "lock poisoned",
};
f.pad(msg)
f.pad("subscriber no longer exists")
}
}

impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Error")
.field("kind", &format_args!("CollectorGone"))
.finish()
}
}

Expand Down
Loading