diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs index 4c040651cd0f..8781a3d3ad75 100644 --- a/datafusion/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/src/physical_plan/coalesce_partitions.rs @@ -20,6 +20,7 @@ use std::any::Any; use std::sync::Arc; +use std::task::Poll; use futures::channel::mpsc; use futures::Stream; @@ -29,6 +30,7 @@ use async_trait::async_trait; use arrow::record_batch::RecordBatch; use arrow::{datatypes::SchemaRef, error::Result as ArrowResult}; +use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::RecordBatchStream; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; @@ -43,12 +45,17 @@ use pin_project_lite::pin_project; pub struct CoalescePartitionsExec { /// Input execution plan input: Arc, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl CoalescePartitionsExec { /// Create a new CoalescePartitionsExec pub fn new(input: Arc) -> Self { - CoalescePartitionsExec { input } + CoalescePartitionsExec { + input, + metrics: ExecutionPlanMetricsSet::new(), + } } /// Input execution plan @@ -90,6 +97,8 @@ impl ExecutionPlan for CoalescePartitionsExec { } async fn execute(&self, partition: usize) -> Result { + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + // CoalescePartitionsExec produces a single partition if 0 != partition { return Err(DataFusionError::Internal(format!( @@ -123,6 +132,7 @@ impl ExecutionPlan for CoalescePartitionsExec { Ok(Box::pin(MergeStream { input: receiver, schema: self.schema(), + baseline_metrics, })) } } @@ -139,6 +149,10 @@ impl ExecutionPlan for CoalescePartitionsExec { } } } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } } pin_project! { @@ -146,6 +160,7 @@ pin_project! { schema: SchemaRef, #[pin] input: mpsc::Receiver>, + baseline_metrics: BaselineMetrics } } @@ -155,9 +170,10 @@ impl Stream for MergeStream { fn poll_next( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { let this = self.project(); - this.input.poll_next(cx) + let poll = this.input.poll_next(cx); + this.baseline_metrics.record_poll(poll) } } diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs index 5ff99e5f9704..19a859a0f00a 100644 --- a/datafusion/src/physical_plan/display.rs +++ b/datafusion/src/physical_plan/display.rs @@ -139,7 +139,12 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { ShowMetrics::None => {} ShowMetrics::Aggregated => { if let Some(metrics) = plan.metrics() { - write!(self.f, ", metrics=[{}]", metrics.aggregate_by_partition())?; + let metrics = metrics + .aggregate_by_partition() + .sorted_for_display() + .timestamps_removed(); + + write!(self.f, ", metrics=[{}]", metrics)?; } else { write!(self.f, ", metrics=[]")?; } diff --git a/datafusion/src/physical_plan/metrics/baseline.rs b/datafusion/src/physical_plan/metrics/baseline.rs new file mode 100644 index 000000000000..b007d074f624 --- /dev/null +++ b/datafusion/src/physical_plan/metrics/baseline.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metrics common for almost all operators + +use std::task::Poll; + +use arrow::{error::ArrowError, record_batch::RecordBatch}; + +use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp}; + +/// Helper for creating and tracking common "baseline" metrics for +/// each operator +/// +/// Example: +/// ``` +/// use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; +/// let metrics = ExecutionPlanMetricsSet::new(); +/// +/// let partition = 2; +/// let baseline_metrics = BaselineMetrics::new(&metrics, partition); +/// +/// // during execution, in CPU intensive operation: +/// let timer = baseline_metrics.elapsed_compute().timer(); +/// // .. do CPU intensive work +/// timer.done(); +/// +/// // when operator is finished: +/// baseline_metrics.done(); +/// ``` +#[derive(Debug)] +pub struct BaselineMetrics { + /// end_time is set when `BaselineMetrics::done()` is called + end_time: Timestamp, + + /// amount of time the operator was actively trying to use the CPU + elapsed_compute: Time, + + /// output rows: the total output rows + output_rows: Count, +} + +impl BaselineMetrics { + /// Create a new BaselineMetrics structure, and set `start_time` to now + pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + let start_time = MetricBuilder::new(metrics).start_timestamp(partition); + start_time.record(); + + Self { + end_time: MetricBuilder::new(metrics).end_timestamp(partition), + elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition), + output_rows: MetricBuilder::new(metrics).output_rows(partition), + } + } + + /// return the metric for CPU time spent in this operator + pub fn elapsed_compute(&self) -> &Time { + &self.elapsed_compute + } + + /// return the metric for the total number of output rows produced + pub fn output_rows(&self) -> &Count { + &self.output_rows + } + + /// Records the fact that this operator's execution is complete + /// (recording the `end_time` metric). + /// + /// Note care should be taken to call `done()` manually if + /// `BaselineMetrics` is not `drop`ped immediately upon operator + /// completion, as async streams may not be dropped immediately + /// depending on the consumer.
+ pub fn done(&self) { + self.end_time.record() + } + + /// Record that some number of rows have been produced as output + /// + /// See the [`RecordOutput`] for conveniently recording record + /// batch output for other thing + pub fn record_output(&self, num_rows: usize) { + self.output_rows.add(num_rows); + } + + /// Process a poll result of a stream producing output for an + /// operator, recording the output rows and stream done time and + /// returning the same poll result + pub fn record_poll( + &self, + poll: Poll>>, + ) -> Poll>> { + if let Poll::Ready(maybe_batch) = &poll { + match maybe_batch { + Some(Ok(batch)) => { + batch.record_output(self); + } + Some(Err(_)) => self.done(), + None => self.done(), + } + } + poll + } +} + +impl Drop for BaselineMetrics { + fn drop(&mut self) { + // if not previously recorded, record + if self.end_time.value().is_none() { + self.end_time.record() + } + } +} + +/// Trait for things that produce output rows as a result of execution. +pub trait RecordOutput { + /// Record that some number of output rows have been produced + /// + /// Meant to be composable so that instead of returning `batch` + /// the operator can return `batch.record_output(baseline_metrics)` + fn record_output(self, bm: &BaselineMetrics) -> Self; +} + +impl RecordOutput for usize { + fn record_output(self, bm: &BaselineMetrics) -> Self { + bm.record_output(self); + self + } +} + +impl RecordOutput for RecordBatch { + fn record_output(self, bm: &BaselineMetrics) -> Self { + bm.record_output(self.num_rows()); + self + } +} + +impl RecordOutput for &RecordBatch { + fn record_output(self, bm: &BaselineMetrics) -> Self { + bm.record_output(self.num_rows()); + self + } +} + +impl RecordOutput for Option<&RecordBatch> { + fn record_output(self, bm: &BaselineMetrics) -> Self { + if let Some(record_batch) = &self { + record_batch.record_output(bm); + } + self + } +} + +impl RecordOutput for Option { + fn record_output(self, bm: &BaselineMetrics) -> Self { + if let Some(record_batch) = &self { + record_batch.record_output(bm); + } + self + } +} + +impl RecordOutput for arrow::error::Result { + fn record_output(self, bm: &BaselineMetrics) -> Self { + if let Ok(record_batch) = &self { + record_batch.record_output(bm); + } + self + } +} diff --git a/datafusion/src/physical_plan/metrics/builder.rs b/datafusion/src/physical_plan/metrics/builder.rs index 34392c770ce4..510366bb3565 100644 --- a/datafusion/src/physical_plan/metrics/builder.rs +++ b/datafusion/src/physical_plan/metrics/builder.rs @@ -19,7 +19,9 @@ use std::{borrow::Cow, sync::Arc}; -use super::{Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time}; +use super::{ + Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time, Timestamp, +}; /// Structure for constructing metrics, counters, timers, etc. 
/// @@ -124,12 +126,12 @@ impl<'a> MetricBuilder<'a> { count } - /// Consume self and create a new Timer for recording the overall cpu time - /// spent by an operator - pub fn cpu_time(self, partition: usize) -> Time { + /// Consume self and create a new Timer for recording the elapsed + /// CPU time spent by an operator + pub fn elapsed_compute(self, partition: usize) -> Time { let time = Time::new(); self.with_partition(partition) - .build(MetricValue::CPUTime(time.clone())); + .build(MetricValue::ElapsedCompute(time.clone())); time } @@ -147,4 +149,22 @@ impl<'a> MetricBuilder<'a> { }); time } + + /// Consumes self and creates a new Timestamp for recording the + /// starting time of execution for a partition + pub fn start_timestamp(self, partition: usize) -> Timestamp { + let timestamp = Timestamp::new(); + self.with_partition(partition) + .build(MetricValue::StartTimestamp(timestamp.clone())); + timestamp + } + + /// Consumes self and creates a new Timestamp for recording the + /// ending time of execution for a partition + pub fn end_timestamp(self, partition: usize) -> Timestamp { + let timestamp = Timestamp::new(); + self.with_partition(partition) + .build(MetricValue::EndTimestamp(timestamp.clone())); + timestamp + } } diff --git a/datafusion/src/physical_plan/metrics/mod.rs b/datafusion/src/physical_plan/metrics/mod.rs index 7dd92a4d1ed1..f85a0f1d9723 100644 --- a/datafusion/src/physical_plan/metrics/mod.rs +++ b/datafusion/src/physical_plan/metrics/mod.rs @@ -17,6 +17,7 @@ //! Metrics for recording information about execution +mod baseline; mod builder; mod value; @@ -29,8 +30,9 @@ use std::{ use hashbrown::HashMap; // public exports +pub use baseline::{BaselineMetrics, RecordOutput}; pub use builder::MetricBuilder; -pub use value::{Count, MetricValue, ScopedTimerGuard, Time}; +pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp}; /// Something that tracks a value of interest (metric) of a DataFusion /// [`ExecutionPlan`] execution. 
@@ -189,10 +191,10 @@ impl MetricsSet { .map(|v| v.as_usize()) } - /// convenience: return the amount of CPU time spent, aggregated - /// across partitions or None if no metric is present - pub fn cpu_time(&self) -> Option { - self.sum(|metric| matches!(metric.value(), MetricValue::CPUTime(_))) + /// convenience: return the amount of elapsed CPU time spent, + /// aggregated across partitions or None if no metric is present + pub fn elapsed_compute(&self) -> Option { + self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_))) .map(|v| v.as_usize()) } @@ -216,7 +218,7 @@ impl MetricsSet { Some(metric) => metric.value().new_empty(), }; - iter.for_each(|metric| accum.add(metric.value())); + iter.for_each(|metric| accum.aggregate(metric.value())); Some(accum) } @@ -233,7 +235,7 @@ impl MetricsSet { let key = (metric.value.name(), metric.labels.clone()); map.entry(key) .and_modify(|accum: &mut Metric| { - accum.value_mut().add(metric.value()); + accum.value_mut().aggregate(metric.value()); }) .or_insert_with(|| { // accumulate with no partition @@ -243,7 +245,7 @@ impl MetricsSet { partition, metric.labels().to_vec(), ); - accum.value_mut().add(metric.value()); + accum.value_mut().aggregate(metric.value()); accum }); } @@ -257,6 +259,25 @@ impl MetricsSet { metrics: new_metrics, } } + + /// Sort the order of metrics so the "most useful" show up first + pub fn sorted_for_display(mut self) -> Self { + self.metrics + .sort_unstable_by_key(|metric| metric.value().display_sort_key()); + self + } + + /// remove all timestamp metrics (for more compact display + pub fn timestamps_removed(self) -> Self { + let Self { metrics } = self; + + let metrics = metrics + .into_iter() + .filter(|m| !m.value.is_timestamp()) + .collect::>(); + + Self { metrics } + } } impl Display for MetricsSet { @@ -351,6 +372,8 @@ impl Display for Label { mod tests { use std::time::Duration; + use chrono::{TimeZone, Utc}; + use super::*; #[test] @@ -414,17 +437,17 @@ mod tests { } #[test] - fn test_cpu_time() { + fn test_elapsed_compute() { let metrics = ExecutionPlanMetricsSet::new(); - assert!(metrics.clone_inner().cpu_time().is_none()); + assert!(metrics.clone_inner().elapsed_compute().is_none()); let partition = 1; - let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition); - cpu_time.add_duration(Duration::from_nanos(1234)); + let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition); + elapsed_compute.add_duration(Duration::from_nanos(1234)); - let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition + 1); - cpu_time.add_duration(Duration::from_nanos(6)); - assert_eq!(metrics.clone_inner().cpu_time().unwrap(), 1240); + let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1); + elapsed_compute.add_duration(Duration::from_nanos(6)); + assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240); } #[test] @@ -473,16 +496,16 @@ mod tests { let metrics = ExecutionPlanMetricsSet::new(); // Note cpu_time1 has labels so it is not aggregated with 2 and 3 - let cpu_time1 = MetricBuilder::new(&metrics) + let elapsed_compute1 = MetricBuilder::new(&metrics) .with_new_label("foo", "bar") - .cpu_time(1); - cpu_time1.add_duration(Duration::from_nanos(12)); + .elapsed_compute(1); + elapsed_compute1.add_duration(Duration::from_nanos(12)); - let cpu_time2 = MetricBuilder::new(&metrics).cpu_time(2); - cpu_time2.add_duration(Duration::from_nanos(34)); + let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2); + 
elapsed_compute2.add_duration(Duration::from_nanos(34)); - let cpu_time3 = MetricBuilder::new(&metrics).cpu_time(4); - cpu_time3.add_duration(Duration::from_nanos(56)); + let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4); + elapsed_compute3.add_duration(Duration::from_nanos(56)); let output_rows = MetricBuilder::new(&metrics).output_rows(1); // output rows output_rows.add(56); @@ -490,16 +513,16 @@ mod tests { let aggregated = metrics.clone_inner().aggregate_by_partition(); // cpu time should be aggregated: - let cpu_times = aggregated + let elapsed_computes = aggregated .iter() .filter(|metric| { - matches!(metric.value(), MetricValue::CPUTime(_)) + matches!(metric.value(), MetricValue::ElapsedCompute(_)) && metric.labels().is_empty() }) .collect::>(); - assert_eq!(cpu_times.len(), 1); - assert_eq!(cpu_times[0].value().as_usize(), 34 + 56); - assert!(cpu_times[0].partition().is_none()); + assert_eq!(elapsed_computes.len(), 1); + assert_eq!(elapsed_computes[0].value().as_usize(), 34 + 56); + assert!(elapsed_computes[0].partition().is_none()); // output rows should let output_rows = aggregated @@ -525,4 +548,88 @@ mod tests { // can't aggregate time and count -- expect a panic metrics.clone_inner().aggregate_by_partition(); } + + #[test] + fn test_aggregate_partition_timestamps() { + let metrics = ExecutionPlanMetricsSet::new(); + + // 1431648000000000 == 1970-01-17 13:40:48 UTC + let t1 = Utc.timestamp_nanos(1431648000000000); + // 1531648000000000 == 1970-01-18 17:27:28 UTC + let t2 = Utc.timestamp_nanos(1531648000000000); + // 1631648000000000 == 1970-01-19 21:14:08 UTC + let t3 = Utc.timestamp_nanos(1631648000000000); + // 1731648000000000 == 1970-01-21 01:00:48 UTC + let t4 = Utc.timestamp_nanos(1731648000000000); + + let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0); + start_timestamp0.set(t1); + let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0); + end_timestamp0.set(t2); + let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0); + start_timestamp1.set(t3); + let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0); + end_timestamp1.set(t4); + + // aggregate + let aggregated = metrics.clone_inner().aggregate_by_partition(); + + let mut ts = aggregated + .iter() + .filter(|metric| { + matches!(metric.value(), MetricValue::StartTimestamp(_)) + && metric.labels().is_empty() + }) + .collect::>(); + assert_eq!(ts.len(), 1); + match ts.remove(0).value() { + MetricValue::StartTimestamp(ts) => { + // expect earliest of t1, t2 + assert_eq!(ts.value(), Some(t1)); + } + _ => { + panic!("Not a timestamp"); + } + }; + + let mut ts = aggregated + .iter() + .filter(|metric| { + matches!(metric.value(), MetricValue::EndTimestamp(_)) + && metric.labels().is_empty() + }) + .collect::>(); + assert_eq!(ts.len(), 1); + match ts.remove(0).value() { + MetricValue::EndTimestamp(ts) => { + // expect latest of t3, t4 + assert_eq!(ts.value(), Some(t4)); + } + _ => { + panic!("Not a timestamp"); + } + }; + } + + #[test] + fn test_sorted_for_display() { + let metrics = ExecutionPlanMetricsSet::new(); + MetricBuilder::new(&metrics).end_timestamp(0); + MetricBuilder::new(&metrics).start_timestamp(0); + MetricBuilder::new(&metrics).elapsed_compute(0); + MetricBuilder::new(&metrics).counter("the_counter", 0); + MetricBuilder::new(&metrics).subset_time("the_time", 0); + MetricBuilder::new(&metrics).output_rows(0); + let metrics = metrics.clone_inner(); + + fn metric_names(metrics: &MetricsSet) -> String { + let n = 
metrics.iter().map(|m| m.value().name()).collect::>(); + n.join(", ") + } + + assert_eq!("end_timestamp, start_timestamp, elapsed_compute, the_counter, the_time, output_rows", metric_names(&metrics)); + + let metrics = metrics.sorted_for_display(); + assert_eq!("output_rows, elapsed_compute, the_counter, the_time, start_timestamp, end_timestamp", metric_names(&metrics)); + } } diff --git a/datafusion/src/physical_plan/metrics/value.rs b/datafusion/src/physical_plan/metrics/value.rs index 61a7c2bfa0e8..6f6358339bdb 100644 --- a/datafusion/src/physical_plan/metrics/value.rs +++ b/datafusion/src/physical_plan/metrics/value.rs @@ -19,13 +19,16 @@ use std::{ borrow::{Borrow, Cow}, + fmt::Display, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, Mutex, }, time::{Duration, Instant}, }; +use chrono::{DateTime, Utc}; + /// A counter to record things such as number of input or output rows /// /// Note `clone`ing counters update the same underlying metrics @@ -41,6 +44,12 @@ impl PartialEq for Count { } } +impl Display for Count { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value()) + } +} + impl Count { /// create a new counter pub fn new() -> Self { @@ -75,6 +84,13 @@ impl PartialEq for Time { } } +impl Display for Time { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let duration = std::time::Duration::from_nanos(self.value() as u64); + write!(f, "{:?}", duration) + } +} + impl Time { /// Create a new [`Time`] wrapper suitable for recording elapsed /// times for operations. @@ -116,6 +132,82 @@ impl Time { } } +/// Stores a single timestamp, stored as the number of nanoseconds +/// elapsed from Jan 1, 1970 UTC +#[derive(Debug, Clone)] +pub struct Timestamp { + /// Time thing started + timestamp: Arc>>>, +} + +impl Timestamp { + /// Create a new timestamp and sets its value to 0 + pub fn new() -> Self { + Self { + timestamp: Arc::new(Mutex::new(None)), + } + } + + /// Sets the timestamps value to the current time + pub fn record(&self) { + self.set(Utc::now()) + } + + /// Sets the timestamps value to a specified time + pub fn set(&self, now: DateTime) { + *self.timestamp.lock().unwrap() = Some(now); + } + + /// return the timestamps value at the last time `record()` was + /// called. 
+ /// + /// Returns `None` if `record()` has not been called + pub fn value(&self) -> Option> { + *self.timestamp.lock().unwrap() + } + + /// sets the value of this timestamp to the minimum of this and other + pub fn update_to_min(&self, other: &Timestamp) { + let min = match (self.value(), other.value()) { + (None, None) => None, + (Some(v), None) => Some(v), + (None, Some(v)) => Some(v), + (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }), + }; + + *self.timestamp.lock().unwrap() = min; + } + + /// sets the value of this timestamp to the maximum of this and other + pub fn update_to_max(&self, other: &Timestamp) { + let max = match (self.value(), other.value()) { + (None, None) => None, + (Some(v), None) => Some(v), + (None, Some(v)) => Some(v), + (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }), + }; + + *self.timestamp.lock().unwrap() = max; + } +} + +impl PartialEq for Timestamp { + fn eq(&self, other: &Self) -> bool { + self.value().eq(&other.value()) + } +} + +impl Display for Timestamp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.value() { + None => write!(f, "NONE"), + Some(v) => { + write!(f, "{}", v) + } + } + } +} + /// RAAI structure that adds all time between its construction and /// destruction to the CPU time or the first call to `stop` whichever /// comes first @@ -144,7 +236,7 @@ impl<'a> Drop for ScopedTimerGuard<'a> { } } -/// Possible values for a metric. +/// Possible values for a [`Metric`]. /// /// Among other differences, the metric types have different ways to /// logically interpret their underlying values and some metrics are @@ -153,8 +245,26 @@ impl<'a> Drop for ScopedTimerGuard<'a> { pub enum MetricValue { /// Number of output rows produced: "output_rows" metric OutputRows(Count), - /// CPU time: the "cpu_time" metric - CPUTime(Time), + /// Elapsed Compute Time: the wall clock time spent in "cpu + /// intensive" work. + /// + /// This measurement represents, roughly: + /// ``` + /// use std::time::Instant; + /// let start = Instant::now(); + /// // ...CPU intensive work here... + /// let elapsed_compute = (Instant::now() - start).as_nanos(); + /// ``` + /// + /// Note 1: Does *not* include time other operators spend + /// computing input. + /// + /// Note 2: *Does* includes time when the thread could have made + /// progress but the OS did not schedule it (e.g. due to CPU + /// contention), thus making this value different than the + /// classical defintion of "cpu_time", which is the time reported + /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`. + ElapsedCompute(Time), /// Operator defined count. Count { /// The provided name of this metric @@ -169,8 +279,10 @@ pub enum MetricValue { /// The value of the metric time: Time, }, - // TODO timestamp, etc - // https://github.com/apache/arrow-datafusion/issues/866 + /// The time at which execution started + StartTimestamp(Timestamp), + /// The time at which execution ended + EndTimestamp(Timestamp), } impl MetricValue { @@ -178,9 +290,11 @@ impl MetricValue { pub fn name(&self) -> &str { match self { Self::OutputRows(_) => "output_rows", - Self::CPUTime(_) => "cpu_time", + Self::ElapsedCompute(_) => "elapsed_compute", Self::Count { name, .. } => name.borrow(), Self::Time { name, .. 
} => name.borrow(), + Self::StartTimestamp(_) => "start_timestamp", + Self::EndTimestamp(_) => "end_timestamp", } } @@ -188,9 +302,17 @@ impl MetricValue { pub fn as_usize(&self) -> usize { match self { Self::OutputRows(count) => count.value(), - Self::CPUTime(time) => time.value(), + Self::ElapsedCompute(time) => time.value(), Self::Count { count, .. } => count.value(), Self::Time { time, .. } => time.value(), + Self::StartTimestamp(timestamp) => timestamp + .value() + .map(|ts| ts.timestamp_nanos() as usize) + .unwrap_or(0), + Self::EndTimestamp(timestamp) => timestamp + .value() + .map(|ts| ts.timestamp_nanos() as usize) + .unwrap_or(0), } } @@ -199,7 +321,7 @@ impl MetricValue { pub fn new_empty(&self) -> Self { match self { Self::OutputRows(_) => Self::OutputRows(Count::new()), - Self::CPUTime(_) => Self::CPUTime(Time::new()), + Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()), Self::Count { name, .. } => Self::Count { name: name.clone(), count: Count::new(), @@ -208,18 +330,21 @@ impl MetricValue { name: name.clone(), time: Time::new(), }, + Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()), + Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()), } } - /// Add the value of other to `self`. panic's if the type is mismatched or - /// aggregating does not make sense for this value + /// Aggregates the value of other to `self`. panic's if the types + /// are mismatched or aggregating does not make sense for this + /// value /// /// Note this is purposely marked `mut` (even though atomics are /// used) so Rust's type system can be used to ensure the /// appropriate API access. `MetricValues` should be modified /// using the original [`Count`] or [`Time`] they were created /// from. - pub fn add(&mut self, other: &Self) { + pub fn aggregate(&mut self, other: &Self) { match (self, other) { (Self::OutputRows(count), Self::OutputRows(other_count)) | ( @@ -228,13 +353,21 @@ impl MetricValue { count: other_count, .. }, ) => count.add(other_count.value()), - (Self::CPUTime(time), Self::CPUTime(other_time)) + (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time)) | ( Self::Time { time, .. }, Self::Time { time: other_time, .. }, ) => time.add(other_time), + // timestamps are aggregated by min/max + (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => { + timestamp.update_to_min(other_timestamp); + } + // timestamps are aggregated by min/max + (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => { + timestamp.update_to_max(other_timestamp); + } m @ (_, _) => { panic!( "Mismatched metric types. Can not aggregate {:?} with value {:?}", @@ -243,6 +376,24 @@ impl MetricValue { } } } + + /// Returns a number by which to sort metrics by display. Lower + /// numbers are "more useful" (and displayed first) + pub fn display_sort_key(&self) -> u8 { + match self { + Self::OutputRows(_) => 0, // show first + Self::ElapsedCompute(_) => 1, // show second + Self::Count { .. } => 2, + Self::Time { .. } => 3, + Self::StartTimestamp(_) => 4, // show timestamps last + Self::EndTimestamp(_) => 5, + } + } + + /// returns true if this metric has a timestamp value + pub fn is_timestamp(&self) -> bool { + matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_)) + } } impl std::fmt::Display for MetricValue { @@ -250,12 +401,94 @@ impl std::fmt::Display for MetricValue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::OutputRows(count) | Self::Count { count, .. 
} => { - write!(f, "{}", count.value()) + write!(f, "{}", count) + } + Self::ElapsedCompute(time) | Self::Time { time, .. } => { + // distinguish between no time recorded and very small + // amount of time recorded + if time.value() > 0 { + write!(f, "{}", time) + } else { + write!(f, "NOT RECORDED") + } } - Self::CPUTime(time) | Self::Time { time, .. } => { - let duration = std::time::Duration::from_nanos(time.value() as u64); - write!(f, "{:?}", duration) + Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => { + write!(f, "{}", timestamp) } } } } + +#[cfg(test)] +mod tests { + use chrono::TimeZone; + + use super::*; + + #[test] + fn test_display_output_rows() { + let count = Count::new(); + let values = vec![ + MetricValue::OutputRows(count.clone()), + MetricValue::Count { + name: "my_counter".into(), + count: count.clone(), + }, + ]; + + for value in &values { + assert_eq!("0", value.to_string(), "value {:?}", value); + } + + count.add(42); + for value in &values { + assert_eq!("42", value.to_string(), "value {:?}", value); + } + } + + #[test] + fn test_display_time() { + let time = Time::new(); + let values = vec![ + MetricValue::ElapsedCompute(time.clone()), + MetricValue::Time { + name: "my_time".into(), + time: time.clone(), + }, + ]; + + // if time is not set, it should not be reported as zero + for value in &values { + assert_eq!("NOT RECORDED", value.to_string(), "value {:?}", value); + } + + time.add_duration(Duration::from_nanos(1042)); + for value in &values { + assert_eq!("1.042µs", value.to_string(), "value {:?}", value); + } + } + + #[test] + fn test_display_timestamp() { + let timestamp = Timestamp::new(); + let values = vec![ + MetricValue::StartTimestamp(timestamp.clone()), + MetricValue::EndTimestamp(timestamp.clone()), + ]; + + // if time is not set, it should not be reported as zero + for value in &values { + assert_eq!("NONE", value.to_string(), "value {:?}", value); + } + + timestamp.set(Utc.timestamp_nanos(1431648000000000)); + for value in &values { + assert_eq!( + "1970-01-17 13:40:48 UTC", + value.to_string(), + "value {:?}", + value + ); + } + } +} diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index f1346b5b1d71..df77a16c2947 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -155,13 +155,14 @@ impl ExecutionPlan for SortExec { let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition); - let cpu_time = MetricBuilder::new(&self.metrics).cpu_time(partition); + let elapsed_compute = + MetricBuilder::new(&self.metrics).elapsed_compute(partition); Ok(Box::pin(SortStream::new( input, self.expr.clone(), output_rows, - cpu_time, + elapsed_compute, ))) } @@ -436,7 +437,7 @@ mod tests { let result: Vec = collect(sort_exec.clone()).await?; let metrics = sort_exec.metrics().unwrap(); - assert!(metrics.cpu_time().unwrap() > 0); + assert!(metrics.elapsed_compute().unwrap() > 0); assert_eq!(metrics.output_rows().unwrap(), 8); assert_eq!(result.len(), 1); diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index a2e84e1f6d9a..8aae3d950719 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2198,7 +2198,8 @@ async fn csv_explain_analyze() { // Only test basic plumbing and try to avoid having to change too // many things - let needle = "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), metrics=["; + let needle = + "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=NOT RECORDED"; assert_contains!(&formatted, 
needle); let verbose_needle = "Output Rows";
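Note on usage: the pieces above are meant to be wired into an operator the same way CoalescePartitionsExec and MergeStream do it. Below is a minimal sketch, not part of this diff, of how another operator's output stream might use the new BaselineMetrics; `MyOpStream`, its `input` field, and the per-batch transformation are illustrative assumptions, while `BaselineMetrics::new`, `elapsed_compute().timer()`, and `record_poll()` are the APIs added here.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
use futures::Stream;

use datafusion::physical_plan::metrics::BaselineMetrics;

/// Illustrative operator stream; `baseline_metrics` would be created in
/// `execute()` via `BaselineMetrics::new(&self.metrics, partition)`, as
/// CoalescePartitionsExec does above.
struct MyOpStream {
    input: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send>>,
    baseline_metrics: BaselineMetrics,
}

impl Stream for MyOpStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All fields are Unpin, so a plain `&mut Self` is fine here
        let this = self.get_mut();
        let baseline_metrics = &this.baseline_metrics;

        let poll = this.input.as_mut().poll_next(cx).map(|maybe_result| {
            maybe_result.map(|result| {
                // Time only this operator's own per-batch work, not the time
                // spent waiting on input; the guard adds to elapsed_compute
                // when it is dropped at the end of this closure
                let _timer = baseline_metrics.elapsed_compute().timer();
                result.map(|batch| {
                    // ... per-batch transformation would go here ...
                    batch
                })
            })
        });

        // Counts output rows and records `end_time` when the stream finishes
        // (returns None) or yields an error
        baseline_metrics.record_poll(poll)
    }
}
```

Operators that assemble whole batches outside of `poll_next` can instead return `batch.record_output(&baseline_metrics)`, relying on the `RecordOutput` trait added in `baseline.rs`.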
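The display.rs change also doubles as a recipe for any consumer of operator metrics. A hedged sketch follows, assuming `plan` is an `Arc<dyn ExecutionPlan>` that has already been executed; `one_line_metrics` is a hypothetical helper name, and the example output in the comment matches what the updated `csv_explain_analyze` test asserts.

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

/// Render an operator's metrics the way `EXPLAIN ANALYZE` now does:
/// aggregated across partitions, most useful values first, timestamps dropped
fn one_line_metrics(plan: &Arc<dyn ExecutionPlan>) -> String {
    match plan.metrics() {
        Some(metrics) => {
            let metrics = metrics
                .aggregate_by_partition()
                .sorted_for_display()
                .timestamps_removed();

            // `MetricsSet` implements `Display`, producing output such as
            // "output_rows=5, elapsed_compute=NOT RECORDED"
            format!("metrics=[{}]", metrics)
        }
        None => "metrics=[]".to_string(),
    }
}
```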