From e440d832895462378cb7dc689873f206e0c2b81c Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Fri, 19 Jul 2024 19:17:17 +0200 Subject: [PATCH 1/4] metrics: add worker_park_unpark_count This counts the number of times a worker was parked and unparked. Thus it is odd if the worker is parked and even if the worker is unparked. --- tokio/src/runtime/metrics/batch.rs | 11 ++++ tokio/src/runtime/metrics/mock.rs | 1 + tokio/src/runtime/metrics/runtime.rs | 53 +++++++++++++++++++ tokio/src/runtime/metrics/worker.rs | 3 ++ .../runtime/scheduler/current_thread/mod.rs | 3 ++ .../runtime/scheduler/multi_thread/stats.rs | 4 ++ .../runtime/scheduler/multi_thread/worker.rs | 5 ++ .../scheduler/multi_thread_alt/stats.rs | 4 ++ .../scheduler/multi_thread_alt/worker.rs | 3 ++ tokio/tests/rt_unstable_metrics.rs | 20 +++++++ 10 files changed, 107 insertions(+) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 1d0f3dea30a..72268fe56ff 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -7,6 +7,9 @@ pub(crate) struct MetricsBatch { /// Number of times the worker parked. park_count: u64, + /// Number of times the worker parked and unparked. + park_unpark_count: u64, + /// Number of times the worker woke w/o doing work. noop_count: u64, @@ -54,6 +57,7 @@ impl MetricsBatch { MetricsBatch { park_count: 0, + park_unpark_count: 0, noop_count: 0, steal_count: 0, steal_operations: 0, @@ -76,6 +80,7 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { worker.mean_poll_time.store(mean_poll_time, Relaxed); worker.park_count.store(self.park_count, Relaxed); + worker.park_unpark_count.store(self.park_unpark_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); worker @@ -101,6 +106,7 @@ impl MetricsBatch { /// The worker is about to park. pub(crate) fn about_to_park(&mut self) { self.park_count += 1; + self.park_unpark_count += 1; if self.poll_count_on_last_park == self.poll_count { self.noop_count += 1; @@ -109,6 +115,11 @@ impl MetricsBatch { } } + /// The worker was unparked. + pub(crate) fn unparked(&mut self) { + self.park_unpark_count += 1; + } + /// Start processing a batch of tasks pub(crate) fn start_processing_scheduled_tasks(&mut self) { self.processing_scheduled_tasks_started_at = Instant::now(); diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index e4bb3a99d0c..f4dc116539b 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -39,6 +39,7 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {} pub(crate) fn about_to_park(&mut self) {} + pub(crate) fn unparked(&mut self) {} pub(crate) fn inc_local_schedule_count(&mut self) {} pub(crate) fn start_processing_scheduled_tasks(&mut self) {} pub(crate) fn end_processing_scheduled_tasks(&mut self) {} diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index fdbda6f3cb9..a923f91f8e7 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -242,6 +242,59 @@ impl RuntimeMetrics { .load(Relaxed) } + /// Returns the total number of times the given worker thread has parked + /// and unparked. + /// + /// The worker park/unpark count starts at zero when the runtime is created + /// and increases by one each time the worker parks the thread waiting for + /// new inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. When new work becomes available, + /// the worker is unparked and the park/unpark count is again increased by one. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. An odd count means that the worker is currently parked; + /// an even count means that the worker is active. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// let n = metrics.worker_park_unpark_count(0); + /// + /// println!("worker 0 parked and unparked {} times", n); + /// + /// if n % 2 == 0 { + /// println!("worker 0 is active"); + /// } else { + /// println!("worker 0 is parked"); + /// } + /// } + /// ``` + pub fn worker_park_unpark_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .park_unpark_count + .load(Relaxed) + } + + /// Returns the number of times the given worker thread unparked but /// performed no work before parking again. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index e5d2c6f17fd..02dddc85247 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -16,6 +16,9 @@ pub(crate) struct WorkerMetrics { /// Number of times the worker parked. pub(crate) park_count: MetricAtomicU64, + /// Number of times the worker parked and unparked. + pub(crate) park_unpark_count: MetricAtomicU64, + /// Number of times the worker woke then parked again without doing work. pub(crate) noop_count: MetricAtomicU64, diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 42a0a8822f4..4a3d849d264 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -368,6 +368,9 @@ impl Context { }); core = c; + + core.metrics.unparked(); + core.submit_metrics(handle); } if let Some(f) = &handle.shared.config.after_unpark { diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 9d495706e8d..c59d4373ab8 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -74,6 +74,10 @@ impl Stats { self.batch.about_to_park(); } + pub(crate) fn unparked(&mut self) { + self.batch.unparked(); + } + pub(crate) fn inc_local_schedule_count(&mut self) { self.batch.inc_local_schedule_count(); } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 8ef487b09fd..d71e62df53a 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -699,8 +699,13 @@ impl Context { if core.transition_to_parked(&self.worker) { while !core.is_shutdown && !core.is_traced { core.stats.about_to_park(); + core.stats + .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]); + core = self.park_timeout(core, None); + core.stats.unparked(); + // Run regularly scheduled maintenance core.maintenance(&self.worker); diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs index c2045602797..b7fdd82c9ef 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs @@ -100,6 +100,10 @@ impl Stats { self.batch.about_to_park(); } + pub(crate) fn unparked(&mut self) { + self.batch.unparked(); + } + pub(crate) fn inc_local_schedule_count(&mut self) { self.batch.inc_local_schedule_count(); } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 9ceb7815a53..8f07b84297a 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -658,6 +658,9 @@ impl Worker { let n = cmp::max(core.run_queue.remaining_slots() / 2, 1); let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n); + core.stats.unparked(); + self.flush_metrics(cx, &mut core); + Ok((maybe_task, core)) } diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 6640c524a69..6c3248e19f1 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -170,6 +170,26 @@ fn worker_park_count() { assert!(1 <= metrics.worker_park_count(1)); } +#[test] +fn worker_park_unpark_count() { + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(2 <= metrics.worker_park_unpark_count(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(2 <= metrics.worker_park_unpark_count(0)); + assert!(2 <= metrics.worker_park_unpark_count(1)); +} + #[test] fn worker_noop_count() { // There isn't really a great way to generate no-op parks as they happen as From bf4631e623a8f8cba792ce5d32892534cccf769e Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Fri, 19 Jul 2024 21:28:53 +0200 Subject: [PATCH 2/4] rustfmt --- tokio/src/runtime/metrics/batch.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 72268fe56ff..6118bcd04ca 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -80,7 +80,9 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { worker.mean_poll_time.store(mean_poll_time, Relaxed); worker.park_count.store(self.park_count, Relaxed); - worker.park_unpark_count.store(self.park_unpark_count, Relaxed); + worker + .park_unpark_count + .store(self.park_unpark_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); worker From 8511ab18cd370c27db3a588b1086e4ba40e6046e Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Mon, 22 Jul 2024 11:06:38 +0200 Subject: [PATCH 3/4] Implement code review --- tokio/src/runtime/metrics/runtime.rs | 6 ++++-- tokio/tests/rt_unstable_metrics.rs | 7 +++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index a923f91f8e7..5bb79927a82 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -251,9 +251,11 @@ impl RuntimeMetrics { /// all pending work and is currently idle. When new work becomes available, /// the worker is unparked and the park/unpark count is again increased by one. /// + /// An odd count means that the worker is currently parked. + /// An even count means that the worker is currently active. + /// /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. An odd count means that the worker is currently parked; - /// an even count means that the worker is active. + /// reset to zero. /// /// # Arguments /// diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 6c3248e19f1..31e834ba2fa 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -13,7 +13,7 @@ use std::task::Poll; use tokio::macros::support::poll_fn; use tokio::runtime::Runtime; -use tokio::task::consume_budget; +use tokio::task::{consume_budget, yield_now}; use tokio::time::{self, Duration}; #[test] @@ -183,11 +183,10 @@ fn worker_park_unpark_count() { let rt = threaded(); let metrics = rt.metrics(); rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; + yield_now().await; }); drop(rt); - assert!(2 <= metrics.worker_park_unpark_count(0)); - assert!(2 <= metrics.worker_park_unpark_count(1)); + assert!(2 <= metrics.worker_park_unpark_count(0) || 2 <= metrics.worker_park_unpark_count(1)); } #[test] From 7176afdbb73bd1a3e8a5d5f36a3a4523f9f02159 Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Mon, 22 Jul 2024 17:38:54 +0200 Subject: [PATCH 4/4] Avoid sleep in test --- tokio/tests/rt_unstable_metrics.rs | 35 +++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 31e834ba2fa..2e51edee4d9 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -13,7 +13,7 @@ use std::task::Poll; use tokio::macros::support::poll_fn; use tokio::runtime::Runtime; -use tokio::task::{consume_budget, yield_now}; +use tokio::task::consume_budget; use tokio::time::{self, Duration}; #[test] @@ -174,19 +174,38 @@ fn worker_park_count() { fn worker_park_unpark_count() { let rt = current_thread(); let metrics = rt.metrics(); - rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; - }); + rt.block_on(rt.spawn(async {})).unwrap(); drop(rt); assert!(2 <= metrics.worker_park_unpark_count(0)); let rt = threaded(); let metrics = rt.metrics(); - rt.block_on(async { - yield_now().await; - }); + + // Wait for workers to be parked after runtime startup. + for _ in 0..100 { + if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert_eq!(1, metrics.worker_park_unpark_count(0)); + assert_eq!(1, metrics.worker_park_unpark_count(1)); + + // Spawn a task to unpark and then park a worker. + rt.block_on(rt.spawn(async {})).unwrap(); + for _ in 0..100 { + if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1)); + + // Both threads unpark for runtime shutdown. drop(rt); - assert!(2 <= metrics.worker_park_unpark_count(0) || 2 <= metrics.worker_park_unpark_count(1)); + assert_eq!(0, metrics.worker_park_unpark_count(0) % 2); + assert_eq!(0, metrics.worker_park_unpark_count(1) % 2); + assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1)); } #[test]