Skip to content

Commit

Permalink
futures_ext: remove slog dependency
Browse files Browse the repository at this point in the history
Summary:
We are migrating from `slog` to `tracing`.

The `YieldPeriodically` stream adapter has the ability to log to a `slog::Logger`.  This is difficult to migrate to `tracing`, as it allows capturing of code locations which is not possible in tracing, as tracing uses static locations generated by its macros.

Instead, we remove the logging dependency and abstract it out to a callback that is called when polling exceeds the budged by a significant amount.  It is then up to the caller to log appropriately.  Since the logging happens in the caller, the log context, file name and line number are all correct.

Reviewed By: andreacampi

Differential Revision: D68837434

fbshipit-source-id: 6348754ec8cb2a45bec24b3022e4d715c4f6b561
  • Loading branch information
markbt authored and facebook-github-bot committed Jan 30, 2025
1 parent dc9689e commit 84a517a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 72 deletions.
2 changes: 0 additions & 2 deletions shed/futures_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = "1.0.95"
futures = { version = "0.3.30", features = ["async-await", "compat"] }
maybe-owned = "0.3.4"
pin-project = "0.4.30"
shared_error = { version = "0.1.0", path = "../shared_error" }
slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }
thiserror = "2"
tokio = { version = "1.41.0", features = ["full", "test-util", "tracing"] }

Expand Down
12 changes: 1 addition & 11 deletions shed/futures_ext/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,7 @@ pub trait FbStreamExt: Stream {
where
Self: Sized,
{
let location = std::panic::Location::caller();

let location = slog::RecordLocation {
file: location.file(),
line: location.line(),
column: location.column(),
function: "",
module: "",
};

YieldPeriodically::new(self, location, Duration::from_millis(10))
YieldPeriodically::new(self, Duration::from_millis(10))
}
}

Expand Down
115 changes: 56 additions & 59 deletions shed/futures_ext/src/stream/yield_periodically.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@
* of this source tree.
*/

use std::fmt::Arguments;
use std::pin::Pin;
use std::time::Duration;
use std::time::Instant;

use futures::stream::Stream;
use futures::task::Context;
use futures::task::Poll;
use maybe_owned::MaybeOwned;
use pin_project::pin_project;
use slog::Logger;
use slog::Record;

/// If the budget is exceeded, we will log a warning if the total overshoot is more than this multiplier.
/// Multiplier for the number of times the budget we overshoot before calling
/// the on_large_overshoot callback.
const BUDGET_OVERSHOOT_MULTIPLIER: u32 = 3;

/// A stream that will yield control back to the caller if it runs for more than a given duration
Expand All @@ -36,23 +33,20 @@ pub struct YieldPeriodically<'a, S> {
current_budget: Duration,
/// Whether the next iteration must yield because the budget was exceeded.
must_yield: bool,
/// The code location where yield_periodically was called.
location: slog::RecordLocation,
/// Enable logging to the provided logger when the budget is exceeded by
/// BUDGET_OVERSHOOT_MULTIPLIER times or more.
logger: Option<MaybeOwned<'a, Logger>>,
/// Callback for when we overshoot the budget by more than
/// BUDGET_OVERSHOOT_MULTIPLIER times.
on_large_overshoot: Option<Box<dyn Fn(Duration, Duration) + Send + Sync + 'a>>,
}

impl<S> YieldPeriodically<'_, S> {
/// Create a new [YieldPeriodically].
pub fn new(inner: S, location: slog::RecordLocation, budget: Duration) -> Self {
pub fn new(inner: S, budget: Duration) -> Self {
Self {
inner,
budget,
current_budget: budget,
must_yield: false,
location,
logger: None,
on_large_overshoot: None,
}
}

Expand All @@ -63,13 +57,16 @@ impl<S> YieldPeriodically<'_, S> {
self
}

