Skip to content

Commit

Permalink
Improve StepBucketHistogram (#3793)
Browse files Browse the repository at this point in the history
* eliminate null check on noValue
* support exporting cumulative buckets
* fix explicit histogram buckets (SLO), was zero previously
  • Loading branch information
lenin-jaganathan authored May 1, 2023
1 parent eb54740 commit c85a754
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ static io.micrometer.core.instrument.distribution.Histogram getHistogram(Clock c
.merge(distributionStatisticConfig), true, false);
}
else if (AggregationTemporality.isDelta(aggregationTemporality) && stepMillis > 0) {
return new StepBucketHistogram(clock, stepMillis, distributionStatisticConfig, true);
return new StepBucketHistogram(clock, stepMillis, distributionStatisticConfig, true, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void timerWithHistogram() {
timer.record(111, TimeUnit.MILLISECONDS);

HistogramDataPoint histogramDataPoint = writeToMetric(timer).getHistogram().getDataPoints(0);
assertThat(histogramDataPoint.getExplicitBoundsCount()).isZero();
assertThat(histogramDataPoint.getExplicitBoundsCount()).isEqualTo(4);
this.stepOverNStep(1);
assertHistogram(writeToMetric(timer), TimeUnit.MINUTES.toNanos(1), TimeUnit.MINUTES.toNanos(2), "milliseconds",
3, 198, 111);
Expand Down Expand Up @@ -243,7 +243,7 @@ void distributionSummaryWithHistogram() {
assertHistogram(writeToMetric(ds), 0, TimeUnit.MINUTES.toNanos(1), "bytes", 0, 0, 0);

HistogramDataPoint histogramDataPoint = writeToMetric(ds).getHistogram().getDataPoints(0);
assertThat(histogramDataPoint.getExplicitBoundsCount()).isZero();
assertThat(histogramDataPoint.getExplicitBoundsCount()).isEqualTo(4);
this.stepOverNStep(1);
assertHistogram(writeToMetric(ds), TimeUnit.MINUTES.toNanos(1), TimeUnit.MINUTES.toNanos(2), "bytes", 3, 198,
111);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micrometer.core.instrument.distribution;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLongArray;

class FixedBoundaryHistogram {
Expand All @@ -24,9 +25,12 @@ class FixedBoundaryHistogram {

private final double[] buckets;

FixedBoundaryHistogram(double[] buckets) {
private final boolean isCumulativeBucketCounts;

FixedBoundaryHistogram(double[] buckets, boolean isCumulativeBucketCounts) {
this.buckets = buckets;
this.values = new AtomicLongArray(buckets.length);
this.isCumulativeBucketCounts = isCumulativeBucketCounts;
}

long countAtValue(double value) {
Expand Down Expand Up @@ -68,4 +72,28 @@ else if (buckets[mid] > key)
return low < buckets.length ? low : -1;
}

Iterator<CountAtBucket> countsAtValues(Iterator<Double> values) {
return new Iterator<CountAtBucket>() {
private double cumulativeCount = 0.0;

@Override
public boolean hasNext() {
return values.hasNext();
}

@Override
public CountAtBucket next() {
double value = values.next();
double count = countAtValue(value);
if (isCumulativeBucketCounts) {
cumulativeCount += count;
return new CountAtBucket(value, cumulativeCount);
}
else {
return new CountAtBucket(value, count);
}
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.micrometer.core.instrument.config.InvalidConfigurationException;
import io.micrometer.core.instrument.step.StepValue;

import java.util.Arrays;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.function.Supplier;
Expand All @@ -32,26 +34,18 @@
*/
public class StepBucketHistogram extends StepValue<CountAtBucket[]> implements Histogram {

private static final CountAtBucket[] EMPTY_COUNTS = new CountAtBucket[0];

private final FixedBoundaryHistogram fixedBoundaryHistogram;

private final double[] buckets;

public StepBucketHistogram(Clock clock, long stepMillis, DistributionStatisticConfig distributionStatisticConfig,
boolean supportsAggregablePercentiles) {
super(clock, stepMillis);
boolean supportsAggregablePercentiles, boolean isCumulativeBucketCounts) {
super(clock, stepMillis, getEmptyCounts(
getBucketsFromDistributionStatisticConfig(distributionStatisticConfig, supportsAggregablePercentiles)));

if (distributionStatisticConfig.getMaximumExpectedValueAsDouble() == null
|| distributionStatisticConfig.getMinimumExpectedValueAsDouble() == null) {
throw new InvalidConfigurationException(
"minimumExpectedValue and maximumExpectedValue should be greater than 0.");
}
NavigableSet<Double> histogramBuckets = distributionStatisticConfig
.getHistogramBuckets(supportsAggregablePercentiles);

this.buckets = histogramBuckets.stream().filter(Objects::nonNull).mapToDouble(Double::doubleValue).toArray();
this.fixedBoundaryHistogram = new FixedBoundaryHistogram(buckets);
this.buckets = getBucketsFromDistributionStatisticConfig(distributionStatisticConfig,
supportsAggregablePercentiles);
this.fixedBoundaryHistogram = new FixedBoundaryHistogram(buckets, isCumulativeBucketCounts);
}

@Override
Expand All @@ -74,8 +68,10 @@ protected Supplier<CountAtBucket[]> valueSupplier() {
return () -> {
CountAtBucket[] countAtBuckets = new CountAtBucket[buckets.length];
synchronized (fixedBoundaryHistogram) {
for (int i = 0; i < buckets.length; i++) {
countAtBuckets[i] = new CountAtBucket(buckets[i], fixedBoundaryHistogram.countAtValue(buckets[i]));
final Iterator<CountAtBucket> iterator = fixedBoundaryHistogram
.countsAtValues(Arrays.stream(buckets).iterator());
for (int i = 0; i < countAtBuckets.length; i++) {
countAtBuckets[i] = iterator.next();
}
fixedBoundaryHistogram.reset();
}
Expand All @@ -85,13 +81,28 @@ protected Supplier<CountAtBucket[]> valueSupplier() {

@Override
protected CountAtBucket[] noValue() {
if (buckets == null)
return EMPTY_COUNTS;
return getEmptyCounts(buckets);
}

private static CountAtBucket[] getEmptyCounts(double[] buckets) {
CountAtBucket[] countAtBuckets = new CountAtBucket[buckets.length];
for (int i = 0; i < buckets.length; i++) {
countAtBuckets[i] = new CountAtBucket(buckets[i], 0);
}
return countAtBuckets;
}

private static double[] getBucketsFromDistributionStatisticConfig(
DistributionStatisticConfig distributionStatisticConfig, boolean supportsAggregablePercentiles) {
if (distributionStatisticConfig.getMaximumExpectedValueAsDouble() == null
|| distributionStatisticConfig.getMinimumExpectedValueAsDouble() == null) {
throw new InvalidConfigurationException(
"minimumExpectedValue and maximumExpectedValue should be greater than 0.");
}
NavigableSet<Double> histogramBuckets = distributionStatisticConfig
.getHistogramBuckets(supportsAggregablePercentiles);

return histogramBuckets.stream().filter(Objects::nonNull).mapToDouble(Double::doubleValue).toArray();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TimeWindowFixedBoundaryHistogram extends AbstractTimeWindowHistogra

private final double[] buckets;

private final boolean cumulativeBucketCounts;
private final boolean isCumulativeBucketCounts;

public TimeWindowFixedBoundaryHistogram(Clock clock, DistributionStatisticConfig config,
boolean supportsAggregablePercentiles) {
Expand All @@ -48,14 +48,14 @@ public TimeWindowFixedBoundaryHistogram(Clock clock, DistributionStatisticConfig
* @param clock clock
* @param config distribution statistic configuration
* @param supportsAggregablePercentiles whether it supports aggregable percentiles
* @param cumulativeBucketCounts whether it uses cumulative bucket counts
* @param isCumulativeBucketCounts whether it uses cumulative bucket counts
* @since 1.9.0
*/
public TimeWindowFixedBoundaryHistogram(Clock clock, DistributionStatisticConfig config,
boolean supportsAggregablePercentiles, boolean cumulativeBucketCounts) {
boolean supportsAggregablePercentiles, boolean isCumulativeBucketCounts) {
super(clock, config, FixedBoundaryHistogram.class, supportsAggregablePercentiles);

this.cumulativeBucketCounts = cumulativeBucketCounts;
this.isCumulativeBucketCounts = isCumulativeBucketCounts;

NavigableSet<Double> histogramBuckets = distributionStatisticConfig
.getHistogramBuckets(supportsAggregablePercentiles);
Expand All @@ -71,7 +71,7 @@ public TimeWindowFixedBoundaryHistogram(Clock clock, DistributionStatisticConfig

@Override
FixedBoundaryHistogram newBucket() {
return new FixedBoundaryHistogram(this.buckets);
return new FixedBoundaryHistogram(this.buckets, isCumulativeBucketCounts);
}

@Override
Expand Down Expand Up @@ -114,27 +114,7 @@ void resetAccumulatedHistogram() {
*/
@Override
Iterator<CountAtBucket> countsAtValues(Iterator<Double> values) {
return new Iterator<CountAtBucket>() {
private double cumulativeCount = 0.0;

@Override
public boolean hasNext() {
return values.hasNext();
}

@Override
public CountAtBucket next() {
double value = values.next();
double count = currentHistogram().countAtValue(value);
if (cumulativeBucketCounts) {
cumulativeCount += count;
return new CountAtBucket(value, cumulativeCount);
}
else {
return new CountAtBucket(value, count);
}
}
};
return currentHistogram().countsAtValues(values);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.micrometer.core.instrument.step;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Clock;

import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -34,13 +35,18 @@ public abstract class StepValue<V> {

private final long stepMillis;

private AtomicLong lastInitPos;
private final AtomicLong lastInitPos;

private volatile V previous = noValue();
private volatile V previous;

public StepValue(final Clock clock, final long stepMillis) {
this(clock, stepMillis, null);
}

protected StepValue(final Clock clock, final long stepMillis, @Nullable final V initValue) {
this.clock = clock;
this.stepMillis = stepMillis;
this.previous = initValue == null ? noValue() : initValue;
lastInitPos = new AtomicLong(clock.wallTime() / stepMillis);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,21 @@ private static DistributionStatisticConfig getConfig(boolean percentilesHistogra
void aggregablePercentilesTrue_AddsBuckets() {
boolean supportsAggregablePercentiles = true;
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(),
distributionStatisticConfig, supportsAggregablePercentiles)) {
// TODO I don't think we should need this to have the correct buckets (even if
// their counts are all 0)
distributionStatisticConfig, supportsAggregablePercentiles, true)) {
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).isNotEmpty();
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts().length).isGreaterThan(0);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).isNotEmpty();
}
}

@Test
void aggregablePercentilesFalse_NoBuckets() {
boolean supportsAggregablePercentiles = false;
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(),
distributionStatisticConfig, supportsAggregablePercentiles)) {
// TODO I don't think we should need this to have the correct buckets (even if
// their counts are all 0)
distributionStatisticConfig, supportsAggregablePercentiles, true)) {
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).isEmpty();
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts().length).isZero();
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).isEmpty();
}
}

Expand All @@ -70,9 +68,8 @@ void sloWithAggregablePercentilesFalse_onlySloBucket() {
.build()
.merge(distributionStatisticConfig);
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config,
supportsAggregablePercentiles)) {
// TODO I don't think we should need this to have the correct buckets (even if
// their counts are all 0)
supportsAggregablePercentiles, true)) {
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts().length).isOne();
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts().length).isOne();
}
Expand All @@ -88,9 +85,8 @@ void sloWithAggregablePercentilesTrue_sloBucketPlusPercentilesHistogramBuckets()
.build()
.merge(distributionStatisticConfig);
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config,
supportsAggregablePercentiles)) {
// TODO I don't think we should need this to have the correct buckets (even if
// their counts are all 0)
supportsAggregablePercentiles, true)) {
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
}
Expand All @@ -106,9 +102,8 @@ void sloWithPercentileHistogramFalse_onlySloBucket() {
.build()
.merge(getConfig(false));
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config,
supportsAggregablePercentiles)) {
// TODO I don't think we should need this to have the correct buckets (even if
// their counts are all 0)
supportsAggregablePercentiles, true)) {
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).contains(new CountAtBucket(slo, 0));
}
Expand All @@ -123,7 +118,7 @@ void bucketCountRollover() {
.build()
.merge(distributionStatisticConfig);
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config,
supportsAggregablePercentiles)) {
supportsAggregablePercentiles, false)) {
histogram.recordDouble(slo - 1);
histogram.recordDouble(slo);
histogram.recordDouble(slo + 1);
Expand All @@ -142,13 +137,49 @@ void bucketCountRollover() {
}
}

@Test
void bucketCountRolloverCumulativeBucket() {
boolean supportsAggregablePercentiles = true;
double slo = 15.0;
DistributionStatisticConfig config = DistributionStatisticConfig.builder()
.serviceLevelObjectives(slo)
.build()
.merge(distributionStatisticConfig);
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config,
supportsAggregablePercentiles, true)) {
histogram.recordDouble(slo - 1);
histogram.recordDouble(slo);
histogram.recordDouble(slo + 1);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).allMatch(bucket -> bucket.count() == 0);
clock.add(step);
Arrays.stream(histogram.takeSnapshot(0, 0, 0).histogramCounts()).forEach(bucket -> {
// In case of cumulative buckets, value at bucket denotes 0 to bucket and
// not between 2 buckets.
if (bucket.bucket() < slo - 1) {
assertThat(bucket.count()).isZero();
}
else if (bucket.bucket() == slo - 1) {
assertThat(bucket.count()).isOne();
}
else if (bucket.bucket() == slo) {
assertThat(bucket.count()).isEqualTo(2);
}
else {
assertThat(bucket.count()).isEqualTo(3);
}
});
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).histogramCounts()).allMatch(bucket -> bucket.count() == 0);
}
}

@Test
void doesNotSupportPercentiles() {
DistributionStatisticConfig config = DistributionStatisticConfig.builder()
.percentiles(0.5, 0.9)
.build()
.merge(distributionStatisticConfig);
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config, false)) {
try (StepBucketHistogram histogram = new StepBucketHistogram(clock, step.toMillis(), config, false, true)) {
histogram.recordDouble(10.0);
clock.add(step);
assertThat(histogram.takeSnapshot(0, 0, 0).percentileValues()).isEmpty();
Expand Down

0 comments on commit c85a754

Please sign in to comment.