Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add support for histogram_fraction #10048

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* [CHANGE] Querier: The `.` pattern in regular expressions in PromQL matches newline characters. With this change regular expressions like `.*` match strings that include `\n`. To maintain the old behaviour, you will have to change regular expressions by replacing all `.` patterns with `[^\n]`, e.g. `foo[^\n]*`. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844
* [CHANGE] Querier: Lookback and range selectors are left open and right closed (previously left closed and right closed). This change affects queries when the evaluation time perfectly aligns with the sample timestamps. For example assume querying a timeseries with evenly spaced samples exactly 1 minute apart. Previously, a range query with `5m` would usually return 5 samples, or 6 samples if the query evaluation aligns perfectly with a scrape. Now, queries like this will always return 5 samples. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844
* [CHANGE] Querier: promql(native histograms): Introduce exponential interpolation. #9844
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9929 #9998 #10007 #10010 #10046 #10047
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9929 #9998 #10007 #10010 #10046 #10047 #10048
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] Query-frontend: add middleware to control access to specific PromQL experimental functions on a per-tenant basis. #9798
Expand Down
4 changes: 4 additions & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2664,6 +2664,10 @@ func TestCompareVariousMixedMetricsFunctions(t *testing.T) {
labelRegex := strings.Join(labels, "|")
expressions = append(expressions, fmt.Sprintf(`histogram_avg(series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_count(series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_fraction(-5, 5, series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_fraction(0, scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_fraction(scalar(series{label="i"}), 2, series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_fraction(scalar(series{label="i"}), scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_quantile(0.8, series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_quantile(scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_sum(series{label=~"(%s)"})`, labelRegex))
Expand Down
39 changes: 37 additions & 2 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func RoundFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
}
}

func HistogramQuantileOperatorFactory() InstantVectorFunctionOperatorFactory {
func HistogramQuantileFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
Expand All @@ -310,6 +310,40 @@ func HistogramQuantileOperatorFactory() InstantVectorFunctionOperatorFactory {
}
}

func HistogramFractionFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 argument for histogram_fraction, got %v", len(args))
}

lower, ok := args[0].(types.ScalarOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected a scalar for 1st argument for histogram_fraction, got %T", args[0])
}

upper, ok := args[1].(types.ScalarOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected a scalar for 2nd argument for histogram_fraction, got %T", args[1])
}

inner, ok := args[2].(types.InstantVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected an instant vector for 3rd argument for histogram_fraction, got %T", args[2])
}

f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.HistogramFraction,
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition), nil
}
}

// These functions return an instant-vector.
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
Expand All @@ -335,7 +369,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor),
"histogram_avg": InstantVectorTransformationFunctionOperatorFactory("histogram_avg", functions.HistogramAvg),
"histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
"histogram_quantile": HistogramQuantileOperatorFactory(),
"histogram_fraction": HistogramFractionFunctionOperatorFactory(),
"histogram_quantile": HistogramQuantileFunctionOperatorFactory(),
"histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
"increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase),
"label_replace": LabelReplaceFunctionOperatorFactory(),
Expand Down
37 changes: 37 additions & 0 deletions pkg/streamingpromql/operators/functions/native_histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,43 @@ func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarDa
return data, nil
}

func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: floats,
}

lower := scalarArgsData[0]
upper := scalarArgsData[1]
// There will always be a scalar at every step of the query.
// However, there may not be a sample at a step. So we need to
// keep track of where we are up to step-wise with the scalars,
// incrementing through the scalars until their timestamp matches
// the samples.
argIdx := 0

for _, histogram := range seriesData.Histograms {
for histogram.T > lower.Samples[argIdx].T {
argIdx++
}
Comment on lines +80 to +82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing this, couldn't we do QueryTimeRange.PointIndex(histogram.T)? (assuming we passed the time range to this function)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would work. There's a few other places that we need to adopt that too, so I'll do that as a followup if that's okay.

lowerVal := lower.Samples[argIdx].F
upperVal := upper.Samples[argIdx].F

data.Floats = append(data.Floats, promql.FPoint{
T: histogram.T,
F: histogramFraction(lowerVal, upperVal, histogram.H),
})
}

