Skip to content

Commit

Permalink
Add BaselineMetrics, Timestamp metrics, add for CoalscePartitionExec (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Aug 26, 2021
1 parent 74d2942 commit d31c157
Show file tree
Hide file tree
Showing 8 changed files with 623 additions and 57 deletions.
22 changes: 19 additions & 3 deletions datafusion/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::any::Any;
use std::sync::Arc;
use std::task::Poll;

use futures::channel::mpsc;
use futures::Stream;
Expand All @@ -29,6 +30,7 @@ use async_trait::async_trait;
use arrow::record_batch::RecordBatch;
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::RecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
Expand All @@ -43,12 +45,17 @@ use pin_project_lite::pin_project;
pub struct CoalescePartitionsExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl CoalescePartitionsExec {
/// Create a new CoalescePartitionsExec
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
CoalescePartitionsExec { input }
CoalescePartitionsExec {
input,
metrics: ExecutionPlanMetricsSet::new(),
}
}

/// Input execution plan
Expand Down Expand Up @@ -90,6 +97,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);

// CoalescePartitionsExec produces a single partition
if 0 != partition {
return Err(DataFusionError::Internal(format!(
Expand Down Expand Up @@ -123,6 +132,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
Ok(Box::pin(MergeStream {
input: receiver,
schema: self.schema(),
baseline_metrics,
}))
}
}
Expand All @@ -139,13 +149,18 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
}
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

pin_project! {
struct MergeStream {
schema: SchemaRef,
#[pin]
input: mpsc::Receiver<ArrowResult<RecordBatch>>,
baseline_metrics: BaselineMetrics
}
}

Expand All @@ -155,9 +170,10 @@ impl Stream for MergeStream {
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
) -> Poll<Option<Self::Item>> {
let this = self.project();
this.input.poll_next(cx)
let poll = this.input.poll_next(cx);
this.baseline_metrics.record_poll(poll)
}
}

Expand Down
7 changes: 6 additions & 1 deletion datafusion/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
ShowMetrics::None => {}
ShowMetrics::Aggregated => {
if let Some(metrics) = plan.metrics() {
write!(self.f, ", metrics=[{}]", metrics.aggregate_by_partition())?;
let metrics = metrics
.aggregate_by_partition()
.sorted_for_display()
.timestamps_removed();

write!(self.f, ", metrics=[{}]", metrics)?;
} else {
write!(self.f, ", metrics=[]")?;
}
Expand Down
183 changes: 183 additions & 0 deletions datafusion/src/physical_plan/metrics/baseline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Metrics common for almost all operators
use std::task::Poll;

use arrow::{error::ArrowError, record_batch::RecordBatch};

use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};

/// Helper for creating and tracking common "baseline" metrics for
/// each operator
///
/// Example:
/// ```
/// use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
/// let metrics = ExecutionPlanMetricsSet::new();
///
/// let partition = 2;
/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
///
/// // during execution, in CPU intensive operation:
/// let timer = baseline_metrics.elapsed_compute().timer();
/// // .. do CPU intensive work
/// timer.done();
///
/// // when operator is finished:
/// baseline_metrics.done();
/// ```
#[derive(Debug)]
pub struct BaselineMetrics {
/// end_time is set when `ExecutionMetrics::done()` is called
end_time: Timestamp,

/// amount of time the operator was actively trying to use the CPU
elapsed_compute: Time,

/// output rows: the total output rows
output_rows: Count,
}

impl BaselineMetrics {
/// Create a new BaselineMetric structure, and set `start_time` to now
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
start_time.record();

Self {
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
}
}

/// return the metric for cpu time spend in this operator
pub fn elapsed_compute(&self) -> &Time {
&self.elapsed_compute
}

/// return the metric for the total number of output rows produced
pub fn output_rows(&self) -> &Count {
&self.output_rows
}

