-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stabilize worker_total_busy_duration #6899
base: master
Are you sure you want to change the base?
Changes from all commits
8f1fcb4
9b47cf9
e2f4f33
b6974ba
86f019f
64f626d
489003c
8a134a2
4bc00cf
7eb6b97
7239af5
57f6b9b
c8b2c7d
d7333cf
14543de
245d75c
e0d0b84
b821f92
64f029a
73dfd89
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,125 +1,189 @@ | ||
use crate::runtime::metrics::{HistogramBatch, WorkerMetrics}; | ||
use crate::runtime::metrics::WorkerMetrics; | ||
|
||
cfg_unstable_metrics! { | ||
use crate::runtime::metrics::HistogramBatch; | ||
} | ||
|
||
use std::sync::atomic::Ordering::Relaxed; | ||
use std::time::{Duration, Instant}; | ||
|
||
pub(crate) struct MetricsBatch { | ||
/// The total busy duration in nanoseconds. | ||
busy_duration_total: u64, | ||
|
||
/// Instant at which work last resumed (continued after park). | ||
processing_scheduled_tasks_started_at: Instant, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of times the worker parked. | ||
park_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of times the worker parked and unparked. | ||
park_unpark_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of times the worker woke w/o doing work. | ||
noop_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks stolen. | ||
steal_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of times tasks where stolen. | ||
steal_operations: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks that were polled by the worker. | ||
poll_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks polled when the worker entered park. This is used to | ||
/// track the noop count. | ||
poll_count_on_last_park: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks that were scheduled locally on this worker. | ||
local_schedule_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks moved to the global queue to make space in the local | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider reorganizing to put the stable fields on the top to make it clearer |
||
/// queue | ||
overflow_count: u64, | ||
|
||
/// The total busy duration in nanoseconds. | ||
busy_duration_total: u64, | ||
|
||
/// Instant at which work last resumed (continued after park). | ||
processing_scheduled_tasks_started_at: Instant, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// If `Some`, tracks poll times in nanoseconds | ||
poll_timer: Option<PollTimer>, | ||
} | ||
|
||
struct PollTimer { | ||
/// Histogram of poll counts within each band. | ||
poll_counts: HistogramBatch, | ||
cfg_unstable_metrics! { | ||
struct PollTimer { | ||
/// Histogram of poll counts within each band. | ||
poll_counts: HistogramBatch, | ||
|
||
/// Instant when the most recent task started polling. | ||
poll_started_at: Instant, | ||
/// Instant when the most recent task started polling. | ||
poll_started_at: Instant, | ||
} | ||
} | ||
|
||
impl MetricsBatch { | ||
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch { | ||
let now = Instant::now(); | ||
Self::new_unstable(worker_metrics, now) | ||
} | ||
|
||
MetricsBatch { | ||
park_count: 0, | ||
park_unpark_count: 0, | ||
noop_count: 0, | ||
steal_count: 0, | ||
steal_operations: 0, | ||
poll_count: 0, | ||
poll_count_on_last_park: 0, | ||
local_schedule_count: 0, | ||
overflow_count: 0, | ||
busy_duration_total: 0, | ||
processing_scheduled_tasks_started_at: now, | ||
poll_timer: worker_metrics | ||
.poll_count_histogram | ||
.as_ref() | ||
.map(|worker_poll_counts| PollTimer { | ||
poll_counts: HistogramBatch::from_histogram(worker_poll_counts), | ||
poll_started_at: now, | ||
}), | ||
cfg_metrics_variant! { | ||
stable: { | ||
#[inline(always)] | ||
fn new_unstable(_worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch { | ||
MetricsBatch { | ||
busy_duration_total: 0, | ||
processing_scheduled_tasks_started_at: now, | ||
} | ||
} | ||
}, | ||
unstable: { | ||
#[inline(always)] | ||
fn new_unstable(worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch { | ||
MetricsBatch { | ||
park_count: 0, | ||
park_unpark_count: 0, | ||
noop_count: 0, | ||
steal_count: 0, | ||
steal_operations: 0, | ||
poll_count: 0, | ||
poll_count_on_last_park: 0, | ||
local_schedule_count: 0, | ||
overflow_count: 0, | ||
busy_duration_total: 0, | ||
processing_scheduled_tasks_started_at: now, | ||
poll_timer: worker_metrics.poll_count_histogram.as_ref().map( | ||
|worker_poll_counts| PollTimer { | ||
poll_counts: HistogramBatch::from_histogram(worker_poll_counts), | ||
poll_started_at: now, | ||
}, | ||
), | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { | ||
worker.mean_poll_time.store(mean_poll_time, Relaxed); | ||
worker.park_count.store(self.park_count, Relaxed); | ||
worker | ||
.park_unpark_count | ||
.store(self.park_unpark_count, Relaxed); | ||
worker.noop_count.store(self.noop_count, Relaxed); | ||
worker.steal_count.store(self.steal_count, Relaxed); | ||
worker | ||
.steal_operations | ||
.store(self.steal_operations, Relaxed); | ||
worker.poll_count.store(self.poll_count, Relaxed); | ||
|
||
worker | ||
.busy_duration_total | ||
.store(self.busy_duration_total, Relaxed); | ||
Comment on lines
113
to
115
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move the stablized items up to the top of the function. |
||
|
||
worker | ||
.local_schedule_count | ||
.store(self.local_schedule_count, Relaxed); | ||
worker.overflow_count.store(self.overflow_count, Relaxed); | ||
self.submit_unstable(worker, mean_poll_time); | ||
} | ||
|
||
if let Some(poll_timer) = &self.poll_timer { | ||
let dst = worker.poll_count_histogram.as_ref().unwrap(); | ||
poll_timer.poll_counts.submit(dst); | ||
cfg_metrics_variant! { | ||
stable: { | ||
#[inline(always)] | ||
fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {} | ||
}, | ||
unstable: { | ||
#[inline(always)] | ||
fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { | ||
worker.mean_poll_time.store(mean_poll_time, Relaxed); | ||
worker.park_count.store(self.park_count, Relaxed); | ||
worker | ||
.park_unpark_count | ||
.store(self.park_unpark_count, Relaxed); | ||
worker.noop_count.store(self.noop_count, Relaxed); | ||
worker.steal_count.store(self.steal_count, Relaxed); | ||
worker | ||
.steal_operations | ||
.store(self.steal_operations, Relaxed); | ||
worker.poll_count.store(self.poll_count, Relaxed); | ||
|
||
worker | ||
.local_schedule_count | ||
.store(self.local_schedule_count, Relaxed); | ||
worker.overflow_count.store(self.overflow_count, Relaxed); | ||
|
||
if let Some(poll_timer) = &self.poll_timer { | ||
let dst = worker.poll_count_histogram.as_ref().unwrap(); | ||
poll_timer.poll_counts.submit(dst); | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// The worker is about to park. | ||
pub(crate) fn about_to_park(&mut self) { | ||
self.park_count += 1; | ||
self.park_unpark_count += 1; | ||
|
||
if self.poll_count_on_last_park == self.poll_count { | ||
self.noop_count += 1; | ||
} else { | ||
self.poll_count_on_last_park = self.poll_count; | ||
cfg_metrics_variant! { | ||
stable: { | ||
/// The worker is about to park. | ||
pub(crate) fn about_to_park(&mut self) {} | ||
}, | ||
unstable: { | ||
/// The worker is about to park. | ||
pub(crate) fn about_to_park(&mut self) { | ||
#[cfg(tokio_unstable)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't need a double config here |
||
{ | ||
self.park_count += 1; | ||
self.park_unpark_count += 1; | ||
|
||
if self.poll_count_on_last_park == self.poll_count { | ||
self.noop_count += 1; | ||
} else { | ||
self.poll_count_on_last_park = self.poll_count; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// The worker was unparked. | ||
pub(crate) fn unparked(&mut self) { | ||
self.park_unpark_count += 1; | ||
cfg_metrics_variant! { | ||
stable: { | ||
/// The worker was unparked. | ||
pub(crate) fn unparked(&mut self) {} | ||
}, | ||
unstable: { | ||
/// The worker was unparked. | ||
pub(crate) fn unparked(&mut self) { | ||
self.park_unpark_count += 1; | ||
} | ||
} | ||
} | ||
|
||
/// Start processing a batch of tasks | ||
|
@@ -133,40 +197,84 @@ impl MetricsBatch { | |
self.busy_duration_total += duration_as_u64(busy_duration); | ||
} | ||
|
||
/// Start polling an individual task | ||
pub(crate) fn start_poll(&mut self) { | ||
self.poll_count += 1; | ||
|
||
if let Some(poll_timer) = &mut self.poll_timer { | ||
poll_timer.poll_started_at = Instant::now(); | ||
cfg_metrics_variant! { | ||
stable: { | ||
/// Start polling an individual task | ||
pub(crate) fn start_poll(&mut self) {} | ||
}, | ||
unstable: { | ||
/// Start polling an individual task | ||
pub(crate) fn start_poll(&mut self) { | ||
self.poll_count += 1; | ||
if let Some(poll_timer) = &mut self.poll_timer { | ||
poll_timer.poll_started_at = Instant::now(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Stop polling an individual task | ||
pub(crate) fn end_poll(&mut self) { | ||
if let Some(poll_timer) = &mut self.poll_timer { | ||
let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); | ||
poll_timer.poll_counts.measure(elapsed, 1); | ||
cfg_metrics_variant! { | ||
stable: { | ||
/// Stop polling an individual task | ||
pub(crate) fn end_poll(&mut self) {} | ||
}, | ||
unstable: { | ||
/// Stop polling an individual task | ||
pub(crate) fn end_poll(&mut self) { | ||
#[cfg(tokio_unstable)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another double cfg |
||
if let Some(poll_timer) = &mut self.poll_timer { | ||
let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); | ||
poll_timer.poll_counts.measure(elapsed, 1); | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub(crate) fn inc_local_schedule_count(&mut self) { | ||
self.local_schedule_count += 1; | ||
cfg_metrics_variant! { | ||
stable: { | ||
pub(crate) fn inc_local_schedule_count(&mut self) {} | ||
}, | ||
unstable: { | ||
pub(crate) fn inc_local_schedule_count(&mut self) { | ||
self.local_schedule_count += 1; | ||
} | ||
} | ||
} | ||
} | ||
|
||
cfg_rt_multi_thread! { | ||
impl MetricsBatch { | ||
pub(crate) fn incr_steal_count(&mut self, by: u16) { | ||
self.steal_count += by as u64; | ||
cfg_metrics_variant! { | ||
stable: { | ||
pub(crate) fn incr_steal_count(&mut self, _by: u16) {} | ||
}, | ||
unstable: { | ||
pub(crate) fn incr_steal_count(&mut self, by: u16) { | ||
self.steal_count += by as u64; | ||
} | ||
} | ||
} | ||
|
||
pub(crate) fn incr_steal_operations(&mut self) { | ||
self.steal_operations += 1; | ||
cfg_metrics_variant! { | ||
stable: { | ||
pub(crate) fn incr_steal_operations(&mut self) {} | ||
}, | ||
unstable: { | ||
pub(crate) fn incr_steal_operations(&mut self) { | ||
self.steal_operations += 1; | ||
} | ||
} | ||
} | ||
|
||
pub(crate) fn incr_overflow_count(&mut self) { | ||
self.overflow_count += 1; | ||
cfg_metrics_variant! { | ||
stable: { | ||
pub(crate) fn incr_overflow_count(&mut self) {} | ||
}, | ||
unstable: { | ||
pub(crate) fn incr_overflow_count(&mut self) { | ||
self.overflow_count += 1; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add a doc comment here