types.PutInstantVectorSeriesData(seriesData, memoryConsumptionTracker)

return data, nil
}

func HistogramSum(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
Expand Down
134 changes: 134 additions & 0 deletions pkg/streamingpromql/operators/functions/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,140 @@ func histogramQuantile(q float64, h *histogram.FloatHistogram) float64 {
return -math.Exp2(logUpper + (logLower-logUpper)*(1-fraction))
}

// histogramFraction calculates the fraction of observations between the
// provided lower and upper bounds, based on the provided histogram.
//
// histogramFraction is in a certain way the inverse of histogramQuantile. If
// histogramQuantile(0.9, h) returns 123.4, then histogramFraction(-Inf, 123.4, h)
// returns 0.9.
//
// The same notes with regard to interpolation and assumptions about the zero
// bucket boundaries apply as for histogramQuantile.
//
// Whether either boundary is inclusive or exclusive doesn’t actually matter as
// long as interpolation has to be performed anyway. In the case of a boundary
// coinciding with a bucket boundary, the inclusive or exclusive nature of the
// boundary determines the exact behavior of the threshold. With the current
// implementation, that means that lower is exclusive for positive values and
// inclusive for negative values, while upper is inclusive for positive values
// and exclusive for negative values.
//
// Special cases:
//
// If the histogram has 0 observations, NaN is returned.
//
// Use a lower bound of -Inf to get the fraction of all observations below the
// upper bound.
//
// Use an upper bound of +Inf to get the fraction of all observations above the
// lower bound.
//
// If lower or upper is NaN, NaN is returned.
//
// If lower >= upper and the histogram has at least 1 observation, zero is returned.
func histogramFraction(lower, upper float64, h *histogram.FloatHistogram) float64 {
if h.Count == 0 || math.IsNaN(lower) || math.IsNaN(upper) {
return math.NaN()
}
if lower >= upper {
return 0
}

var (
rank, lowerRank, upperRank float64
lowerSet, upperSet bool
it = h.AllBucketIterator()
)
for it.Next() {
b := it.At()
zeroBucket := false

// interpolateLinearly is used for custom buckets to be
// consistent with the linear interpolation known from classic
// histograms. It is also used for the zero bucket.
interpolateLinearly := func(v float64) float64 {
return rank + b.Count*(v-b.Lower)/(b.Upper-b.Lower)
}

// interpolateExponentially is using the same exponential
// interpolation method as above for histogramQuantile. This
// method is a better fit for exponential bucketing.
interpolateExponentially := func(v float64) float64 {
var (
logLower = math.Log2(math.Abs(b.Lower))
logUpper = math.Log2(math.Abs(b.Upper))
logV = math.Log2(math.Abs(v))
fraction float64
)
if v > 0 {
fraction = (logV - logLower) / (logUpper - logLower)
} else {
fraction = 1 - ((logV - logUpper) / (logLower - logUpper))
}
return rank + b.Count*fraction
}

if b.Lower <= 0 && b.Upper >= 0 {
zeroBucket = true
switch {
case len(h.NegativeBuckets) == 0 && len(h.PositiveBuckets) > 0:
// This is the zero bucket and the histogram has only
// positive buckets. So we consider 0 to be the lower
// bound.
b.Lower = 0
case len(h.PositiveBuckets) == 0 && len(h.NegativeBuckets) > 0:
// This is in the zero bucket and the histogram has only
// negative buckets. So we consider 0 to be the upper
// bound.
b.Upper = 0
}
}
if !lowerSet && b.Lower >= lower {
// We have hit the lower value at the lower bucket boundary.
lowerRank = rank
lowerSet = true
}
if !upperSet && b.Lower >= upper {
// We have hit the upper value at the lower bucket boundary.
upperRank = rank
upperSet = true
}
if lowerSet && upperSet {
break
}
if !lowerSet && b.Lower < lower && b.Upper > lower {
// The lower value is in this bucket.
if h.UsesCustomBuckets() || zeroBucket {
lowerRank = interpolateLinearly(lower)
} else {
lowerRank = interpolateExponentially(lower)
}
lowerSet = true
}
if !upperSet && b.Lower < upper && b.Upper > upper {
// The upper value is in this bucket.
if h.UsesCustomBuckets() || zeroBucket {
upperRank = interpolateLinearly(upper)
} else {
upperRank = interpolateExponentially(upper)
}
upperSet = true
}
if lowerSet && upperSet {
break
}
rank += b.Count
}
if !lowerSet || lowerRank > h.Count {
lowerRank = h.Count
}
if !upperSet || upperRank > h.Count {
upperRank = h.Count
}

return (upperRank - lowerRank) / h.Count
}

// coalesceBuckets merges buckets with the same upper bound.
//
// The input buckets must be sorted.
Expand Down
17 changes: 17 additions & 0 deletions pkg/streamingpromql/testdata/ours/native_histograms.test
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ eval range from 0 to 5m step 1m histogram_avg(single_histogram)
eval range from 0 to 5m step 1m histogram_count(single_histogram)
{} 4 4 4 4 4 7

eval range from 0 to 5m step 1m histogram_fraction(0, 2, single_histogram)
{} 0.75 0.75 0.75 0.75 0.75 1

# histogram_sum extracts the sum property from the histogram.
eval range from 0 to 5m step 1m histogram_sum(single_histogram)
{} 5 5 5 5 5 20
Expand All @@ -40,6 +43,9 @@ eval instant at 3m histogram_avg(mixed_metric)
eval instant at 3m histogram_count(mixed_metric)
{} 4

eval instant at 3m histogram_fraction(0, 1, mixed_metric)
{} 0.25

eval instant at 4m histogram_sum(mixed_metric)
{} 8

Expand All @@ -55,6 +61,9 @@ eval instant at 2m histogram_avg(mixed_metric)
# histogram_count ignores any float values
eval instant at 2m histogram_count(mixed_metric)

# histogram_fraction ignores any float values
eval instant at 2m histogram_fraction(0, 1, mixed_metric)

# histogram_sum ignores any float values
eval instant at 2m histogram_sum(mixed_metric)

Expand All @@ -81,6 +90,11 @@ eval instant at 0 histogram_count(route)
{path="two"} 20
{path="three"} 10

eval instant at 0 histogram_fraction(-10, 20, route)
{path="one"} 1
{path="two"} 1
{path="three"} 1

eval instant at 0 histogram_sum(route)
{path="one"} 5
{path="two"} 10
Expand Down Expand Up @@ -115,6 +129,9 @@ eval range from 0 to 8m step 1m histogram_avg(mixed_metric)
eval range from 0 to 8m step 1m histogram_count(mixed_metric)
{} _ _ _ _ _ _ _ 10 10

eval range from 0 to 8m step 1m histogram_fraction(0, 1, mixed_metric)
{} _ _ _ _ _ _ _ 0.3 0.3

eval range from 0 to 8m step 1m histogram_sum(mixed_metric)
{} _ _ _ _ _ _ _ 18 18

Expand Down
14 changes: 6 additions & 8 deletions pkg/streamingpromql/testdata/upstream/histograms.test
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,13 @@ eval instant at 50m histogram_avg(testhistogram3)

# Test histogram_fraction.

# Unsupported by streaming engine.
# eval instant at 50m histogram_fraction(0, 0.2, testhistogram3)
# {start="positive"} 0.6363636363636364
# {start="negative"} 0
eval instant at 50m histogram_fraction(0, 0.2, testhistogram3)
{start="positive"} 0.6363636363636364
{start="negative"} 0

# Unsupported by streaming engine.
# eval instant at 50m histogram_fraction(0, 0.2, rate(testhistogram3[10m]))
# {start="positive"} 0.6363636363636364
# {start="negative"} 0
eval instant at 50m histogram_fraction(0, 0.2, rate(testhistogram3[10m]))
{start="positive"} 0.6363636363636364
{start="negative"} 0

# In the classic histogram, we can access the corresponding bucket (if
# it exists) and divide by the count to get the same result.
Expand Down
Loading