Skip to content

Commit

Permalink
Add spill_count and spilled_bytes to baseline metrics, test sort with…
Browse files Browse the repository at this point in the history
… spill of metrics (#1641)
  • Loading branch information
yjshen authored Jan 22, 2022
1 parent af8786e commit 1c63759
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 27 deletions.
24 changes: 24 additions & 0 deletions datafusion/src/physical_plan/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub struct BaselineMetrics {
/// amount of time the operator was actively trying to use the CPU
elapsed_compute: Time,

/// count of spills during the execution of the operator
spill_count: Count,

/// total spilled bytes during the execution of the operator
spilled_bytes: Count,

/// output rows: the total output rows
output_rows: Count,
}
Expand All @@ -63,6 +69,8 @@ impl BaselineMetrics {
Self {
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
}
}
Expand All @@ -72,6 +80,22 @@ impl BaselineMetrics {
&self.elapsed_compute
}

/// return the metric for the total number of spills triggered during execution
pub fn spill_count(&self) -> &Count {
&self.spill_count
}

/// return the metric for the total spilled bytes during execution
pub fn spilled_bytes(&self) -> &Count {
&self.spilled_bytes
}

/// Record a spill of `spilled_bytes` size.
pub fn record_spill(&self, spilled_bytes: usize) {
self.spill_count.add(1);
self.spilled_bytes.add(spilled_bytes);
}

/// return the metric for the total number of output rows produced
pub fn output_rows(&self) -> &Count {
&self.output_rows
Expand Down
18 changes: 18 additions & 0 deletions datafusion/src/physical_plan/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ impl<'a> MetricBuilder<'a> {
count
}

/// Consumes the builder and returns a fresh [`Count`] meant for recording
/// how many spills an operator triggered for the given `partition`.
///
/// A clone of the returned counter is wrapped in
/// [`MetricValue::SpillCount`] and handed to `build`.
// NOTE(review): presumably the clone shares state with the returned
// counter so updates are visible through the registered metric — confirm
// against `Count`'s definition.
pub fn spill_count(self, partition: usize) -> Count {
    let spills = Count::new();
    let builder = self.with_partition(partition);
    builder.build(MetricValue::SpillCount(spills.clone()));
    spills
}

/// Consumes the builder and returns a fresh [`Count`] meant for recording
/// the total number of bytes an operator spilled for the given `partition`.
///
/// A clone of the returned counter is wrapped in
/// [`MetricValue::SpilledBytes`] and handed to `build`.
// NOTE(review): presumably the clone shares state with the returned
// counter so updates are visible through the registered metric — confirm
// against `Count`'s definition.
pub fn spilled_bytes(self, partition: usize) -> Count {
    let bytes = Count::new();
    let builder = self.with_partition(partition);
    builder.build(MetricValue::SpilledBytes(bytes.clone()));
    bytes
}

/// Consumes self and creates a new [`Count`] for recording some
/// arbitrary metric of an operator.
pub fn counter(
Expand Down
14 changes: 14 additions & 0 deletions datafusion/src/physical_plan/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ impl MetricsSet {
.map(|v| v.as_usize())
}

/// Convenience accessor: the number of spills, summed over every metric
/// whose value is a [`MetricValue::SpillCount`].
///
/// Yields `None` when `sum` produces no aggregate for that filter
/// (i.e. no such metric is present).
pub fn spill_count(&self) -> Option<usize> {
    let total = self.sum(|m| matches!(m.value(), MetricValue::SpillCount(_)))?;
    Some(total.as_usize())
}

/// Convenience accessor: the total spilled byte size, summed over every
/// metric whose value is a [`MetricValue::SpilledBytes`].
///
/// Yields `None` when `sum` produces no aggregate for that filter
/// (i.e. no such metric is present).
pub fn spilled_bytes(&self) -> Option<usize> {
    let total = self.sum(|m| matches!(m.value(), MetricValue::SpilledBytes(_)))?;
    Some(total.as_usize())
}

/// convenience: return the amount of elapsed CPU time spent,
/// aggregated across partitions or None if no metric is present
pub fn elapsed_compute(&self) -> Option<usize> {
Expand Down
27 changes: 22 additions & 5 deletions datafusion/src/physical_plan/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ pub enum MetricValue {
/// classical definition of "cpu_time", which is the time reported
/// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
ElapsedCompute(Time),
/// Number of spills produced: "spill_count" metric
SpillCount(Count),
/// Total size of spilled bytes produced: "spilled_bytes" metric
SpilledBytes(Count),
/// Operator defined count.
Count {
/// The provided name of this metric
Expand All @@ -308,6 +312,8 @@ impl MetricValue {
pub fn name(&self) -> &str {
match self {
Self::OutputRows(_) => "output_rows",
Self::SpillCount(_) => "spill_count",
Self::SpilledBytes(_) => "spilled_bytes",
Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
Self::Time { name, .. } => name.borrow(),
Expand All @@ -320,6 +326,8 @@ impl MetricValue {
pub fn as_usize(&self) -> usize {
match self {
Self::OutputRows(count) => count.value(),
Self::SpillCount(count) => count.value(),
Self::SpilledBytes(bytes) => bytes.value(),
Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
Self::Time { time, .. } => time.value(),
Expand All @@ -339,6 +347,8 @@ impl MetricValue {
pub fn new_empty(&self) -> Self {
match self {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
Self::SpillCount(_) => Self::SpillCount(Count::new()),
Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
Self::Count { name, .. } => Self::Count {
name: name.clone(),
Expand All @@ -365,6 +375,8 @@ impl MetricValue {
pub fn aggregate(&mut self, other: &Self) {
match (self, other) {
(Self::OutputRows(count), Self::OutputRows(other_count))
| (Self::SpillCount(count), Self::SpillCount(other_count))
| (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
| (
Self::Count { count, .. },
Self::Count {
Expand Down Expand Up @@ -401,10 +413,12 @@ impl MetricValue {
match self {
Self::OutputRows(_) => 0, // show first
Self::ElapsedCompute(_) => 1, // show second
Self::Count { .. } => 2,
Self::Time { .. } => 3,
Self::StartTimestamp(_) => 4, // show timestamps last
Self::EndTimestamp(_) => 5,
Self::SpillCount(_) => 2,
Self::SpilledBytes(_) => 3,
Self::Count { .. } => 4,
Self::Time { .. } => 5,
Self::StartTimestamp(_) => 6, // show timestamps last
Self::EndTimestamp(_) => 7,
}
}

Expand All @@ -418,7 +432,10 @@ impl std::fmt::Display for MetricValue {
/// Prints the value of this metric
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::OutputRows(count) | Self::Count { count, .. } => {
Self::OutputRows(count)
| Self::SpillCount(count)
| Self::SpilledBytes(count)
| Self::Count { count, .. } => {
write!(f, "{}", count)
}
Self::ElapsedCompute(time) | Self::Time { time, .. } => {
Expand Down
Loading

0 comments on commit 1c63759

Please sign in to comment.