From 11dff31640167bddef9b9518c6c9b498427138f5 Mon Sep 17 00:00:00 2001 From: Theron Tock <83027591+theron-eg@users.noreply.github.com> Date: Fri, 16 Feb 2024 11:40:59 -0800 Subject: [PATCH 1/6] Add worker id to on_thread_park callback It's possible to find out from WorkerMetrics which workers are not actively polling for tasks. However it's not possible to tell if non-polling workers are merely idle (parked) or if they are stuck. By adding the worker id (same usize as used in WorkerMetrics calls) to the on_thread_park() and on_thread_unpark() callbacks it is possible to track which specific workers are parked. Then any worker that is not polling tasks and is not parked is a worker that is stuck. With this information it's possible to create a watchdog that alerts or kills the process if a worker is stuck for too long. --- tokio/src/runtime/builder.rs | 16 ++-- tokio/src/runtime/config.rs | 6 +- tokio/src/runtime/mod.rs | 3 + .../runtime/scheduler/current_thread/mod.rs | 4 +- .../runtime/scheduler/multi_thread/worker.rs | 14 +-- .../scheduler/multi_thread_alt/worker.rs | 4 +- tokio/tests/rt_metrics.rs | 86 ++++++++++++++++++- 7 files changed, 112 insertions(+), 21 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index e20a3c4955b..fe65e9ba4d8 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,5 +1,5 @@ use crate::runtime::handle::Handle; -use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime}; +use crate::runtime::{blocking, driver, Callback, CallbackWorker, HistogramBuilder, Runtime}; use crate::util::rand::{RngSeed, RngSeedGenerator}; use std::fmt; @@ -73,10 +73,10 @@ pub struct Builder { pub(super) before_stop: Option, /// To run before each worker thread is parked. - pub(super) before_park: Option, + pub(super) before_park: Option, /// To run after each thread is unparked. 
- pub(super) after_unpark: Option, + pub(super) after_unpark: Option, /// Customizable keep alive timeout for `BlockingPool` pub(super) keep_alive: Option, @@ -592,7 +592,7 @@ impl Builder { /// .worker_threads(1) /// .on_thread_park({ /// let barrier = barrier.clone(); - /// move || { + /// move |_| { /// let barrier = barrier.clone(); /// if once.swap(false, Ordering::Relaxed) { /// tokio::spawn(async move { barrier.wait().await; }); @@ -620,7 +620,7 @@ impl Builder { /// let runtime = runtime::Builder::new_current_thread() /// .on_thread_park({ /// let barrier = barrier.clone(); - /// move || { + /// move |_| { /// let barrier = barrier.clone(); /// if once.swap(false, Ordering::Relaxed) { /// tokio::spawn(async move { barrier.wait().await; }); @@ -638,7 +638,7 @@ impl Builder { #[cfg(not(loom))] pub fn on_thread_park(&mut self, f: F) -> &mut Self where - F: Fn() + Send + Sync + 'static, + F: Fn(usize) + Send + Sync + 'static, { self.before_park = Some(std::sync::Arc::new(f)); self @@ -659,7 +659,7 @@ impl Builder { /// # use tokio::runtime; /// # pub fn main() { /// let runtime = runtime::Builder::new_multi_thread() - /// .on_thread_unpark(|| { + /// .on_thread_unpark(|_| { /// println!("thread unparking"); /// }) /// .build(); @@ -673,7 +673,7 @@ impl Builder { #[cfg(not(loom))] pub fn on_thread_unpark(&mut self, f: F) -> &mut Self where - F: Fn() + Send + Sync + 'static, + F: Fn(usize) + Send + Sync + 'static, { self.after_unpark = Some(std::sync::Arc::new(f)); self diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index d846a0d224a..5dc301531ca 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -2,7 +2,7 @@ any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"), allow(dead_code) )] -use crate::runtime::Callback; +use crate::runtime::CallbackWorker; use crate::util::RngSeedGenerator; pub(crate) struct Config { @@ -16,10 +16,10 @@ pub(crate) struct Config { pub(crate) local_queue_capacity: usize, 
/// Callback for a worker parking itself - pub(crate) before_park: Option<Callback>, + pub(crate) before_park: Option<CallbackWorker>, /// Callback for a worker unparking itself - pub(crate) after_unpark: Option<Callback>, + pub(crate) after_unpark: Option<CallbackWorker>, /// The multi-threaded scheduler includes a per-worker LIFO slot used to /// store the last scheduled task. This can improve certain usage patterns, diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3d333960f3d..ae90b29e741 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -406,4 +406,7 @@ cfg_rt! { /// After thread starts / before thread stops type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; + + /// Before thread parks / after thread unparks + type CallbackWorker = std::sync::Arc<dyn Fn(usize) + Send + Sync>; } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 36bcefc4406..f6542704228 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -351,7 +351,7 @@ impl Context { let mut driver = core.driver.take().expect("driver missing"); if let Some(f) = &handle.shared.config.before_park { - let (c, ()) = self.enter(core, || f()); + let (c, ()) = self.enter(core, || f(0)); core = c; } @@ -371,7 +371,7 @@ impl Context { } if let Some(f) = &handle.shared.config.after_unpark { - let (c, ()) = self.enter(core, || f()); + let (c, ()) = self.enter(core, || f(0)); core = c; } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 9998870ab4d..6e2f1ef330f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -100,6 +100,9 @@ pub(super) struct Worker { /// Core data struct Core { + /// Index of this core + index: usize, + /// Used to schedule bookkeeping tasks every so often. 
tick: u32, @@ -252,7 +255,7 @@ pub(super) fn create( let mut worker_metrics = Vec::with_capacity(size); // Create the local queues - for _ in 0..size { + for i in 0..size { let (steal, run_queue) = queue::local(); let park = park.clone(); @@ -261,6 +264,7 @@ pub(super) fn create( let stats = Stats::new(&metrics); cores.push(Box::new(Core { + index: i, tick: 0, lifo_slot: None, lifo_enabled: !config.disable_lifo_slot, @@ -306,10 +310,10 @@ pub(super) fn create( let mut launch = Launch(vec![]); - for (index, core) in cores.drain(..).enumerate() { + for core in cores.drain(..) { launch.0.push(Arc::new(Worker { handle: handle.clone(), - index, + index: core.index, core: AtomicCell::new(Some(core)), })); } @@ -684,7 +688,7 @@ impl Context { /// after all the IOs get dispatched fn park(&self, mut core: Box) -> Box { if let Some(f) = &self.worker.handle.shared.config.before_park { - f(); + f(core.index); } if core.transition_to_parked(&self.worker) { @@ -702,7 +706,7 @@ impl Context { } if let Some(f) = &self.worker.handle.shared.config.after_unpark { - f(); + f(core.index); } core } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 54c6b0ed7ba..feb407b9d21 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -1129,7 +1129,7 @@ impl Worker { fn park(&mut self, cx: &Context, mut core: Box) -> NextTaskResult { if let Some(f) = &cx.shared().config.before_park { - f(); + f(core.index); } if self.can_transition_to_parked(&mut core) { @@ -1140,7 +1140,7 @@ impl Worker { } if let Some(f) = &cx.shared().config.after_unpark { - f(); + f(core.index); } Ok((None, core)) diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index ec4856cb5c2..ea28acd6fc5 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -2,7 +2,7 @@ #![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))] use 
std::future::Future; -use std::sync::{Arc, Mutex}; +use std::sync::{atomic, Arc, Mutex}; use std::task::Poll; use tokio::macros::support::poll_fn; @@ -680,6 +680,90 @@ fn budget_exhaustion_yield_with_joins() { assert_eq!(1, rt.metrics().budget_forced_yield_count()); } +#[test] +fn on_thread_park_unpark() { + const THREADS: usize = 8; + + // Keeps track whether or not each worker is parked + let mut bools = Vec::new(); + for _ in 0..THREADS { + bools.push(atomic::AtomicBool::new(false)); + } + let bools = Arc::new(bools); + let bools_park = bools.clone(); + let bools_unpark = bools.clone(); + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(THREADS) + .enable_all() + .on_thread_park(move |worker| { + // worker is parked + bools_park[worker].store(true, atomic::Ordering::Release); + }) + .on_thread_unpark(move |worker| { + bools_unpark[worker].store(false, atomic::Ordering::Release); + }) + .build() + .unwrap(); + let metrics = rt.metrics(); + + rt.block_on(async { + // Spawn some tasks to do things, but less than the number of workers. Some + // workers won't have any work to do and will stay parked the duration of the + // test. We rely on bools to distinguish between a busy (unparked) worker that + // isn't polling, vs. ones that are merely parked the entire time. + for _ in 0..(THREADS - 1) { + tokio::spawn(async { + loop { + tokio::time::sleep(Duration::from_millis(4)).await; + } + }); + } + + // Give the spawned tasks a chance to both poll and park. Not really necessary. + tokio::time::sleep(Duration::from_millis(30)).await; + + let _ = tokio::spawn(async move { + let mut counts = Vec::new(); + for ii in 0..THREADS { + counts.push(metrics.worker_poll_count(ii)); + } + + let start_time = std::time::Instant::now(); + while start_time.elapsed() < Duration::from_millis(100) { + // Uncomment the line below and the test fails (current worker is no + // longer "stuck" and not yielding back to tokio). 
+ + // tokio::task::yield_now().await; + } + + let mut stuck = 0; + for ii in 0..THREADS { + let parked = bools[ii].load(atomic::Ordering::Acquire); + // Uncomment below to verify that some workers are not doing any polls, + // yet only one of them is not parked. + + // if !parked { + // println!("task {} is not parked", ii); + // } + // if metrics.worker_poll_count(ii) == counts[ii] { + // println!("task {} has same poll count", ii); + // } + + if !parked && metrics.worker_poll_count(ii) == counts[ii] { + stuck += 1; + } + } + + assert_eq!( + stuck, 1, + "should be exactly one non-polling, non-parked thread" + ); + }) + .await; + }); +} + #[cfg(any(target_os = "linux", target_os = "macos"))] #[test] fn io_driver_fd_count() { From dde346f29750f4cbcc1aca03043e2698c2ba63e0 Mon Sep 17 00:00:00 2001 From: Theron Tock <83027591+theron-eg@users.noreply.github.com> Date: Wed, 28 Feb 2024 16:33:32 -0800 Subject: [PATCH 2/6] Add on_thread_park_id() as unstable method on runtime Builder --- tokio/src/runtime/builder.rs | 96 +++++++++++++++++++++++++++++++++--- 1 file changed, 89 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index fe65e9ba4d8..771fafd6f6b 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -592,7 +592,7 @@ impl Builder { /// .worker_threads(1) /// .on_thread_park({ /// let barrier = barrier.clone(); - /// move |_| { + /// move || { /// let barrier = barrier.clone(); /// if once.swap(false, Ordering::Relaxed) { /// tokio::spawn(async move { barrier.wait().await; }); @@ -620,7 +620,7 @@ impl Builder { /// let runtime = runtime::Builder::new_current_thread() /// .on_thread_park({ /// let barrier = barrier.clone(); - /// move |_| { + /// move || { /// let barrier = barrier.clone(); /// if once.swap(false, Ordering::Relaxed) { /// tokio::spawn(async move { barrier.wait().await; }); @@ -638,9 +638,9 @@ impl Builder { #[cfg(not(loom))] pub fn on_thread_park(&mut self, f: F) -> &mut 
Self where - F: Fn(usize) + Send + Sync + 'static, + F: Fn() + Send + Sync + 'static, { - self.before_park = Some(std::sync::Arc::new(f)); + self.before_park = Some(std::sync::Arc::new(move |_id| f())); self } @@ -659,7 +659,7 @@ impl Builder { /// # use tokio::runtime; /// # pub fn main() { /// let runtime = runtime::Builder::new_multi_thread() - /// .on_thread_unpark(|_| { + /// .on_thread_unpark(|| { /// println!("thread unparking"); /// }) /// .build(); @@ -673,9 +673,9 @@ impl Builder { #[cfg(not(loom))] pub fn on_thread_unpark(&mut self, f: F) -> &mut Self where - F: Fn(usize) + Send + Sync + 'static, + F: Fn() + Send + Sync + 'static, { - self.after_unpark = Some(std::sync::Arc::new(f)); + self.after_unpark = Some(std::sync::Arc::new(move |_id| f())); self } @@ -938,6 +938,88 @@ impl Builder { self.seed_generator = RngSeedGenerator::new(seed); self } + + /// Same behavior as `on_thread_park` except the id of the thread that is parked is passed + /// to the callback function `f`. The id corresponds to the same `usize` that is used in + /// calls to `RuntimeMetrics`. + /// + /// Note: if both `on_thread_park` and `on_thread_park_id` are called, only the last one + /// will be saved. 
+ /// + /// # Examples + /// + /// ## Stuck task detector + /// + /// ``` + /// # use std::sync::atomic::{AtomicBool, Ordering}; + /// # use std::{thread, time}; + /// # use tokio::runtime; + /// + /// fn main() { + /// const WORKERS: usize = 4; + /// const UNPARKED: AtomicBool = AtomicBool::new(false); + /// static PARKED: [AtomicBool; WORKERS] = [UNPARKED; WORKERS]; + /// + /// let runtime = runtime::Builder::new_multi_thread() + /// .worker_threads(WORKERS) + /// .on_thread_park_id(|id| PARKED[id].store(true, Ordering::Release)) + /// .on_thread_unpark_id(|id| PARKED[id].store(false, Ordering::Release)) + /// .build() + /// .unwrap(); + /// + /// let metrics = runtime.handle().metrics(); + /// thread::spawn(move || { + /// let mut stuck_secs = [0; WORKERS]; + /// let mut prev_poll_counts = [None; WORKERS]; + /// loop { + /// thread::sleep(time::Duration::from_secs(1)); + /// for ii in 0..WORKERS { + /// if PARKED[ii].load(Ordering::Acquire) { + /// prev_poll_counts[ii] = None; + /// } else { + /// let poll_count = metrics.worker_poll_count(ii); + /// if Some(poll_count) == prev_poll_counts[ii] { + /// stuck_secs[ii] += 1; + /// println!("*** worker {} is stuck >= {} secs ***", ii, stuck_secs[ii]); + /// } else { + /// prev_poll_counts[ii] = Some(poll_count); + /// stuck_secs[ii] = 0; + /// } + /// } + /// } + /// } + /// }); + /// + /// // Spawn a "stuck" task that never yields back to tokio + /// runtime.spawn(async { loop {} }); + /// runtime.block_on(async { + /// // Do some work + /// # loop { tokio::task::yield_now().await; } + /// }); + /// } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_park_id<F>(&mut self, f: F) -> &mut Self + where + F: Fn(usize) + Send + Sync + 'static, + { + self.before_park = Some(std::sync::Arc::new(f)); + self + } + + /// Same behavior as `on_thread_unpark` except the id of the thread that is unparked is passed + /// to the callback function `f`. The id corresponds to the same `usize` that is used in + /// calls to `RuntimeMetrics`. 
+ /// + /// See `on_thread_park_id` for an example stuck-thread detector. + #[cfg(not(loom))] + pub fn on_thread_unpark_id<F>(&mut self, f: F) -> &mut Self + where + F: Fn(usize) + Send + Sync + 'static, + { + self.after_unpark = Some(std::sync::Arc::new(f)); + self + } } cfg_metrics! { From 4d4b823f70b3ae3514bbed9bd0e8e037cb255a3c Mon Sep 17 00:00:00 2001 From: Theron Tock <83027591+theron-eg@users.noreply.github.com> Date: Thu, 29 Feb 2024 08:17:30 -0800 Subject: [PATCH 3/6] Ensure `unstable` tests are using the new on_thread_park_id() method --- tokio/tests/rt_metrics.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index ea28acd6fc5..6442351ac31 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -681,7 +681,7 @@ fn budget_exhaustion_yield_with_joins() { } #[test] -fn on_thread_park_unpark() { +fn on_thread_park_unpark_id() { const THREADS: usize = 8; // Keeps track whether or not each worker is parked @@ -696,11 +696,11 @@ fn on_thread_park_unpark() { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(THREADS) .enable_all() - .on_thread_park(move |worker| { + .on_thread_park_id(move |worker| { // worker is parked bools_park[worker].store(true, atomic::Ordering::Release); }) - .on_thread_unpark(move |worker| { + .on_thread_unpark_id(move |worker| { bools_unpark[worker].store(false, atomic::Ordering::Release); }) .build() From dab40f7a0728fe53b28980095e82ec3eeee7310b Mon Sep 17 00:00:00 2001 From: Theron Tock <83027591+theron-eg@users.noreply.github.com> Date: Thu, 29 Feb 2024 16:06:01 -0800 Subject: [PATCH 4/6] Better behaved example for stuck task watchdog --- tokio/src/runtime/builder.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 771fafd6f6b..b822ad894d2 100644 --- a/tokio/src/runtime/builder.rs +++ 
b/tokio/src/runtime/builder.rs @@ -953,48 +953,51 @@ impl Builder { /// ``` /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::{thread, time}; - /// # use tokio::runtime; /// /// fn main() { /// const WORKERS: usize = 4; /// const UNPARKED: AtomicBool = AtomicBool::new(false); - /// static PARKED: [AtomicBool; WORKERS] = [UNPARKED; WORKERS]; + /// static IS_PARKED: [AtomicBool; WORKERS] = [UNPARKED; WORKERS]; /// - /// let runtime = runtime::Builder::new_multi_thread() + /// let runtime = tokio::runtime::Builder::new_multi_thread() /// .worker_threads(WORKERS) - /// .on_thread_park_id(|id| PARKED[id].store(true, Ordering::Release)) - /// .on_thread_unpark_id(|id| PARKED[id].store(false, Ordering::Release)) + /// .on_thread_park_id(|id| IS_PARKED[id].store(true, Ordering::Release)) + /// .on_thread_unpark_id(|id| IS_PARKED[id].store(false, Ordering::Release)) /// .build() /// .unwrap(); /// /// let metrics = runtime.handle().metrics(); + /// let (done_tx, done_rx) = tokio::sync::oneshot::channel(); /// thread::spawn(move || { - /// let mut stuck_secs = [0; WORKERS]; + /// let mut stuck_since = [time::Instant::now(); WORKERS]; /// let mut prev_poll_counts = [None; WORKERS]; /// loop { - /// thread::sleep(time::Duration::from_secs(1)); + /// thread::sleep(time::Duration::from_millis(250)); + /// let now = time::Instant::now(); /// for ii in 0..WORKERS { - /// if PARKED[ii].load(Ordering::Acquire) { + /// if IS_PARKED[ii].load(Ordering::Acquire) { /// prev_poll_counts[ii] = None; /// } else { /// let poll_count = metrics.worker_poll_count(ii); /// if Some(poll_count) == prev_poll_counts[ii] { - /// stuck_secs[ii] += 1; - /// println!("*** worker {} is stuck >= {} secs ***", ii, stuck_secs[ii]); + /// let duration = now.duration_since(stuck_since[ii]); + /// println!("*** worker {} is stuck for {:?} ***", ii, duration); + /// if duration > time::Duration::from_secs(1) { + /// let _ = done_tx.send(()); + /// return; + /// } /// } else { /// 
prev_poll_counts[ii] = Some(poll_count); - /// stuck_secs[ii] = 0; + /// stuck_since[ii] = now; /// } /// } /// } /// } /// }); /// - /// // Spawn a "stuck" task that never yields back to tokio - /// runtime.spawn(async { loop {} }); + /// runtime.spawn(async { thread::sleep(time::Duration::from_secs(3)) }); /// runtime.block_on(async { - /// // Do some work - /// # loop { tokio::task::yield_now().await; } + /// let _ = done_rx.await; /// }); /// } /// ``` From 2301515686e4ee40da35738e6e9247e108921ca6 Mon Sep 17 00:00:00 2001 From: Theron Tock <83027591+theron-eg@users.noreply.github.com> Date: Thu, 29 Feb 2024 16:23:20 -0800 Subject: [PATCH 5/6] Add comment about spawning a stuck task --- tokio/src/runtime/builder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index b822ad894d2..a4bf7e3d285 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -995,6 +995,7 @@ impl Builder { /// } /// }); /// + /// // Spawn a "stuck" task that doesn't yield properly (should be detected). /// runtime.spawn(async { thread::sleep(time::Duration::from_secs(3)) }); /// runtime.block_on(async { /// let _ = done_rx.await; From 451410f760e5570cc51f95dc4837b66c91883b9d Mon Sep 17 00:00:00 2001 From: Theron Tock <83027591+theron-eg@users.noreply.github.com> Date: Fri, 1 Mar 2024 10:37:42 -0800 Subject: [PATCH 6/6] Minor change to trigger new test run. --- tokio/src/runtime/builder.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index a4bf7e3d285..b6193cfb43b 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -939,9 +939,9 @@ impl Builder { self } - /// Same behavior as `on_thread_park` except the id of the thread that is parked is passed - /// to the callback function `f`. The id corresponds to the same `usize` that is used in - /// calls to `RuntimeMetrics`. 
+ /// Has the same behavior as `on_thread_park`, except that the id of the worker thread that + /// is about to park is passed to the callback function `f`. The id is the same `usize` + /// worker index that is used in calls to `RuntimeMetrics`. /// /// Note: if both `on_thread_park` and `on_thread_park_id` are called, only the last one /// will be saved.