/// Enable debug logging.
pub fn with_logger<'a, L>(self, logger: L) -> YieldPeriodically<'a, S>
where
L: Into<MaybeOwned<'a, Logger>>,
{
/// If we are unable to yield in time because a single poll exceeds the
/// budget by more than BUDGET_OVERSHOOT_MULTIPLIER times, call this
/// callback. The caller can use this to log the location where long
/// polls are still happening.
pub fn on_large_overshoot<'a>(
self,
on_large_overshoot: impl Fn(Duration, Duration) + Send + Sync + 'a,
) -> YieldPeriodically<'a, S> {
YieldPeriodically {
logger: Some(logger.into()),
on_large_overshoot: Some(Box::new(on_large_overshoot)),
..self
}
}
Expand Down Expand Up @@ -101,15 +98,10 @@ impl<S: Stream> Stream for YieldPeriodically<'_, S> {
match this.current_budget.checked_sub(elapsed) {
Some(new_budget) => *this.current_budget = new_budget,
None => {
if (elapsed - current_budget) > *this.budget * BUDGET_OVERSHOOT_MULTIPLIER {
maybe_log(
this.logger,
this.location,
&format_args!(
"yield_periodically(): budget overshot: current_budget={:?}, elapsed={:?}",
current_budget, elapsed,
),
);
if let Some(on_large_overshoot) = &this.on_large_overshoot {
if (elapsed - current_budget) > *this.budget * BUDGET_OVERSHOOT_MULTIPLIER {
(on_large_overshoot)(current_budget, elapsed);
}
}
*this.must_yield = true;
*this.current_budget = *this.budget;
Expand All @@ -120,26 +112,11 @@ impl<S: Stream> Stream for YieldPeriodically<'_, S> {
}
}

fn maybe_log(
logger: &Option<MaybeOwned<'_, Logger>>,
location: &slog::RecordLocation,
fmt: &Arguments<'_>,
) {
if let Some(logger) = &logger {
logger.log(&Record::new(
&slog::RecordStatic {
location,
level: slog::Level::Warning,
tag: "futures_watchdog",
},
fmt,
slog::b!(),
));
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;
use std::sync::Mutex;

use futures::stream::StreamExt;

use super::*;
Expand All @@ -151,8 +128,7 @@ mod test {
std::thread::sleep(Duration::from_millis(1));
});

let stream =
YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(100));
let stream = YieldPeriodically::new(stream, Duration::from_millis(100));

futures::pin_mut!(stream);

Expand Down Expand Up @@ -193,19 +169,40 @@ mod test {
})
.take(30);

let stream = YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(10));
let stream = YieldPeriodically::new(stream, Duration::from_millis(10));
stream.collect::<Vec<_>>().await;
}

#[track_caller]
fn location_for_test() -> slog::RecordLocation {
let location = std::panic::Location::caller();
slog::RecordLocation {
file: location.file(),
line: location.line(),
column: location.column(),
function: "",
module: "",
}
#[tokio::test]
async fn test_on_large_overshoot() {
let stream = futures::stream::repeat(()).inspect(|_| {
// Simulate CPU work that takes longer than the budget
std::thread::sleep(Duration::from_millis(250));
});

let large_overshoots = Arc::new(Mutex::new(Vec::new()));

let stream =
YieldPeriodically::new(stream, Duration::from_millis(10)).on_large_overshoot({
let large_overshoots = large_overshoots.clone();
Box::new(move |budget, elapsed| {
large_overshoots.lock().unwrap().push((budget, elapsed));
})
});

futures::pin_mut!(stream);

let now = Instant::now();

let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);

assert!(stream.as_mut().poll_next(&mut cx).is_ready());
assert!(now.elapsed() > Duration::from_millis(200));

let large_overshoots = large_overshoots.lock().unwrap();
assert_eq!(large_overshoots.len(), 1);
assert_eq!(large_overshoots[0].0, Duration::from_millis(10));
assert!(large_overshoots[0].1 > Duration::from_millis(200));
}
}

0 comments on commit 84a517a

Please sign in to comment.