From 84a517a874e401cfebd40aa92195e770dfa2734b Mon Sep 17 00:00:00 2001 From: Mark Juggurnauth-Thomas Date: Thu, 30 Jan 2025 15:26:28 -0800 Subject: [PATCH] futures_ext: remove slog dependency Summary: We are migrating from `slog` to `tracing`. The `YieldPeriodically` stream adapter has the ability to log to a `slog::Logger`. This is difficult to migrate to `tracing`, as it allows capturing of code locations which is not possible in tracing, as tracing uses static locations generated by its macros. Instead, we remove the logging dependency and abstract it out to a callback that is called when polling exceeds the budged by a significant amount. It is then up to the caller to log appropriately. Since the logging happens in the caller, the log context, file name and line number are all correct. Reviewed By: andreacampi Differential Revision: D68837434 fbshipit-source-id: 6348754ec8cb2a45bec24b3022e4d715c4f6b561 --- shed/futures_ext/Cargo.toml | 2 - shed/futures_ext/src/stream.rs | 12 +- .../src/stream/yield_periodically.rs | 115 +++++++++--------- 3 files changed, 57 insertions(+), 72 deletions(-) diff --git a/shed/futures_ext/Cargo.toml b/shed/futures_ext/Cargo.toml index 6a8af9c33..265784284 100644 --- a/shed/futures_ext/Cargo.toml +++ b/shed/futures_ext/Cargo.toml @@ -13,10 +13,8 @@ license = "MIT OR Apache-2.0" [dependencies] anyhow = "1.0.95" futures = { version = "0.3.30", features = ["async-await", "compat"] } -maybe-owned = "0.3.4" pin-project = "0.4.30" shared_error = { version = "0.1.0", path = "../shared_error" } -slog = { version = "2.7", features = ["max_level_trace", "nested-values"] } thiserror = "2" tokio = { version = "1.41.0", features = ["full", "test-util", "tracing"] } diff --git a/shed/futures_ext/src/stream.rs b/shed/futures_ext/src/stream.rs index 75212d0ea..663200c0c 100644 --- a/shed/futures_ext/src/stream.rs +++ b/shed/futures_ext/src/stream.rs @@ -72,17 +72,7 @@ pub trait FbStreamExt: Stream { where Self: Sized, { - let location = std::panic::Location::caller(); - - let location = slog::RecordLocation { - file: location.file(), - line: location.line(), - column: location.column(), - function: "", - module: "", - }; - - YieldPeriodically::new(self, location, Duration::from_millis(10)) + YieldPeriodically::new(self, Duration::from_millis(10)) } } diff --git a/shed/futures_ext/src/stream/yield_periodically.rs b/shed/futures_ext/src/stream/yield_periodically.rs index 41311f161..b77cad008 100644 --- a/shed/futures_ext/src/stream/yield_periodically.rs +++ b/shed/futures_ext/src/stream/yield_periodically.rs @@ -7,7 +7,6 @@ * of this source tree. */ -use std::fmt::Arguments; use std::pin::Pin; use std::time::Duration; use std::time::Instant; @@ -15,12 +14,10 @@ use std::time::Instant; use futures::stream::Stream; use futures::task::Context; use futures::task::Poll; -use maybe_owned::MaybeOwned; use pin_project::pin_project; -use slog::Logger; -use slog::Record; -/// If the budget is exceeded, we will log a warning if the total overshoot is more than this multiplier. +/// Multiplier for the number of times the budget we overshoot before calling +/// the on_large_overshoot callback. const BUDGET_OVERSHOOT_MULTIPLIER: u32 = 3; /// A stream that will yield control back to the caller if it runs for more than a given duration @@ -36,23 +33,20 @@ pub struct YieldPeriodically<'a, S> { current_budget: Duration, /// Whether the next iteration must yield because the budget was exceeded. must_yield: bool, - /// The code location where yield_periodically was called. - location: slog::RecordLocation, - /// Enable logging to the provided logger when the budget is exceeded by - /// BUDGET_OVERSHOOT_MULTIPLIER times or more. - logger: Option>, + /// Callback for when we overshoot the budget by more than + /// BUDGET_OVERSHOOT_MULTIPLIER times. + on_large_overshoot: Option>, } impl YieldPeriodically<'_, S> { /// Create a new [YieldPeriodically]. - pub fn new(inner: S, location: slog::RecordLocation, budget: Duration) -> Self { + pub fn new(inner: S, budget: Duration) -> Self { Self { inner, budget, current_budget: budget, must_yield: false, - location, - logger: None, + on_large_overshoot: None, } } @@ -63,13 +57,16 @@ impl YieldPeriodically<'_, S> { self } - /// Enable debug logging. - pub fn with_logger<'a, L>(self, logger: L) -> YieldPeriodically<'a, S> - where - L: Into>, - { + /// If we are unable to yield in time because a single poll exceeds the + /// budget by more than BUDGET_OVERSHOOT_MULTIPLIER times, call this + /// callback. The caller can use this to log the location where long + /// polls are still happening. + pub fn on_large_overshoot<'a>( + self, + on_large_overshoot: impl Fn(Duration, Duration) + Send + Sync + 'a, + ) -> YieldPeriodically<'a, S> { YieldPeriodically { - logger: Some(logger.into()), + on_large_overshoot: Some(Box::new(on_large_overshoot)), ..self } } @@ -101,15 +98,10 @@ impl Stream for YieldPeriodically<'_, S> { match this.current_budget.checked_sub(elapsed) { Some(new_budget) => *this.current_budget = new_budget, None => { - if (elapsed - current_budget) > *this.budget * BUDGET_OVERSHOOT_MULTIPLIER { - maybe_log( - this.logger, - this.location, - &format_args!( - "yield_periodically(): budget overshot: current_budget={:?}, elapsed={:?}", - current_budget, elapsed, - ), - ); + if let Some(on_large_overshoot) = &this.on_large_overshoot { + if (elapsed - current_budget) > *this.budget * BUDGET_OVERSHOOT_MULTIPLIER { + (on_large_overshoot)(current_budget, elapsed); + } } *this.must_yield = true; *this.current_budget = *this.budget; @@ -120,26 +112,11 @@ impl Stream for YieldPeriodically<'_, S> { } } -fn maybe_log( - logger: &Option>, - location: &slog::RecordLocation, - fmt: &Arguments<'_>, -) { - if let Some(logger) = &logger { - logger.log(&Record::new( - &slog::RecordStatic { - location, - level: slog::Level::Warning, - tag: "futures_watchdog", - }, - fmt, - slog::b!(), - )); - } -} - #[cfg(test)] mod test { + use std::sync::Arc; + use std::sync::Mutex; + use futures::stream::StreamExt; use super::*; @@ -151,8 +128,7 @@ mod test { std::thread::sleep(Duration::from_millis(1)); }); - let stream = - YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(100)); + let stream = YieldPeriodically::new(stream, Duration::from_millis(100)); futures::pin_mut!(stream); @@ -193,19 +169,40 @@ mod test { }) .take(30); - let stream = YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(10)); + let stream = YieldPeriodically::new(stream, Duration::from_millis(10)); stream.collect::>().await; } - #[track_caller] - fn location_for_test() -> slog::RecordLocation { - let location = std::panic::Location::caller(); - slog::RecordLocation { - file: location.file(), - line: location.line(), - column: location.column(), - function: "", - module: "", - } + #[tokio::test] + async fn test_on_large_overshoot() { + let stream = futures::stream::repeat(()).inspect(|_| { + // Simulate CPU work that takes longer than the budget + std::thread::sleep(Duration::from_millis(250)); + }); + + let large_overshoots = Arc::new(Mutex::new(Vec::new())); + + let stream = + YieldPeriodically::new(stream, Duration::from_millis(10)).on_large_overshoot({ + let large_overshoots = large_overshoots.clone(); + Box::new(move |budget, elapsed| { + large_overshoots.lock().unwrap().push((budget, elapsed)); + }) + }); + + futures::pin_mut!(stream); + + let now = Instant::now(); + + let waker = futures::task::noop_waker(); + let mut cx = futures::task::Context::from_waker(&waker); + + assert!(stream.as_mut().poll_next(&mut cx).is_ready()); + assert!(now.elapsed() > Duration::from_millis(200)); + + let large_overshoots = large_overshoots.lock().unwrap(); + assert_eq!(large_overshoots.len(), 1); + assert_eq!(large_overshoots[0].0, Duration::from_millis(10)); + assert!(large_overshoots[0].1 > Duration::from_millis(200)); } }