diff --git a/Cargo.toml b/Cargo.toml index d8ac248189d..2238deac71c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,4 +17,3 @@ members = [ [workspace.metadata.spellcheck] config = "spellcheck.toml" - diff --git a/LICENSE b/LICENSE index 8bdf6bd60d3..f0dbcf4b45d 100644 --- a/LICENSE +++ b/LICENSE @@ -1,25 +1,21 @@ -Copyright (c) 2023 Tokio Contributors +MIT License -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: +Copyright (c) Tokio Contributors -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/tokio-macros/LICENSE b/tokio-macros/LICENSE index 12d1037fd0c..c4d82b91d6d 100644 --- a/tokio-macros/LICENSE +++ b/tokio-macros/LICENSE @@ -1,32 +1,7 @@ -Copyright (c) 2023 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. - -The MIT License (MIT) +MIT License Copyright (c) 2019 Yoshua Wuyts +Copyright (c) Tokio Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/tokio-stream/LICENSE b/tokio-stream/LICENSE index 8bdf6bd60d3..f0dbcf4b45d 100644 --- a/tokio-stream/LICENSE +++ b/tokio-stream/LICENSE @@ -1,25 +1,21 @@ -Copyright (c) 2023 Tokio Contributors +MIT License -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: +Copyright (c) Tokio Contributors -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/tokio-test/LICENSE b/tokio-test/LICENSE index 8bdf6bd60d3..f0dbcf4b45d 100644 --- a/tokio-test/LICENSE +++ b/tokio-test/LICENSE @@ -1,25 +1,21 @@ -Copyright (c) 2023 Tokio Contributors +MIT License -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: +Copyright (c) Tokio Contributors -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/tokio-util/LICENSE b/tokio-util/LICENSE index 8bdf6bd60d3..f0dbcf4b45d 100644 --- a/tokio-util/LICENSE +++ b/tokio-util/LICENSE @@ -1,25 +1,21 @@ -Copyright (c) 2023 Tokio Contributors +MIT License -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: +Copyright (c) Tokio Contributors -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. 
+Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 020cc1e4ac2..e46e274c47a 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -118,7 +118,7 @@ signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] libc = { version = "0.2.149" } -nix = { version = "0.27.1", default-features = false, features = ["fs", "socket"] } +nix = { version = "0.29.0", default-features = false, features = ["aio", "fs", "socket"] } [target.'cfg(windows)'.dependencies.windows-sys] version = "0.48" @@ -149,7 +149,7 @@ rand = "0.8.0" wasm-bindgen-test = "0.3.0" [target.'cfg(target_os = "freebsd")'.dev-dependencies] -mio-aio = { version = "0.8.0", features = ["tokio"] } +mio-aio = { version = "0.9.0", features = ["tokio"] } [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.7", features = ["futures", "checkpoint"] } diff --git a/tokio/LICENSE b/tokio/LICENSE index 8bdf6bd60d3..f0dbcf4b45d 100644 --- a/tokio/LICENSE +++ b/tokio/LICENSE @@ -1,25 +1,21 @@ -Copyright (c) 2023 Tokio Contributors +MIT License -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: +Copyright (c) Tokio Contributors -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. 
+Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index f44599ff47a..8a72476f7c4 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -215,7 +215,7 @@ macro_rules! cfg_macros { } } -macro_rules! cfg_metrics { +macro_rules! cfg_unstable_metrics { ($($item:item)*) => { $( #[cfg(tokio_unstable)] @@ -245,7 +245,7 @@ macro_rules! cfg_no_64bit_metrics { } } -macro_rules! cfg_not_metrics { +macro_rules! cfg_not_unstable_metrics { ($($item:item)*) => { $( #[cfg(not(tokio_unstable))] diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 98e63f0c450..0b312f896f1 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -75,7 +75,8 @@ use self::doc::*; /// let server = tokio::spawn(async move { /// loop { /// // Wait for a client to connect. -/// let connected = server.connect().await?; +/// server.connect().await?; +/// let connected_client = server; /// /// // Construct the next server to be connected before sending the one /// // we already have of onto a task. This ensures that the server @@ -2626,7 +2627,7 @@ pub enum PipeEnd { /// Information about a named pipe. /// /// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`]. -#[derive(Debug)] +#[derive(Debug, Clone)] #[non_exhaustive] pub struct PipeInfo { /// Indicates the mode of a named pipe. diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index c74aea76568..3757079f329 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -40,7 +40,7 @@ impl SpawnerMetrics { self.num_idle_threads.load(Ordering::Relaxed) } - cfg_metrics! { + cfg_unstable_metrics! { fn queue_depth(&self) -> usize { self.queue_depth.load(Ordering::Relaxed) } @@ -474,7 +474,7 @@ impl Spawner { } } -cfg_metrics! { +cfg_unstable_metrics! 
{ impl Spawner { pub(crate) fn num_threads(&self) -> usize { self.inner.metrics.num_threads() diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 05f736d3e50..519c7d01413 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -957,7 +957,7 @@ impl Builder { } } - cfg_metrics! { + cfg_unstable_metrics! { /// Enables tracking the distribution of task poll times. /// /// Task poll times are not instrumented by default as doing so requires diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index d9f7ff2af29..f2afa75c9c4 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -197,7 +197,7 @@ cfg_coop! { } cfg_rt! { - cfg_metrics! { + cfg_unstable_metrics! { #[inline(always)] fn inc_budget_forced_yield_count() { let _ = context::with_current(|handle| { @@ -206,7 +206,7 @@ cfg_coop! { } } - cfg_not_metrics! { + cfg_not_unstable_metrics! { #[inline(always)] fn inc_budget_forced_yield_count() {} } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 01d210cd36f..5691a6e3bd2 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,6 +1,6 @@ #[cfg(tokio_unstable)] use crate::runtime; -use crate::runtime::{context, scheduler, RuntimeFlavor}; +use crate::runtime::{context, scheduler, RuntimeFlavor, RuntimeMetrics}; /// Handle to the runtime. /// @@ -393,17 +393,11 @@ impl Handle { owned_id.into() } } -} - -cfg_metrics! { - use crate::runtime::RuntimeMetrics; - impl Handle { - /// Returns a view that lets you get information about how the runtime - /// is performing. - pub fn metrics(&self) -> RuntimeMetrics { - RuntimeMetrics::new(self.clone()) - } + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn metrics(&self) -> RuntimeMetrics { + RuntimeMetrics::new(self.clone()) } } diff --git a/tokio/src/runtime/io/metrics.rs b/tokio/src/runtime/io/metrics.rs index ec341efe680..e7a01bc2f46 100644 --- a/tokio/src/runtime/io/metrics.rs +++ b/tokio/src/runtime/io/metrics.rs @@ -17,7 +17,7 @@ cfg_not_rt_and_metrics_and_net! { cfg_net! { cfg_rt! { - cfg_metrics! { + cfg_unstable_metrics! { pub(crate) use crate::runtime::IoDriverMetrics; } } diff --git a/tokio/src/runtime/metrics/histogram.rs b/tokio/src/runtime/metrics/histogram.rs index f75ffa3b495..4cfd769a94e 100644 --- a/tokio/src/runtime/metrics/histogram.rs +++ b/tokio/src/runtime/metrics/histogram.rs @@ -1,5 +1,5 @@ -use crate::loom::sync::atomic::Ordering::Relaxed; use crate::util::metric_atomics::MetricAtomicU64; +use std::sync::atomic::Ordering::Relaxed; use std::cmp; use std::ops::Range; diff --git a/tokio/src/runtime/metrics/io.rs b/tokio/src/runtime/metrics/io.rs index 674fca5faec..9fdf3c96694 100644 --- a/tokio/src/runtime/metrics/io.rs +++ b/tokio/src/runtime/metrics/io.rs @@ -1,6 +1,7 @@ #![cfg_attr(not(feature = "net"), allow(dead_code))] -use crate::{loom::sync::atomic::Ordering::Relaxed, util::metric_atomics::MetricAtomicU64}; +use crate::util::metric_atomics::MetricAtomicU64; +use std::sync::atomic::Ordering::Relaxed; #[derive(Default)] pub(crate) struct IoDriverMetrics { diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 88be4a5211f..295c97cce88 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -8,7 +8,10 @@ //! [unstable]: crate#unstable-features #![allow(clippy::module_inception)] -cfg_metrics! { +mod runtime; +pub use runtime::RuntimeMetrics; + +cfg_unstable_metrics! 
{ mod batch; pub(crate) use batch::MetricsBatch; @@ -17,9 +20,6 @@ cfg_metrics! { #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use histogram::HistogramScale; - mod runtime; - #[allow(unreachable_pub)] // rust-lang/rust#57411 - pub use runtime::RuntimeMetrics; mod scheduler; pub(crate) use scheduler::SchedulerMetrics; @@ -33,7 +33,7 @@ cfg_metrics! { } } -cfg_not_metrics! { +cfg_not_unstable_metrics! { mod mock; pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 865a6406a6a..8d30f66f6ff 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,10 +1,12 @@ use crate::runtime::Handle; -use std::ops::Range; -cfg_64bit_metrics! { - use std::sync::atomic::Ordering::Relaxed; +cfg_unstable_metrics! { + use std::ops::Range; + cfg_64bit_metrics! { + use std::sync::atomic::Ordering::Relaxed; + } + use std::time::Duration; } -use std::time::Duration; /// Handle to the runtime's metrics. /// @@ -45,221 +47,354 @@ impl RuntimeMetrics { self.handle.inner.num_workers() } - /// Returns the number of additional threads spawned by the runtime. - /// - /// The number of workers is set by configuring `max_blocking_threads` on - /// `runtime::Builder`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let _ = tokio::task::spawn_blocking(move || { - /// // Stand-in for compute-heavy work or using synchronous APIs - /// 1 + 1 - /// }).await; - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.num_blocking_threads(); - /// println!("Runtime has created {} threads", n); - /// } - /// ``` - pub fn num_blocking_threads(&self) -> usize { - self.handle.inner.num_blocking_threads() - } + cfg_unstable_metrics! { - /// Returns the number of active tasks in the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.active_tasks_count(); - /// println!("Runtime has {} active tasks", n); - /// } - /// ``` - pub fn active_tasks_count(&self) -> usize { - self.handle.inner.active_tasks_count() - } + /// Returns the number of additional threads spawned by the runtime. + /// + /// The number of workers is set by configuring `max_blocking_threads` on + /// `runtime::Builder`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_blocking_threads(); + /// println!("Runtime has created {} threads", n); + /// } + /// ``` + pub fn num_blocking_threads(&self) -> usize { + self.handle.inner.num_blocking_threads() + } - /// Returns the number of idle threads, which have spawned by the runtime - /// for `spawn_blocking` calls. 
- /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let _ = tokio::task::spawn_blocking(move || { - /// // Stand-in for compute-heavy work or using synchronous APIs - /// 1 + 1 - /// }).await; - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.num_idle_blocking_threads(); - /// println!("Runtime has {} idle blocking thread pool threads", n); - /// } - /// ``` - pub fn num_idle_blocking_threads(&self) -> usize { - self.handle.inner.num_idle_blocking_threads() - } + /// Returns the number of active tasks in the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.active_tasks_count(); + /// println!("Runtime has {} active tasks", n); + /// } + /// ``` + pub fn active_tasks_count(&self) -> usize { + self.handle.inner.active_tasks_count() + } - cfg_64bit_metrics! { - /// Returns the number of tasks scheduled from **outside** of the runtime. - /// - /// The remote schedule count starts at zero when the runtime is created and - /// increases by one each time a task is woken from **outside** of the - /// runtime. This usually means that a task is spawned or notified from a - /// non-runtime thread and must be queued using the Runtime's injection - /// queue, which tends to be slower. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.remote_schedule_count(); - /// println!("{} tasks were scheduled from outside the runtime", n); - /// } - /// ``` - pub fn remote_schedule_count(&self) -> u64 { - self.handle - .inner - .scheduler_metrics() - .remote_schedule_count - .load(Relaxed) - } + /// Returns the number of idle threads, which have spawned by the runtime + /// for `spawn_blocking` calls. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_idle_blocking_threads(); + /// println!("Runtime has {} idle blocking thread pool threads", n); + /// } + /// ``` + pub fn num_idle_blocking_threads(&self) -> usize { + self.handle.inner.num_idle_blocking_threads() + } - /// Returns the number of times that tasks have been forced to yield back to the scheduler - /// after exhausting their task budgets. - /// - /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - pub fn budget_forced_yield_count(&self) -> u64 { - self.handle - .inner - .scheduler_metrics() - .budget_forced_yield_count - .load(Relaxed) - } + cfg_64bit_metrics! { + /// Returns the number of tasks scheduled from **outside** of the runtime. + /// + /// The remote schedule count starts at zero when the runtime is created and + /// increases by one each time a task is woken from **outside** of the + /// runtime. 
This usually means that a task is spawned or notified from a + /// non-runtime thread and must be queued using the Runtime's injection + /// queue, which tends to be slower. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.remote_schedule_count(); + /// println!("{} tasks were scheduled from outside the runtime", n); + /// } + /// ``` + pub fn remote_schedule_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .remote_schedule_count + .load(Relaxed) + } - /// Returns the total number of times the given worker thread has parked. - /// - /// The worker park count starts at zero when the runtime is created and - /// increases by one each time the worker parks the thread waiting for new - /// inbound events to process. This usually means the worker has processed - /// all pending work and is currently idle. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_park_count(0); - /// println!("worker 0 parked {} times", n); - /// } - /// ``` - pub fn worker_park_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .park_count - .load(Relaxed) - } + /// Returns the number of times that tasks have been forced to yield back to the scheduler + /// after exhausting their task budgets. + /// + /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + pub fn budget_forced_yield_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .budget_forced_yield_count + .load(Relaxed) + } - /// Returns the number of times the given worker thread unparked but - /// performed no work before parking again. - /// - /// The worker no-op count starts at zero when the runtime is created and - /// increases by one each time the worker unparks the thread but finds no - /// new work and goes back to sleep. This indicates a false-positive wake up. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. 
- /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_noop_count(0); - /// println!("worker 0 had {} no-op unparks", n); - /// } - /// ``` - pub fn worker_noop_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .noop_count - .load(Relaxed) - } + /// Returns the total number of times the given worker thread has parked. + /// + /// The worker park count starts at zero when the runtime is created and + /// increases by one each time the worker parks the thread waiting for new + /// inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_park_count(0); + /// println!("worker 0 parked {} times", n); + /// } + /// ``` + pub fn worker_park_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .park_count + .load(Relaxed) + } - /// Returns the number of tasks the given worker thread stole from - /// another worker thread. - /// - /// This metric only applies to the **multi-threaded** runtime and will - /// always return `0` when using the current thread runtime. + /// Returns the number of times the given worker thread unparked but + /// performed no work before parking again. + /// + /// The worker no-op count starts at zero when the runtime is created and + /// increases by one each time the worker unparks the thread but finds no + /// new work and goes back to sleep. This indicates a false-positive wake up. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_noop_count(0); + /// println!("worker 0 had {} no-op unparks", n); + /// } + /// ``` + pub fn worker_noop_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .noop_count + .load(Relaxed) + } + + /// Returns the number of tasks the given worker thread stole from + /// another worker thread. 
+ /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by `N` each time the worker has processed its scheduled queue + /// and successfully steals `N` more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_count(0); + /// println!("worker 0 has stolen {} tasks", n); + /// } + /// ``` + pub fn worker_steal_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_count + .load(Relaxed) + } + + /// Returns the number of times the given worker thread stole tasks from + /// another worker thread. + /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by one each time the worker has processed its scheduled queue + /// and successfully steals more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_operations(0); + /// println!("worker 0 has stolen tasks {} times", n); + /// } + /// ``` + pub fn worker_steal_operations(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_operations + .load(Relaxed) + } + + /// Returns the number of tasks the given worker thread has polled. + /// + /// The worker poll count starts at zero when the runtime is created and + /// increases by one each time the worker polls a scheduled task. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. 
+ /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_poll_count(0); + /// println!("worker 0 has polled {} tasks", n); + /// } + /// ``` + pub fn worker_poll_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .poll_count + .load(Relaxed) + } + + /// Returns the amount of time the given worker thread has been busy. /// - /// The worker steal count starts at zero when the runtime is created and - /// increases by `N` each time the worker has processed its scheduled queue - /// and successfully steals `N` more pending tasks from another worker. + /// The worker busy duration starts at zero when the runtime is created and + /// increases whenever the worker is spending time processing work. Using + /// this value can indicate the load of the given worker. If a lot of time + /// is spent busy, then the worker is under load and will check for inbound + /// events less often. /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. + /// The timer is monotonically increasing. It is never decremented or reset + /// to zero. /// /// # Arguments /// @@ -282,27 +417,28 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_steal_count(0); - /// println!("worker 0 has stolen {} tasks", n); + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); /// } /// ``` - pub fn worker_steal_count(&self, worker: usize) -> u64 { - self.handle + pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { + let nanos = self + .handle .inner .worker_metrics(worker) - .steal_count - .load(Relaxed) + .busy_duration_total + .load(Relaxed); + Duration::from_nanos(nanos) } - /// Returns the number of times the given worker thread stole tasks from - /// another worker thread. + /// Returns the number of tasks scheduled from **within** the runtime on the + /// given worker's local queue. /// - /// This metric only applies to the **multi-threaded** runtime and will - /// always return `0` when using the current thread runtime. - /// - /// The worker steal count starts at zero when the runtime is created and - /// increases by one each time the worker has processed its scheduled queue - /// and successfully steals more pending tasks from another worker. + /// The local schedule count starts at zero when the runtime is created and + /// increases by one each time a task is woken from **inside** of the + /// runtime on the given worker. This usually means that a task is spawned + /// or notified from within a runtime thread and will be queued on the + /// worker-local queue. /// /// The counter is monotonically increasing. It is never decremented or /// reset to zero. 
@@ -328,22 +464,27 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_steal_operations(0); - /// println!("worker 0 has stolen tasks {} times", n); + /// let n = metrics.worker_local_schedule_count(0); + /// println!("{} tasks were scheduled on the worker's local queue", n); /// } /// ``` - pub fn worker_steal_operations(&self, worker: usize) -> u64 { + pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { self.handle .inner .worker_metrics(worker) - .steal_operations + .local_schedule_count .load(Relaxed) } - /// Returns the number of tasks the given worker thread has polled. + /// Returns the number of times the given worker thread saturated its local + /// queue. + /// + /// This metric only applies to the **multi-threaded** scheduler. /// - /// The worker poll count starts at zero when the runtime is created and - /// increases by one each time the worker polls a scheduled task. + /// The worker overflow count starts at zero when the runtime is created and + /// increases by one each time the worker attempts to schedule a task + /// locally, but its local queue is full. When this happens, half of the + /// local queue is moved to the injection queue. /// /// The counter is monotonically increasing. It is never decremented or /// reset to zero. @@ -369,40 +510,27 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_poll_count(0); - /// println!("worker 0 has polled {} tasks", n); + /// let n = metrics.worker_overflow_count(0); + /// println!("worker 0 has overflowed its queue {} times", n); /// } /// ``` - pub fn worker_poll_count(&self, worker: usize) -> u64 { + pub fn worker_overflow_count(&self, worker: usize) -> u64 { self.handle .inner .worker_metrics(worker) - .poll_count + .overflow_count .load(Relaxed) } + } - /// Returns the amount of time the given worker thread has been busy. - /// - /// The worker busy duration starts at zero when the runtime is created and - /// increases whenever the worker is spending time processing work. Using - /// this value can indicate the load of the given worker. If a lot of time - /// is spent busy, then the worker is under load and will check for inbound - /// events less often. - /// - /// The timer is monotonically increasing. It is never decremented or reset - /// to zero. + /// Returns the number of tasks currently scheduled in the runtime's + /// injection queue. /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. + /// Tasks that are spawned or notified from a non-runtime thread are + /// scheduled using the runtime's injection queue. This metric returns the + /// **current** number of tasks pending in the injection queue. As such, the + /// returned value may increase or decrease as new tasks are scheduled and + /// processed. 
/// /// # Examples /// @@ -413,31 +541,22 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_total_busy_duration(0); - /// println!("worker 0 was busy for a total of {:?}", n); + /// let n = metrics.injection_queue_depth(); + /// println!("{} tasks currently pending in the runtime's injection queue", n); /// } /// ``` - pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { - let nanos = self - .handle - .inner - .worker_metrics(worker) - .busy_duration_total - .load(Relaxed); - Duration::from_nanos(nanos) + pub fn injection_queue_depth(&self) -> usize { + self.handle.inner.injection_queue_depth() } - /// Returns the number of tasks scheduled from **within** the runtime on the - /// given worker's local queue. - /// - /// The local schedule count starts at zero when the runtime is created and - /// increases by one each time a task is woken from **inside** of the - /// runtime on the given worker. This usually means that a task is spawned - /// or notified from within a runtime thread and will be queued on the - /// worker-local queue. + /// Returns the number of tasks currently scheduled in the given worker's + /// local queue. /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. + /// Tasks that are spawned or notified from within a runtime thread are + /// scheduled using that worker's local queue. This metric returns the + /// **current** number of tasks pending in the worker's local queue. As + /// such, the returned value may increase or decrease as new tasks are + /// scheduled and processed. /// /// # Arguments /// @@ -460,283 +579,56 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_local_schedule_count(0); - /// println!("{} tasks were scheduled on the worker's local queue", n); + /// let n = metrics.worker_local_queue_depth(0); + /// println!("{} tasks currently pending in worker 0's local queue", n); /// } /// ``` - pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .local_schedule_count - .load(Relaxed) + pub fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.handle.inner.worker_local_queue_depth(worker) } - /// Returns the number of times the given worker thread saturated its local - /// queue. - /// - /// This metric only applies to the **multi-threaded** scheduler. - /// - /// The worker overflow count starts at zero when the runtime is created and - /// increases by one each time the worker attempts to schedule a task - /// locally, but its local queue is full. When this happens, half of the - /// local queue is moved to the injection queue. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. + /// Returns `true` if the runtime is tracking the distribution of task poll + /// times. /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. 
+ /// Task poll times are not instrumented by default as doing so requires + /// calling [`Instant::now()`] twice per task poll. The feature is enabled + /// by calling [`enable_metrics_poll_count_histogram()`] when building the + /// runtime. /// /// # Examples /// /// ``` - /// use tokio::runtime::Handle; + /// use tokio::runtime::{self, Handle}; /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.poll_count_histogram_enabled(); /// - /// let n = metrics.worker_overflow_count(0); - /// println!("worker 0 has overflowed its queue {} times", n); + /// println!("Tracking task poll time distribution: {:?}", enabled); + /// }); /// } /// ``` - pub fn worker_overflow_count(&self, worker: usize) -> u64 { + /// + /// [`enable_metrics_poll_count_histogram()`]: crate::runtime::Builder::enable_metrics_poll_count_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn poll_count_histogram_enabled(&self) -> bool { self.handle .inner - .worker_metrics(worker) - .overflow_count - .load(Relaxed) + .worker_metrics(0) + .poll_count_histogram + .is_some() } - } - /// Returns the number of tasks currently scheduled in the runtime's - /// injection queue. - /// - /// Tasks that are spawned or notified from a non-runtime thread are - /// scheduled using the runtime's injection queue. This metric returns the - /// **current** number of tasks pending in the injection queue. As such, the - /// returned value may increase or decrease as new tasks are scheduled and - /// processed. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.injection_queue_depth(); - /// println!("{} tasks currently pending in the runtime's injection queue", n); - /// } - /// ``` - pub fn injection_queue_depth(&self) -> usize { - self.handle.inner.injection_queue_depth() - } - - /// Returns the number of tasks currently scheduled in the given worker's - /// local queue. - /// - /// Tasks that are spawned or notified from within a runtime thread are - /// scheduled using that worker's local queue. This metric returns the - /// **current** number of tasks pending in the worker's local queue. As - /// such, the returned value may increase or decrease as new tasks are - /// scheduled and processed. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. 
- /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_local_queue_depth(0); - /// println!("{} tasks currently pending in worker 0's local queue", n); - /// } - /// ``` - pub fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.handle.inner.worker_local_queue_depth(worker) - } - - /// Returns `true` if the runtime is tracking the distribution of task poll - /// times. - /// - /// Task poll times are not instrumented by default as doing so requires - /// calling [`Instant::now()`] twice per task poll. The feature is enabled - /// by calling [`enable_metrics_poll_count_histogram()`] when building the - /// runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let enabled = metrics.poll_count_histogram_enabled(); - /// - /// println!("Tracking task poll time distribution: {:?}", enabled); - /// }); - /// } - /// ``` - /// - /// [`enable_metrics_poll_count_histogram()`]: crate::runtime::Builder::enable_metrics_poll_count_histogram - /// [`Instant::now()`]: std::time::Instant::now - pub fn poll_count_histogram_enabled(&self) -> bool { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .is_some() - } - - /// Returns the number of histogram buckets tracking the distribution of - /// task poll times. - /// - /// This value is configured by calling - /// [`metrics_poll_count_histogram_buckets()`] when building the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.poll_count_histogram_num_buckets(); - /// - /// println!("Histogram buckets: {:?}", buckets); - /// }); - /// } - /// ``` - /// - /// [`metrics_poll_count_histogram_buckets()`]: - /// crate::runtime::Builder::metrics_poll_count_histogram_buckets - pub fn poll_count_histogram_num_buckets(&self) -> usize { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .as_ref() - .map(|histogram| histogram.num_buckets()) - .unwrap_or_default() - } - - /// Returns the range of task poll times tracked by the given bucket. - /// - /// This value is configured by calling - /// [`metrics_poll_count_histogram_resolution()`] when building the runtime. - /// - /// # Panics - /// - /// The method panics if `bucket` represents an invalid bucket index, i.e. - /// is greater than or equal to `poll_count_histogram_num_buckets()`. 
- /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_poll_count_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.poll_count_histogram_num_buckets(); - /// - /// for i in 0..buckets { - /// let range = metrics.poll_count_histogram_bucket_range(i); - /// println!("Histogram bucket {} range: {:?}", i, range); - /// } - /// }); - /// } - /// ``` - /// - /// [`metrics_poll_count_histogram_resolution()`]: - /// crate::runtime::Builder::metrics_poll_count_histogram_resolution - #[track_caller] - pub fn poll_count_histogram_bucket_range(&self, bucket: usize) -> Range { - self.handle - .inner - .worker_metrics(0) - .poll_count_histogram - .as_ref() - .map(|histogram| { - let range = histogram.bucket_range(bucket); - std::ops::Range { - start: Duration::from_nanos(range.start), - end: Duration::from_nanos(range.end), - } - }) - .unwrap_or_default() - } - - cfg_64bit_metrics! { - /// Returns the number of times the given worker polled tasks with a poll - /// duration within the given bucket's range. - /// - /// Each worker maintains its own histogram and the counts for each bucket - /// starts at zero when the runtime is created. Each time the worker polls a - /// task, it tracks the duration the task poll time took and increments the - /// associated bucket by 1. - /// - /// Each bucket is a monotonically increasing counter. It is never - /// decremented or reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// `bucket` is the index of the bucket being queried. The bucket is scoped - /// to the worker. The range represented by the bucket can be queried by - /// calling [`poll_count_histogram_bucket_range()`]. Each worker maintains - /// identical bucket ranges. + /// Returns the number of histogram buckets tracking the distribution of + /// task poll times. /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()` or if `bucket` represents an - /// invalid bucket. + /// This value is configured by calling + /// [`metrics_poll_count_histogram_buckets()`] when building the runtime. 
/// /// # Examples /// @@ -752,149 +644,156 @@ impl RuntimeMetrics { /// let metrics = Handle::current().metrics(); /// let buckets = metrics.poll_count_histogram_num_buckets(); /// - /// for worker in 0..metrics.num_workers() { - /// for i in 0..buckets { - /// let count = metrics.poll_count_histogram_bucket_count(worker, i); - /// println!("Poll count {}", count); - /// } - /// } + /// println!("Histogram buckets: {:?}", buckets); /// }); /// } /// ``` /// - /// [`poll_count_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::poll_count_histogram_bucket_range - #[track_caller] - pub fn poll_count_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + /// [`metrics_poll_count_histogram_buckets()`]: + /// crate::runtime::Builder::metrics_poll_count_histogram_buckets + pub fn poll_count_histogram_num_buckets(&self) -> usize { self.handle .inner - .worker_metrics(worker) + .worker_metrics(0) .poll_count_histogram .as_ref() - .map(|histogram| histogram.get(bucket)) + .map(|histogram| histogram.num_buckets()) .unwrap_or_default() } - /// Returns the mean duration of task polls, in nanoseconds. + /// Returns the range of task poll times tracked by the given bucket. /// - /// This is an exponentially weighted moving average. Currently, this metric - /// is only provided by the multi-threaded runtime. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. + /// This value is configured by calling + /// [`metrics_poll_count_histogram_resolution()`] when building the runtime. /// /// # Panics /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `poll_count_histogram_num_buckets()`. 
/// /// # Examples /// /// ``` - /// use tokio::runtime::Handle; + /// use tokio::runtime::{self, Handle}; /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.poll_count_histogram_num_buckets(); /// - /// let n = metrics.worker_mean_poll_time(0); - /// println!("worker 0 has a mean poll time of {:?}", n); + /// for i in 0..buckets { + /// let range = metrics.poll_count_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); /// } /// ``` + /// + /// [`metrics_poll_count_histogram_resolution()`]: + /// crate::runtime::Builder::metrics_poll_count_histogram_resolution #[track_caller] - pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { - let nanos = self - .handle + pub fn poll_count_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle .inner - .worker_metrics(worker) - .mean_poll_time - .load(Relaxed); - Duration::from_nanos(nanos) + .worker_metrics(0) + .poll_count_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() } - } - - /// Returns the number of tasks currently scheduled in the blocking - /// thread pool, spawned using `spawn_blocking`. - /// - /// This metric returns the **current** number of tasks pending in - /// blocking thread pool. As such, the returned value may increase - /// or decrease as new tasks are scheduled and processed. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.blocking_queue_depth(); - /// println!("{} tasks currently pending in the blocking thread pool", n); - /// } - /// ``` - pub fn blocking_queue_depth(&self) -> usize { - self.handle.inner.blocking_queue_depth() - } -} -cfg_net! { - impl RuntimeMetrics { cfg_64bit_metrics! { - /// Returns the number of file descriptors that have been registered with the - /// runtime's I/O driver. + /// Returns the number of times the given worker polled tasks with a poll + /// duration within the given bucket's range. /// - /// # Examples + /// Each worker maintains its own histogram and the counts for each bucket + /// starts at zero when the runtime is created. Each time the worker polls a + /// task, it tracks the duration the task poll time took and increments the + /// associated bucket by 1. /// - /// ``` - /// use tokio::runtime::Handle; + /// Each bucket is a monotonically increasing counter. It is never + /// decremented or reset to zero. /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); + /// # Arguments /// - /// let registered_fds = metrics.io_driver_fd_registered_count(); - /// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds); + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. 
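Taken together with the per-worker bucket counts documented next, these accessors compose into a whole-runtime view of the poll-time distribution. A minimal sketch, assuming a `tokio_unstable` build on a 64-bit target (the printout format is illustrative only):

use tokio::runtime::{self, Handle};

fn main() {
    runtime::Builder::new_current_thread()
        .enable_metrics_poll_count_histogram()
        .build()
        .unwrap()
        .block_on(async {
            let metrics = Handle::current().metrics();

            for bucket in 0..metrics.poll_count_histogram_num_buckets() {
                let range = metrics.poll_count_histogram_bucket_range(bucket);
                // Sum the per-worker counts for this bucket across all workers.
                let total: u64 = (0..metrics.num_workers())
                    .map(|worker| metrics.poll_count_histogram_bucket_count(worker, bucket))
                    .sum();
                println!("{:?}: {} polls", range, total);
            }
        });
}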
/// - /// let deregistered_fds = metrics.io_driver_fd_deregistered_count(); + /// `bucket` is the index of the bucket being queried. The bucket is scoped + /// to the worker. The range represented by the bucket can be queried by + /// calling [`poll_count_histogram_bucket_range()`]. Each worker maintains + /// identical bucket ranges. /// - /// let current_fd_count = registered_fds - deregistered_fds; - /// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count); - /// } - /// ``` - pub fn io_driver_fd_registered_count(&self) -> u64 { - self.with_io_driver_metrics(|m| { - m.fd_registered_count.load(Relaxed) - }) - } - - /// Returns the number of file descriptors that have been deregistered by the - /// runtime's I/O driver. + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()` or if `bucket` represents an + /// invalid bucket. /// /// # Examples /// /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.io_driver_fd_deregistered_count(); - /// println!("{} fds have been deregistered by the runtime's I/O driver.", n); + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_poll_count_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.poll_count_histogram_num_buckets(); + /// + /// for worker in 0..metrics.num_workers() { + /// for i in 0..buckets { + /// let count = metrics.poll_count_histogram_bucket_count(worker, i); + /// println!("Poll count {}", count); + /// } + /// } + /// }); /// } /// ``` - pub fn io_driver_fd_deregistered_count(&self) -> u64 { - self.with_io_driver_metrics(|m| { - m.fd_deregistered_count.load(Relaxed) - }) + /// + /// [`poll_count_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::poll_count_histogram_bucket_range + #[track_caller] + pub fn poll_count_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .poll_count_histogram + .as_ref() + .map(|histogram| histogram.get(bucket)) + .unwrap_or_default() } - /// Returns the number of ready events processed by the runtime's - /// I/O driver. + /// Returns the mean duration of task polls, in nanoseconds. + /// + /// This is an exponentially weighted moving average. Currently, this metric + /// is only provided by the multi-threaded runtime. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. /// /// # Examples /// @@ -905,27 +804,131 @@ cfg_net! 
{ /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.io_driver_ready_count(); - /// println!("{} ready events processed by the runtime's I/O driver.", n); + /// let n = metrics.worker_mean_poll_time(0); + /// println!("worker 0 has a mean poll time of {:?}", n); /// } /// ``` - pub fn io_driver_ready_count(&self) -> u64 { - self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed)) + #[track_caller] + pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .mean_poll_time + .load(Relaxed); + Duration::from_nanos(nanos) } + } - fn with_io_driver_metrics(&self, f: F) -> u64 - where - F: Fn(&super::IoDriverMetrics) -> u64, - { - // TODO: Investigate if this should return 0, most of our metrics always increase - // thus this breaks that guarantee. - self.handle - .inner - .driver() - .io - .as_ref() - .map(|h| f(&h.metrics)) - .unwrap_or(0) + /// Returns the number of tasks currently scheduled in the blocking + /// thread pool, spawned using `spawn_blocking`. + /// + /// This metric returns the **current** number of tasks pending in + /// blocking thread pool. As such, the returned value may increase + /// or decrease as new tasks are scheduled and processed. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.blocking_queue_depth(); + /// println!("{} tasks currently pending in the blocking thread pool", n); + /// } + /// ``` + pub fn blocking_queue_depth(&self) -> usize { + self.handle.inner.blocking_queue_depth() + } + + cfg_net! { + cfg_64bit_metrics! { + /// Returns the number of file descriptors that have been registered with the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let registered_fds = metrics.io_driver_fd_registered_count(); + /// println!("{} fds have been registered with the runtime's I/O driver.", registered_fds); + /// + /// let deregistered_fds = metrics.io_driver_fd_deregistered_count(); + /// + /// let current_fd_count = registered_fds - deregistered_fds; + /// println!("{} fds are currently registered by the runtime's I/O driver.", current_fd_count); + /// } + /// ``` + pub fn io_driver_fd_registered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_registered_count.load(Relaxed) + }) + } + + /// Returns the number of file descriptors that have been deregistered by the + /// runtime's I/O driver. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_fd_deregistered_count(); + /// println!("{} fds have been deregistered by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_fd_deregistered_count(&self) -> u64 { + self.with_io_driver_metrics(|m| { + m.fd_deregistered_count.load(Relaxed) + }) + } + + /// Returns the number of ready events processed by the runtime's + /// I/O driver. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.io_driver_ready_count(); + /// println!("{} ready events processed by the runtime's I/O driver.", n); + /// } + /// ``` + pub fn io_driver_ready_count(&self) -> u64 { + self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed)) + } + + fn with_io_driver_metrics(&self, f: F) -> u64 + where + F: Fn(&super::IoDriverMetrics) -> u64, + { + // TODO: Investigate if this should return 0, most of our metrics always increase + // thus this breaks that guarantee. + self.handle + .inner + .driver() + .io + .as_ref() + .map(|h| f(&h.metrics)) + .unwrap_or(0) + } } } } diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index fc7c4e6dfe4..a396bf5a391 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -1,8 +1,10 @@ -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::atomic::Ordering::Relaxed; use crate::runtime::metrics::Histogram; use crate::runtime::Config; use crate::util::metric_atomics::MetricAtomicU64; +// This is NOT the Loom atomic. To avoid an unnecessary state explosion in loom, +// all metrics use regular atomics. +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; /// Retrieve runtime worker metrics. /// diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3d333960f3d..3fcde75b54e 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -388,21 +388,18 @@ cfg_rt! { mod thread_id; pub(crate) use thread_id::ThreadId; - cfg_metrics! { - mod metrics; - pub use metrics::{RuntimeMetrics, HistogramScale}; + pub(crate) mod metrics; + pub use metrics::RuntimeMetrics; - pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder}; + cfg_unstable_metrics! { + pub use metrics::HistogramScale; cfg_net! { - pub(crate) use metrics::IoDriverMetrics; + pub(crate) use metrics::IoDriverMetrics; } } - cfg_not_metrics! { - pub(crate) mod metrics; - pub(crate) use metrics::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; - } + pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder}; /// After thread starts / before thread stops type Callback = std::sync::Arc; diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 7cf2cebeffc..d904af50458 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -455,6 +455,12 @@ impl Runtime { pub fn shutdown_background(self) { self.shutdown_timeout(Duration::from_nanos(0)); } + + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { + self.handle.metrics() + } } #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let @@ -486,13 +492,3 @@ impl Drop for Runtime { impl std::panic::UnwindSafe for Runtime {} impl std::panic::RefUnwindSafe for Runtime {} - -cfg_metrics! { - impl Runtime { - /// Returns a view that lets you get information about how the runtime - /// is performing. 
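The net effect of moving the `metrics` module and `Runtime::metrics()` out of `cfg_metrics!` is that a plain stable build can now obtain a `RuntimeMetrics` handle; only the detailed counters stay behind `tokio_unstable` via `cfg_unstable_metrics!`. A minimal sketch of that stable surface, assuming the `rt-multi-thread` feature (`num_workers()` is part of the always-compiled subset, as the scheduler change below shows):

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .build()
        .unwrap();

    // Available without tokio_unstable after this change.
    let metrics = rt.metrics();
    println!("runtime has {} workers", metrics.num_workers());
}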
- pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { - self.handle.metrics() - } - } -} diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 36bcefc4406..b9c23837a58 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -502,7 +502,7 @@ impl Handle { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Handle { pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics diff --git a/tokio/src/runtime/scheduler/inject.rs b/tokio/src/runtime/scheduler/inject.rs index 39976fcd7a2..811b02c136c 100644 --- a/tokio/src/runtime/scheduler/inject.rs +++ b/tokio/src/runtime/scheduler/inject.rs @@ -16,7 +16,7 @@ cfg_rt_multi_thread! { mod rt_multi_thread; } -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 04fbff39e47..3cbba11b752 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -163,20 +163,22 @@ cfg_rt! { } } - cfg_metrics! { + impl Handle { + pub(crate) fn num_workers(&self) -> usize { + match self { + Handle::CurrentThread(_) => 1, + #[cfg(feature = "rt-multi-thread")] + Handle::MultiThread(handle) => handle.num_workers(), + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] + Handle::MultiThreadAlt(handle) => handle.num_workers(), + } + } + } + + cfg_unstable_metrics! { use crate::runtime::{SchedulerMetrics, WorkerMetrics}; impl Handle { - pub(crate) fn num_workers(&self) -> usize { - match self { - Handle::CurrentThread(_) => 1, - #[cfg(feature = "rt-multi-thread")] - Handle::MultiThread(handle) => handle.num_workers(), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] - Handle::MultiThreadAlt(handle) => handle.num_workers(), - } - } - pub(crate) fn num_blocking_threads(&self) -> usize { match_flavor!(self, Handle(handle) => handle.num_blocking_threads()) } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 568eb80af8b..72f776e47fa 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -9,9 +9,7 @@ use crate::util::RngSeedGenerator; use std::fmt; -cfg_metrics! { - mod metrics; -} +mod metrics; cfg_taskdump! { mod taskdump; diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 838694fc89e..6ced245ee5b 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,41 +1,48 @@ use super::Handle; -use crate::runtime::{SchedulerMetrics, WorkerMetrics}; +cfg_unstable_metrics! { + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; +} impl Handle { pub(crate) fn num_workers(&self) -> usize { self.shared.worker_metrics.len() } - pub(crate) fn num_blocking_threads(&self) -> usize { - self.blocking_spawner.num_threads() - } + cfg_unstable_metrics! 
{ + pub(crate) fn num_blocking_threads(&self) -> usize { + // workers are currently spawned using spawn_blocking + self.blocking_spawner + .num_threads() + .saturating_sub(self.num_workers()) + } - pub(crate) fn num_idle_blocking_threads(&self) -> usize { - self.blocking_spawner.num_idle_threads() - } + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + self.blocking_spawner.num_idle_threads() + } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } + pub(crate) fn active_tasks_count(&self) -> usize { + self.shared.owned.active_tasks_count() + } - pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { - &self.shared.scheduler_metrics - } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + &self.shared.scheduler_metrics + } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - &self.shared.worker_metrics[worker] - } + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + &self.shared.worker_metrics[worker] + } - pub(crate) fn injection_queue_depth(&self) -> usize { - self.shared.injection_queue_depth() - } + pub(crate) fn injection_queue_depth(&self) -> usize { + self.shared.injection_queue_depth() + } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.shared.worker_local_queue_depth(worker) - } + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.shared.worker_local_queue_depth(worker) + } - pub(crate) fn blocking_queue_depth(&self) -> usize { - self.blocking_spawner.queue_depth() + pub(crate) fn blocking_queue_depth(&self) -> usize { + self.blocking_spawner.queue_depth() + } } } diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 35223289870..99ee31ba15b 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -546,7 +546,7 @@ impl Steal { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Steal { pub(crate) fn len(&self) -> usize { self.0.len() as _ diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 4d41698ce44..8ef487b09fd 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -74,7 +74,7 @@ use std::cell::RefCell; use std::task::Waker; use std::time::Duration; -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs index d746bca1a18..1f5b7818521 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs @@ -9,7 +9,7 @@ use crate::util::RngSeedGenerator; use std::fmt; -cfg_metrics! { +cfg_unstable_metrics! 
{ mod metrics; } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs index 838694fc89e..3d614b478c5 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs @@ -8,7 +8,10 @@ impl Handle { } pub(crate) fn num_blocking_threads(&self) -> usize { - self.blocking_spawner.num_threads() + // workers are currently spawned using spawn_blocking + self.blocking_spawner + .num_threads() + .saturating_sub(self.num_workers()) } pub(crate) fn num_idle_blocking_threads(&self) -> usize { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs b/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs index 2694d27cbdf..c8293fdc845 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs @@ -538,7 +538,7 @@ impl Steal { } } -cfg_metrics! { +cfg_unstable_metrics! { impl Steal { pub(crate) fn len(&self) -> usize { self.0.len() as _ diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 63ae0a49743..9ceb7815a53 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -74,7 +74,7 @@ use std::cmp; use std::task::Waker; use std::time::Duration; -cfg_metrics! { +cfg_unstable_metrics! { mod metrics; } diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 55429b1b11b..f463355f0d3 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -40,7 +40,7 @@ fn fits_256_one_at_a_time() { local.push_back_or_overflow(task, &inject, &mut stats); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, overflow_count == 0); } @@ -98,7 +98,7 @@ fn overflow() { local.push_back_or_overflow(task, &inject, &mut stats); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, overflow_count == 1); } @@ -128,7 +128,7 @@ fn steal_batch() { assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, steal_count == 2); } @@ -184,7 +184,7 @@ fn stress1() { thread::yield_now(); } - cfg_metrics! { + cfg_unstable_metrics! { assert_metrics!(stats, steal_count == n as _); } diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 879b89b4069..5d344f70411 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -223,7 +223,9 @@ struct Waiter { /// `Notify`, or it is exclusively owned by the enclosing `Waiter`. waker: UnsafeCell>, - /// Notification for this waiter. + /// Notification for this waiter. Uses 2 bits to store if and how was + /// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and + /// the rest of it is unused. /// * if it's `None`, then `waker` is protected by the `waiters` lock. /// * if it's `Some`, then `waker` is exclusively owned by the /// enclosing `Waiter` and can be accessed without locking. @@ -253,13 +255,16 @@ generate_addr_of_methods! { } // No notification. -const NOTIFICATION_NONE: usize = 0; +const NOTIFICATION_NONE: usize = 0b000; // Notification type used by `notify_one`. -const NOTIFICATION_ONE: usize = 1; +const NOTIFICATION_ONE: usize = 0b001; + +// Notification type used by `notify_last`. +const NOTIFICATION_LAST: usize = 0b101; // Notification type used by `notify_waiters`. 
-const NOTIFICATION_ALL: usize = 2; +const NOTIFICATION_ALL: usize = 0b010; /// Notification for a `Waiter`. /// This struct is equivalent to `Option`, but uses @@ -275,13 +280,20 @@ impl AtomicNotification { /// Store-release a notification. /// This method should be called exactly once. fn store_release(&self, notification: Notification) { - self.0.store(notification as usize, Release); + let data: usize = match notification { + Notification::All => NOTIFICATION_ALL, + Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE, + Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST, + }; + self.0.store(data, Release); } fn load(&self, ordering: Ordering) -> Option { - match self.0.load(ordering) { + let data = self.0.load(ordering); + match data { NOTIFICATION_NONE => None, - NOTIFICATION_ONE => Some(Notification::One), + NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)), + NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)), NOTIFICATION_ALL => Some(Notification::All), _ => unreachable!(), } @@ -296,11 +308,18 @@ impl AtomicNotification { } } +#[derive(Debug, PartialEq, Eq)] +#[repr(usize)] +enum NotifyOneStrategy { + Fifo, + Lifo, +} + #[derive(Debug, PartialEq, Eq)] #[repr(usize)] enum Notification { - One = NOTIFICATION_ONE, - All = NOTIFICATION_ALL, + One(NotifyOneStrategy), + All, } /// List used in `Notify::notify_waiters`. It wraps a guarded linked list @@ -521,7 +540,7 @@ impl Notify { } } - /// Notifies a waiting task. + /// Notifies the first waiting task. /// /// If a task is currently waiting, that task is notified. Otherwise, a /// permit is stored in this `Notify` value and the **next** call to @@ -558,6 +577,23 @@ impl Notify { // Alias for old name in 0.x #[cfg_attr(docsrs, doc(alias = "notify"))] pub fn notify_one(&self) { + self.notify_with_strategy(NotifyOneStrategy::Fifo); + } + + /// Notifies the last waiting task. + /// + /// This function behaves similar to `notify_one`. The only difference is that it wakes + /// the most recently added waiter instead of the oldest waiter. + /// + /// Check the [`notify_one()`] documentation for more info and + /// examples. + /// + /// [`notify_one()`]: Notify::notify_one + pub fn notify_last(&self) { + self.notify_with_strategy(NotifyOneStrategy::Lifo); + } + + fn notify_with_strategy(&self, strategy: NotifyOneStrategy) { // Load the current state let mut curr = self.state.load(SeqCst); @@ -585,7 +621,7 @@ impl Notify { // transition out of WAITING while the lock is held. curr = self.state.load(SeqCst); - if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) { + if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) { drop(waiters); waker.wake(); } @@ -708,7 +744,12 @@ impl Default for Notify { impl UnwindSafe for Notify {} impl RefUnwindSafe for Notify {} -fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option { +fn notify_locked( + waiters: &mut WaitList, + state: &AtomicUsize, + curr: usize, + strategy: NotifyOneStrategy, +) -> Option { match get_state(curr) { EMPTY | NOTIFIED => { let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst); @@ -728,8 +769,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op // concurrently change as holding the lock is required to // transition **out** of `WAITING`. // - // Get a pending waiter - let waiter = waiters.pop_back().unwrap(); + // Get a pending waiter using one of the available dequeue strategies. 
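For reference, the encoding introduced above packs the notification kind into the low two bits and the wake-up strategy into bit 2. The standalone sketch below (not part of the patch) restates the mapping that `AtomicNotification::store_release` and `load` implement:

// Mirror of the constants added by the patch.
const NOTIFICATION_NONE: usize = 0b000;
const NOTIFICATION_ONE: usize = 0b001; // notify_one / FIFO
const NOTIFICATION_LAST: usize = 0b101; // notify_last / LIFO (bit 2 set)
const NOTIFICATION_ALL: usize = 0b010; // notify_waiters

#[derive(Debug, PartialEq)]
enum Decoded {
    None,
    OneFifo,
    OneLifo,
    All,
}

fn decode(data: usize) -> Decoded {
    match data {
        NOTIFICATION_NONE => Decoded::None,
        NOTIFICATION_ONE => Decoded::OneFifo,
        NOTIFICATION_LAST => Decoded::OneLifo,
        NOTIFICATION_ALL => Decoded::All,
        // Only the four encodings above are ever stored.
        _ => unreachable!(),
    }
}

fn main() {
    assert_eq!(decode(NOTIFICATION_LAST), Decoded::OneLifo);
    // The low two bits carry the kind; NOTIFICATION_LAST shares the "one" kind.
    assert_eq!(NOTIFICATION_LAST & 0b011, NOTIFICATION_ONE);
    // Bit 2 is the LIFO flag.
    assert_eq!(NOTIFICATION_LAST >> 2, 1);
}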
+ let waiter = match strategy { + NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(), + NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(), + }; // Safety: we never make mutable references to waiters. let waiter = unsafe { waiter.as_ref() }; @@ -738,7 +782,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; // This waiter is unlinked and will not be shared ever again, release it. - waiter.notification.store_release(Notification::One); + waiter + .notification + .store_release(Notification::One(strategy)); if waiters.is_empty() { // As this the **final** waiter in the list, the state @@ -1137,8 +1183,10 @@ impl Drop for Notified<'_> { // See if the node was notified but not received. In this case, if // the notification was triggered via `notify_one`, it must be sent // to the next waiter. - if notification == Some(Notification::One) { - if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { + if let Some(Notification::One(strategy)) = notification { + if let Some(waker) = + notify_locked(&mut waiters, ¬ify.state, notify_state, strategy) + { drop(waiters); waker.wake(); } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index ab20292e21d..0274849b0c6 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -137,6 +137,26 @@ impl LinkedList { } } + /// Removes the first element from a list and returns it, or None if it is + /// empty. + pub(crate) fn pop_front(&mut self) -> Option { + unsafe { + let head = self.head?; + self.head = L::pointers(head).as_ref().get_next(); + + if let Some(new_head) = L::pointers(head).as_ref().get_next() { + L::pointers(new_head).as_mut().set_prev(None); + } else { + self.tail = None; + } + + L::pointers(head).as_mut().set_prev(None); + L::pointers(head).as_mut().set_next(None); + + Some(L::from_raw(head)) + } + } + /// Removes the last element from a list and returns it, or None if it is /// empty. 
pub(crate) fn pop_back(&mut self) -> Option { diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index ea798b3067a..4e6f630eb93 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(all(unix, feature = "full"))] -use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; +use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -13,7 +13,7 @@ use std::{ task::{Context, Waker}, }; -use nix::unistd::{close, read, write}; +use nix::unistd::{read, write}; use futures::poll; @@ -58,18 +58,18 @@ impl TestWaker { #[derive(Debug)] struct FileDescriptor { - fd: RawFd, + fd: std::os::fd::OwnedFd, } impl AsRawFd for FileDescriptor { fn as_raw_fd(&self) -> RawFd { - self.fd + self.fd.as_raw_fd() } } impl Read for &FileDescriptor { fn read(&mut self, buf: &mut [u8]) -> io::Result { - read(self.fd, buf).map_err(io::Error::from) + read(self.fd.as_raw_fd(), buf).map_err(io::Error::from) } } @@ -81,7 +81,7 @@ impl Read for FileDescriptor { impl Write for &FileDescriptor { fn write(&mut self, buf: &[u8]) -> io::Result { - write(self.fd, buf).map_err(io::Error::from) + write(&self.fd, buf).map_err(io::Error::from) } fn flush(&mut self) -> io::Result<()> { @@ -99,12 +99,6 @@ impl Write for FileDescriptor { } } -impl Drop for FileDescriptor { - fn drop(&mut self) { - let _ = close(self.fd); - } -} - fn set_nonblocking(fd: RawFd) { use nix::fcntl::{OFlag, F_GETFL, F_SETFL}; @@ -133,17 +127,10 @@ fn socketpair() -> (FileDescriptor, FileDescriptor) { SockFlag::empty(), ) .expect("socketpair"); - let fds = ( - FileDescriptor { - fd: fd_a.into_raw_fd(), - }, - FileDescriptor { - fd: fd_b.into_raw_fd(), - }, - ); + let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b }); - set_nonblocking(fds.0.fd); - set_nonblocking(fds.1.fd); + set_nonblocking(fds.0.fd.as_raw_fd()); + set_nonblocking(fds.1.fd.as_raw_fd()); fds } diff --git a/tokio/tests/io_poll_aio.rs b/tokio/tests/io_poll_aio.rs index e83859f5c98..242887eb60f 100644 --- a/tokio/tests/io_poll_aio.rs +++ b/tokio/tests/io_poll_aio.rs @@ -5,6 +5,7 @@ use mio_aio::{AioFsyncMode, SourceApi}; use std::{ future::Future, io, mem, + os::fd::AsFd, os::unix::io::{AsRawFd, RawFd}, pin::{pin, Pin}, task::{Context, Poll}, @@ -17,9 +18,9 @@ mod aio { use super::*; #[derive(Debug)] - struct TokioSource(mio_aio::Source); + struct TokioSource<'fd>(mio_aio::Source>); - impl AioSource for TokioSource { + impl<'fd> AioSource for TokioSource<'fd> { fn register(&mut self, kq: RawFd, token: usize) { self.0.register_raw(kq, token) } @@ -29,9 +30,9 @@ mod aio { } /// A very crude implementation of an AIO-based future - struct FsyncFut(Aio); + struct FsyncFut<'fd>(Aio>); - impl FsyncFut { + impl<'fd> FsyncFut<'fd> { pub fn submit(self: Pin<&mut Self>) -> io::Result<()> { let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) }; match p.submit() { @@ -41,7 +42,7 @@ mod aio { } } - impl Future for FsyncFut { + impl<'fd> Future for FsyncFut<'fd> { type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -134,7 +135,7 @@ mod aio { #[tokio::test] async fn fsync() { let f = tempfile().unwrap(); - let fd = f.as_raw_fd(); + let fd = f.as_fd(); let mode = AioFsyncMode::O_SYNC; let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0)); let poll_aio = Aio::new_for_aio(source).unwrap(); diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index 6706880ed1b..37b8b41bd31 100644 --- 
a/tokio/tests/net_unix_pipe.rs
+++ b/tokio/tests/net_unix_pipe.rs
@@ -489,12 +489,10 @@ async fn anon_pipe_spawn_echo() -> std::io::Result<()> {
 #[cfg(target_os = "linux")]
 async fn anon_pipe_from_owned_fd() -> std::io::Result<()> {
 use nix::fcntl::OFlag;
- use std::os::unix::io::{FromRawFd, OwnedFd};
 const DATA: &[u8] = b"this is some data to write to the pipe";
- let fds = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?;
- let (rx_fd, tx_fd) = unsafe { (OwnedFd::from_raw_fd(fds.0), OwnedFd::from_raw_fd(fds.1)) };
+ let (rx_fd, tx_fd) = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?;
 let mut rx = pipe::Receiver::from_owned_fd(rx_fd)?;
 let mut tx = pipe::Sender::from_owned_fd(tx_fd)?;
diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs
index 6a710a46ce6..2446deb6b41 100644
--- a/tokio/tests/rt_metrics.rs
+++ b/tokio/tests/rt_metrics.rs
@@ -31,6 +31,11 @@ fn num_blocking_threads() {
 assert_eq!(0, rt.metrics().num_blocking_threads());
 let _ = rt.block_on(rt.spawn_blocking(move || {}));
 assert_eq!(1, rt.metrics().num_blocking_threads());
+
+ let rt = threaded();
+ assert_eq!(0, rt.metrics().num_blocking_threads());
+ let _ = rt.block_on(rt.spawn_blocking(move || {}));
+ assert_eq!(1, rt.metrics().num_blocking_threads());
 }
 #[test]
diff --git a/tokio/tests/sync_notify.rs b/tokio/tests/sync_notify.rs
index 01b8ce86537..13b3f921e98 100644
--- a/tokio/tests/sync_notify.rs
+++ b/tokio/tests/sync_notify.rs
@@ -21,6 +21,38 @@ fn notify_notified_one() {
 assert_ready!(notified.poll());
 }
+#[test]
+fn notify_multi_notified_one() {
+ let notify = Notify::new();
+ let mut notified1 = spawn(async { notify.notified().await });
+ let mut notified2 = spawn(async { notify.notified().await });
+
+ // add two waiters into the queue
+ assert_pending!(notified1.poll());
+ assert_pending!(notified2.poll());
+
+ // should wake up the first (oldest) waiter
+ notify.notify_one();
+ assert_ready!(notified1.poll());
+ assert_pending!(notified2.poll());
+}
+
+#[test]
+fn notify_multi_notified_last() {
+ let notify = Notify::new();
+ let mut notified1 = spawn(async { notify.notified().await });
+ let mut notified2 = spawn(async { notify.notified().await });
+
+ // add two waiters into the queue
+ assert_pending!(notified1.poll());
+ assert_pending!(notified2.poll());
+
+ // should wake up the last (most recently added) waiter
+ notify.notify_last();
+ assert_pending!(notified1.poll());
+ assert_ready!(notified2.poll());
+}
+
 #[test]
 fn notified_one_notify() {
 let notify = Notify::new();
@@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() {
 assert_ready!(notified2.poll());
 }
+#[test]
+fn notified_multi_notify_one_drop() {
+ let notify = Notify::new();
+ let mut notified1 = spawn(async { notify.notified().await });
+ let mut notified2 = spawn(async { notify.notified().await });
+ let mut notified3 = spawn(async { notify.notified().await });
+
+ // add waiters in order of poll execution
+ assert_pending!(notified1.poll());
+ assert_pending!(notified2.poll());
+ assert_pending!(notified3.poll());
+
+ // `notify_one` wakes in FIFO order by default
+ notify.notify_one();
+
+ drop(notified1);
+
+ // the next waiter should be the one to be woken up
+ assert_ready!(notified2.poll());
+ assert_pending!(notified3.poll());
+}
+
+#[test]
+fn notified_multi_notify_last_drop() {
+ let notify = Notify::new();
+ let mut notified1 = spawn(async { notify.notified().await });
+ let mut notified2 = spawn(async { notify.notified().await });
+ let mut notified3 = spawn(async { notify.notified().await });
+
+ // add waiters in order of poll execution
+ assert_pending!(notified1.poll());
+ assert_pending!(notified2.poll());
+ assert_pending!(notified3.poll());
+
+ notify.notify_last();
+
+ drop(notified3);
+
+ // the most recently added waiter should be the one to be woken up
+ assert_ready!(notified2.poll());
+ assert_pending!(notified1.poll());
+}
+
 #[test]
 fn notify_in_drop_after_wake() {
 use futures::task::ArcWake;
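The new tests above poll the futures by hand; the same FIFO-versus-LIFO contrast can be written as one small standalone check. A hedged sketch, assuming the tokio-test utilities the suite already uses (`tokio_test::task::spawn`, `assert_pending!`, `assert_ready!`):

use tokio::sync::Notify;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};

fn main() {
    let notify = Notify::new();
    let mut first = spawn(async { notify.notified().await });
    let mut second = spawn(async { notify.notified().await });
    let mut third = spawn(async { notify.notified().await });

    // Register the three waiters in order.
    assert_pending!(first.poll());
    assert_pending!(second.poll());
    assert_pending!(third.poll());

    // FIFO: notify_one() wakes the oldest waiter.
    notify.notify_one();
    assert_ready!(first.poll());

    // LIFO: notify_last() wakes the most recently registered waiter.
    notify.notify_last();
    assert_ready!(third.poll());
    assert_pending!(second.poll());
}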