Skip to content

Commit e5c5c07

Browse files
committed
Add BaselineMetrics, Timestamp metrics, add for CoalescePartitionsExec
1 parent bd49b86 commit e5c5c07

File tree

8 files changed

+624
-57
lines changed

8 files changed

+624
-57
lines changed

datafusion/src/physical_plan/coalesce_partitions.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
2121
use std::any::Any;
2222
use std::sync::Arc;
23+
use std::task::Poll;
2324

2425
use futures::channel::mpsc;
2526
use futures::Stream;
@@ -29,6 +30,7 @@ use async_trait::async_trait;
2930
use arrow::record_batch::RecordBatch;
3031
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
3132

33+
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
3234
use super::RecordBatchStream;
3335
use crate::error::{DataFusionError, Result};
3436
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
@@ -43,12 +45,17 @@ use pin_project_lite::pin_project;
4345
pub struct CoalescePartitionsExec {
4446
/// Input execution plan
4547
input: Arc<dyn ExecutionPlan>,
48+
/// Execution metrics
49+
metrics: ExecutionPlanMetricsSet,
4650
}
4751

4852
impl CoalescePartitionsExec {
4953
/// Create a new CoalescePartitionsExec
5054
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
51-
CoalescePartitionsExec { input }
55+
CoalescePartitionsExec {
56+
input,
57+
metrics: ExecutionPlanMetricsSet::new(),
58+
}
5259
}
5360

5461
/// Input execution plan
@@ -90,6 +97,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
9097
}
9198

9299
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
100+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
101+
93102
// CoalescePartitionsExec produces a single partition
94103
if 0 != partition {
95104
return Err(DataFusionError::Internal(format!(
@@ -123,6 +132,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
123132
Ok(Box::pin(MergeStream {
124133
input: receiver,
125134
schema: self.schema(),
135+
baseline_metrics,
126136
}))
127137
}
128138
}
@@ -139,13 +149,18 @@ impl ExecutionPlan for CoalescePartitionsExec {
139149
}
140150
}
141151
}
152+
153+
fn metrics(&self) -> Option<MetricsSet> {
154+
Some(self.metrics.clone_inner())
155+
}
142156
}
143157

144158
pin_project! {
145159
struct MergeStream {
146160
schema: SchemaRef,
147161
#[pin]
148162
input: mpsc::Receiver<ArrowResult<RecordBatch>>,
163+
baseline_metrics: BaselineMetrics
149164
}
150165
}
151166

@@ -155,9 +170,10 @@ impl Stream for MergeStream {
155170
fn poll_next(
156171
self: std::pin::Pin<&mut Self>,
157172
cx: &mut std::task::Context<'_>,
158-
) -> std::task::Poll<Option<Self::Item>> {
173+
) -> Poll<Option<Self::Item>> {
159174
let this = self.project();
160-
this.input.poll_next(cx)
175+
let poll = this.input.poll_next(cx);
176+
this.baseline_metrics.record_poll(poll)
161177
}
162178
}
163179