/// Records the fact that this operator's execution is complete
/// (recording the `end_time` metric).
///
/// Note care should be taken to call `done()` maually if
/// `BaselineMetrics` is not `drop`ped immediately upon operator
/// completion, as async streams may not be dropped immediately
/// depending on the consumer.
pub fn done(&self) {
self.end_time.record()
}

/// Record that some number of rows have been produced as output
///
/// See the [`RecordOutput`] for conveniently recording record
/// batch output for other thing
pub fn record_output(&self, num_rows: usize) {
self.output_rows.add(num_rows);
}

/// Process a poll result of a stream producing output for an
/// operator, recording the output rows and stream done time and
/// returning the same poll result
pub fn record_poll(
&self,
poll: Poll<Option<Result<RecordBatch, ArrowError>>>,
) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
if let Poll::Ready(maybe_batch) = &poll {
match maybe_batch {
Some(Ok(batch)) => {
batch.record_output(self);
}
Some(Err(_)) => self.done(),
None => self.done(),
}
}
poll
}
}

impl Drop for BaselineMetrics {
fn drop(&mut self) {
// if not previously recorded, record
if self.end_time.value().is_none() {
self.end_time.record()
}
}
}

/// Trait for things that produce output rows as a result of execution.
pub trait RecordOutput {
/// Record that some number of output rows have been produced
///
/// Meant to be composable so that instead of returning `batch`
/// the operator can return `batch.record_output(baseline_metrics)`
fn record_output(self, bm: &BaselineMetrics) -> Self;
}

impl RecordOutput for usize {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self);
self
}
}

impl RecordOutput for RecordBatch {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self.num_rows());
self
}
}

impl RecordOutput for &RecordBatch {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self.num_rows());
self
}
}

impl RecordOutput for Option<&RecordBatch> {
fn record_output(self, bm: &BaselineMetrics) -> Self {
if let Some(record_batch) = &self {
record_batch.record_output(bm);
}
self
}
}

impl RecordOutput for Option<RecordBatch> {
fn record_output(self, bm: &BaselineMetrics) -> Self {
if let Some(record_batch) = &self {
record_batch.record_output(bm);
}
self
}
}

impl RecordOutput for arrow::error::Result<RecordBatch> {
fn record_output(self, bm: &BaselineMetrics) -> Self {
if let Ok(record_batch) = &self {
record_batch.record_output(bm);
}
self
}
}
30 changes: 25 additions & 5 deletions datafusion/src/physical_plan/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
use std::{borrow::Cow, sync::Arc};

use super::{Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time};
use super::{
Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time, Timestamp,
};

/// Structure for constructing metrics, counters, timers, etc.
///
Expand Down Expand Up @@ -124,12 +126,12 @@ impl<'a> MetricBuilder<'a> {
count
}

/// Consume self and create a new Timer for recording the overall cpu time
/// spent by an operator
pub fn cpu_time(self, partition: usize) -> Time {
/// Consume self and create a new Timer for recording the elapsed
/// CPU time spent by an operator
pub fn elapsed_compute(self, partition: usize) -> Time {
let time = Time::new();
self.with_partition(partition)
.build(MetricValue::CPUTime(time.clone()));
.build(MetricValue::ElapsedCompute(time.clone()));
time
}

Expand All @@ -147,4 +149,22 @@ impl<'a> MetricBuilder<'a> {
});
time
}

/// Consumes self and creates a new Timestamp for recording the
/// starting time of execution for a partition
pub fn start_timestamp(self, partition: usize) -> Timestamp {
let timestamp = Timestamp::new();
self.with_partition(partition)
.build(MetricValue::StartTimestamp(timestamp.clone()));
timestamp
}

/// Consumes self and creates a new Timestamp for recording the
/// ending time of execution for a partition
pub fn end_timestamp(self, partition: usize) -> Timestamp {
let timestamp = Timestamp::new();
self.with_partition(partition)
.build(MetricValue::EndTimestamp(timestamp.clone()));
timestamp
}
}
Loading

0 comments on commit d31c157

Please sign in to comment.