Skip to content

Commit

Permalink
core: add intrusive linked list for callsite registry (#988)
Browse files Browse the repository at this point in the history
This change adds an intrusive `LinkedList` for the
callsite registry. The linked list is lock-free and
can be written to from multiple threads. This list
also does not require any allocations due to its
intrusive nature. This though does require a
breaking change to the `Callsite` trait to allow
callsites to store the pointer to the next item in
the intrusive list.

Properoties of the intrusive atomically linked-list:

- Only supports `LinkedList::push` and `LinkedList::for_each`.
- The items in the list can only be added and not removed.

Closes #860
  • Loading branch information
LucioFranco authored Oct 2, 2020
1 parent 9c6dd2d commit 0fcc9fd
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 77 deletions.
301 changes: 239 additions & 62 deletions tracing-core/src/callsite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
use crate::stdlib::{
fmt,
hash::{Hash, Hasher},
sync::Mutex,
ptr,
sync::{
atomic::{AtomicPtr, Ordering},
Mutex, MutexGuard,
},
vec::Vec,
};
use crate::{
Expand All @@ -13,61 +17,18 @@ use crate::{
};

lazy_static! {
static ref REGISTRY: Mutex<Registry> = Mutex::new(Registry {
callsites: Vec::new(),
dispatchers: Vec::new(),
});
}

struct Registry {
callsites: Vec<&'static dyn Callsite>,
dispatchers: Vec<dispatcher::Registrar>,
static ref REGISTRY: Registry = Registry {
callsites: LinkedList::new(),
dispatchers: Mutex::new(Vec::new()),
};
}

impl Registry {
fn rebuild_callsite_interest(&self, callsite: &'static dyn Callsite) {
let meta = callsite.metadata();

// Iterate over the subscribers in the registry, and — if they are
// active — register the callsite with them.
let mut interests = self
.dispatchers
.iter()
.filter_map(|registrar| registrar.try_register(meta));

// Use the first subscriber's `Interest` as the base value.
let interest = if let Some(interest) = interests.next() {
// Combine all remaining `Interest`s.
interests.fold(interest, Interest::and)
} else {
// If nobody was interested in this thing, just return `never`.
Interest::never()
};

callsite.set_interest(interest)
}

fn rebuild_interest(&mut self) {
let mut max_level = LevelFilter::OFF;
self.dispatchers.retain(|registrar| {
if let Some(dispatch) = registrar.upgrade() {
// If the subscriber did not provide a max level hint, assume
// that it may enable every level.
let level_hint = dispatch.max_level_hint().unwrap_or(LevelFilter::TRACE);
if level_hint > max_level {
max_level = level_hint;
}
true
} else {
false
}
});
type Dispatchers = Vec<dispatcher::Registrar>;
type Callsites = LinkedList;

self.callsites.iter().for_each(|&callsite| {
self.rebuild_callsite_interest(callsite);
});
LevelFilter::set_max(max_level);
}
struct Registry {
callsites: Callsites,
dispatchers: Mutex<Dispatchers>,
}

/// Trait implemented by callsites.
Expand Down Expand Up @@ -105,6 +66,18 @@ pub struct Identifier(
pub &'static dyn Callsite,
);

/// A registration with the callsite registry.
///
/// Every [`Callsite`] implementation must provide a `&'static Registration`
/// when calling [`register`] to add itself to the global callsite registry.
///
/// [`Callsite`]: crate::callsite::Callsite
/// [`register`]: crate::callsite::register
pub struct Registration<T = &'static dyn Callsite> {
callsite: T,
next: AtomicPtr<Registration<T>>,
}

/// Clear and reregister interest on every [`Callsite`]
///
/// This function is intended for runtime reconfiguration of filters on traces
Expand All @@ -125,24 +98,76 @@ pub struct Identifier(
/// [`Interest::sometimes()`]: ../subscriber/struct.Interest.html#method.sometimes
/// [`Subscriber`]: ../subscriber/trait.Subscriber.html
pub fn rebuild_interest_cache() {
let mut registry = REGISTRY.lock().unwrap();
registry.rebuild_interest();
let mut dispatchers = REGISTRY.dispatchers.lock().unwrap();
let callsites = &REGISTRY.callsites;
rebuild_interest(callsites, &mut dispatchers);
}

/// Register a new `Callsite` with the global registry.
///
/// This should be called once per callsite after the callsite has been
/// constructed.
pub fn register(callsite: &'static dyn Callsite) {
let mut registry = REGISTRY.lock().unwrap();
registry.rebuild_callsite_interest(callsite);
registry.callsites.push(callsite);
pub fn register(registration: &'static Registration) {
let mut dispatchers = REGISTRY.dispatchers.lock().unwrap();
rebuild_callsite_interest(&mut dispatchers, registration.callsite);
REGISTRY.callsites.push(registration);
}

pub(crate) fn register_dispatch(dispatch: &Dispatch) {
let mut registry = REGISTRY.lock().unwrap();
registry.dispatchers.push(dispatch.registrar());
registry.rebuild_interest();
let mut dispatchers = REGISTRY.dispatchers.lock().unwrap();
let callsites = &REGISTRY.callsites;

dispatchers.push(dispatch.registrar());

rebuild_interest(callsites, &mut dispatchers);
}

fn rebuild_callsite_interest(
dispatchers: &mut MutexGuard<'_, Vec<dispatcher::Registrar>>,
callsite: &'static dyn Callsite,
) {
let meta = callsite.metadata();

// Iterate over the subscribers in the registry, and — if they are
// active — register the callsite with them.
let mut interests = dispatchers
.iter()
.filter_map(|registrar| registrar.try_register(meta));

// Use the first subscriber's `Interest` as the base value.
let interest = if let Some(interest) = interests.next() {
// Combine all remaining `Interest`s.
interests.fold(interest, Interest::and)
} else {
// If nobody was interested in this thing, just return `never`.
Interest::never()
};

callsite.set_interest(interest)
}

fn rebuild_interest(
callsites: &Callsites,
dispatchers: &mut MutexGuard<'_, Vec<dispatcher::Registrar>>,
) {
let mut max_level = LevelFilter::OFF;
dispatchers.retain(|registrar| {
if let Some(dispatch) = registrar.upgrade() {
// If the subscriber did not provide a max level hint, assume
// that it may enable every level.
let level_hint = dispatch.max_level_hint().unwrap_or(LevelFilter::TRACE);
if level_hint > max_level {
max_level = level_hint;
}
true
} else {
false
}
});

callsites.for_each(|reg| rebuild_callsite_interest(dispatchers, reg.callsite));

LevelFilter::set_max(max_level);
}

// ===== impl Identifier =====
Expand All @@ -169,3 +194,155 @@ impl Hash for Identifier {
(self.0 as *const dyn Callsite).hash(state)
}
}

// ===== impl Registration =====

impl<T> Registration<T> {
/// Construct a new `Registration` from some `&'static dyn Callsite`
pub const fn new(callsite: T) -> Self {
Self {
callsite,
next: AtomicPtr::new(ptr::null_mut()),
}
}
}

impl fmt::Debug for Registration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Registration")
.field("callsite", &format_args!("{:p}", self.callsite))
.field(
"next",
&format_args!("{:p}", self.next.load(Ordering::Acquire)),
)
.finish()
}
}

// ===== impl LinkedList =====

/// An intrusive atomic push-only linked list.
struct LinkedList {
head: AtomicPtr<Registration>,
}

impl LinkedList {
fn new() -> Self {
LinkedList {
head: AtomicPtr::new(ptr::null_mut()),
}
}

fn for_each(&self, mut f: impl FnMut(&'static Registration)) {
let mut head = self.head.load(Ordering::Acquire);

while let Some(reg) = unsafe { head.as_ref() } {
f(reg);

head = reg.next.load(Ordering::Acquire);
}
}

fn push(&self, registration: &'static Registration) {
let mut head = self.head.load(Ordering::Acquire);

loop {
registration.next.store(head, Ordering::Release);

assert_ne!(
registration as *const _, head,
"Attempted to register a `Callsite` that already exists! \
This will cause an infinite loop when attempting to read from the \
callsite cache. This is likely a bug! You should only need to call \
`tracing-core::callsite::register` once per `Callsite`."
);

match self.head.compare_exchange(
head,
registration as *const _ as *mut _,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
break;
}
Err(current) => head = current,
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(Eq, PartialEq)]
struct Cs1;
static CS1: Cs1 = Cs1;
static REG1: Registration = Registration::new(&CS1);

impl Callsite for Cs1 {
fn set_interest(&self, _interest: Interest) {}
fn metadata(&self) -> &Metadata<'_> {
unimplemented!("not needed for this test")
}
}

struct Cs2;
static CS2: Cs2 = Cs2;
static REG2: Registration = Registration::new(&CS2);

impl Callsite for Cs2 {
fn set_interest(&self, _interest: Interest) {}
fn metadata(&self) -> &Metadata<'_> {
unimplemented!("not needed for this test")
}
}

#[test]
fn linked_list_push() {
let linked_list = LinkedList::new();

linked_list.push(&REG1);
linked_list.push(&REG2);

let mut i = 0;

linked_list.for_each(|reg| {
if i == 0 {
assert!(
ptr::eq(reg, &REG2),
"Registration pointers need to match REG2"
);
} else {
assert!(
ptr::eq(reg, &REG1),
"Registration pointers need to match REG1"
);
}

i += 1;
});
}

#[test]
#[should_panic]
fn linked_list_repeated() {
let linked_list = LinkedList::new();

linked_list.push(&REG1);
// Pass in same reg and we should panic...
linked_list.push(&REG1);

linked_list.for_each(|_| {});
}

#[test]
fn linked_list_empty() {
let linked_list = LinkedList::new();

linked_list.for_each(|_| {
panic!("List should be empty");
});
}
}
1 change: 1 addition & 0 deletions tracing-core/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,7 @@ impl Drop for DefaultGuard {

#[cfg(test)]
mod test {

use super::*;
#[cfg(feature = "std")]
use crate::stdlib::sync::atomic::{AtomicUsize, Ordering};
Expand Down
Loading

0 comments on commit 0fcc9fd

Please sign in to comment.