datafusion/src/physical_plan/display.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,12 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
139139
ShowMetrics::None => {}
140140
ShowMetrics::Aggregated => {
141141
if let Some(metrics) = plan.metrics() {
142-
write!(self.f, ", metrics=[{}]", metrics.aggregate_by_partition())?;
142+
let metrics = metrics
143+
.aggregate_by_partition()
144+
.sorted_for_display()
145+
.timestamps_removed();
146+
147+
write!(self.f, ", metrics=[{}]", metrics)?;
143148
} else {
144149
write!(self.f, ", metrics=[]")?;
145150
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Metrics common for almost all operators
19+
20+
use std::task::Poll;
21+
22+
use arrow::{error::ArrowError, record_batch::RecordBatch};
23+
24+
use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
25+
26+
/// Helper for creating and tracking common "baseline" metrics for
27+
/// each operator
28+
///
29+
/// Example:
30+
/// ```
31+
/// use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
32+
/// let metrics = ExecutionPlanMetricsSet::new();
33+
///
34+
/// let partition = 2;
35+
/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
36+
///
37+
/// // during execution, in CPU intensive operation:
38+
/// let timer = baseline_metrics.elapsed_compute().timer();
39+
/// // .. do CPU intensive work
40+
/// timer.done();
41+
///
42+
/// // when operator is finished:
43+
/// baseline_metrics.done();
44+
/// ```
45+
#[derive(Debug)]
46+
pub struct BaselineMetrics {
47+
/// end_time is set when `BaselineMetrics::done()` is called
48+
end_time: Timestamp,
49+
50+
/// amount of time the operator was actively trying to use the CPU
51+
elapsed_compute: Time,
52+
53+
/// output rows: the total output rows
54+
output_rows: Count,
55+
}
56+
57+
impl BaselineMetrics {
58+
/// Create a new BaselineMetrics structure, and set `start_time` to now
59+
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
60+
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
61+
start_time.record();
62+
63+
Self {
64+
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
65+
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
66+
output_rows: MetricBuilder::new(metrics).output_rows(partition),
67+
}
68+
}
69+
70+
/// return the metric for CPU time spent in this operator
71+
pub fn elapsed_compute(&self) -> &Time {
72+
&self.elapsed_compute
73+
}
74+
75+
/// return the metric for the total number of output rows produced
76+
pub fn output_rows(&self) -> &Count {
77+
&self.output_rows
78+
}
79+
80+
/// Records the fact that this operator's execution is complete
81+
/// (recording the `end_time` metric).
82+
///
83+
/// Note care should be taken to call `done()` manually if
84+
/// `BaselineMetrics` is not `drop`ped immediately upon operator
85+
/// completion, as async streams may not be dropped immediately
86+
/// depending on the consumer.
87+
pub fn done(&self) {
88+
self.end_time.record()
89+
}
90+
91+
/// Record that some number of rows have been produced as output
92+
///
93+
/// See the [`RecordOutput`] trait for conveniently recording the
93+
/// output rows of record batches and other types
95+
pub fn record_output(&self, num_rows: usize) {
96+
self.output_rows.add(num_rows);
97+
}
98+
99+
/// Process a poll result of a stream producing output for an
100+
/// operator, recording the output rows and stream done time and
101+
/// returning the same poll result
102+
pub fn record_poll(
103+
&self,
104+
poll: Poll<Option<Result<RecordBatch, ArrowError>>>,
105+
) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
106+
if let Poll::Ready(maybe_batch) = &poll {
107+
match maybe_batch {
108+
Some(Ok(batch)) => {
109+
batch.record_output(self);
110+
}
111+
Some(Err(_)) => self.done(),
112+
None => self.done(),
113+
}
114+
}
115+
poll
116+
}
117+
}
118+
119+
impl Drop for BaselineMetrics {
120+
fn drop(&mut self) {
121+
// if not previously recorded, record
122+
if self.end_time.value().is_none() {
123+
self.end_time.record()
124+
}
125+
}
126+
}
127+
128+
/// Trait for things that produce output rows as a result of execution.
129+
pub trait RecordOutput {
130+
/// Record that some number of output rows have been produced
131+
///
132+
/// Meant to be composable so that instead of returning `batch`
133+
/// the operator can return `batch.record_output(baseline_metrics)`
134+
fn record_output(self, bm: &BaselineMetrics) -> Self;
135+
}
136+
137+
impl RecordOutput for usize {
138+
fn record_output(self, bm: &BaselineMetrics) -> Self {
139+
bm.record_output(self);
140+
self
141+
}
142+
}
143+
144+
impl RecordOutput for RecordBatch {
145+
fn record_output(self, bm: &BaselineMetrics) -> Self {
146+
bm.record_output(self.num_rows());
147+
self
148+
}
149+
}
150+
151+
impl RecordOutput for &RecordBatch {
152+
fn record_output(self, bm: &BaselineMetrics) -> Self {
153+
bm.record_output(self.num_rows());
154+
self
155+
}
156+
}
157+
158+
impl RecordOutput for Option<&RecordBatch> {
159+
fn record_output(self, bm: &BaselineMetrics) -> Self {
160+
if let Some(record_batch) = &self {
161+
record_batch.record_output(bm);
162+
}
163+
self
164+
}
165+
}
166+
167+
impl RecordOutput for Option<RecordBatch> {
168+
fn record_output(self, bm: &BaselineMetrics) -> Self {
169+
if let Some(record_batch) = &self {
170+
record_batch.record_output(bm);
171+
}
172+
self
173+
}
174+
}
175+
176+
impl RecordOutput for arrow::error::Result<RecordBatch> {
177+
fn record_output(self, bm: &BaselineMetrics) -> Self {
178+
if let Ok(record_batch) = &self {
179+
record_batch.record_output(bm);
180+
}
181+
self
182+
}
183+
}

datafusion/src/physical_plan/metrics/builder.rs

+25-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
2020
use std::{borrow::Cow, sync::Arc};
2121

22-
use super::{Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time};
22+
use super::{
23+
Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time, Timestamp,
24+
};
2325

2426
/// Structure for constructing metrics, counters, timers, etc.
2527
///
@@ -124,12 +126,12 @@ impl<'a> MetricBuilder<'a> {
124126
count
125127
}
126128

127-
/// Consume self and create a new Timer for recording the overall cpu time
128-
/// spent by an operator
129-
pub fn cpu_time(self, partition: usize) -> Time {
129+
/// Consume self and create a new `Time` for recording the elapsed
130+
/// CPU time spent by an operator
131+
pub fn elapsed_compute(self, partition: usize) -> Time {
130132
let time = Time::new();
131133
self.with_partition(partition)
132-
.build(MetricValue::CPUTime(time.clone()));
134+
.build(MetricValue::ElapsedCompute(time.clone()));
133135
time
134136
}
135137

@@ -147,4 +149,22 @@ impl<'a> MetricBuilder<'a> {
147149
});
148150
time
149151
}
152+
153+
/// Consumes self and creates a new Timestamp for recording the
154+
/// starting time of execution for a partition
155+
pub fn start_timestamp(self, partition: usize) -> Timestamp {
156+
let timestamp = Timestamp::new();
157+
self.with_partition(partition)
158+
.build(MetricValue::StartTimestamp(timestamp.clone()));
159+
timestamp
160+
}
161+
162+
/// Consumes self and creates a new Timestamp for recording the
163+
/// ending time of execution for a partition
164+
pub fn end_timestamp(self, partition: usize) -> Timestamp {
165+
let timestamp = Timestamp::new();
166+
self.with_partition(partition)
167+
.build(MetricValue::EndTimestamp(timestamp.clone()));
168+
timestamp
169+
}
150170
}

0 commit comments

Comments
 (0)