Skip to content

Commit

Permalink
MQE: add support for sort and sort_desc (#10487)
Browse files Browse the repository at this point in the history
* Enable upstream test cases

* Add test cases

* Implement `sort` / `sort_desc`

* Add benchmark

* Add `sort` and `sort_desc` to TestFunctionDeduplicateAndMerge

* Simplify implementation
  • Loading branch information
charleskorn authored Feb 5, 2025
1 parent 2a3642d commit d0ed3eb
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 24 deletions.
4 changes: 4 additions & 0 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ func TestCases(metricSizes []int) []BenchCase {
//{
// Expr: "label_join(a_X, 'l2', '-', 'l', 'l')",
//},
{
Expr: "sort(a_X)",
InstantQueryOnly: true,
},
// Simple aggregations.
{
Expr: "sum(a_X)",
Expand Down
30 changes: 30 additions & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,34 @@ func TimestampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTr
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func SortOperatorFactory(descending bool) InstantVectorFunctionOperatorFactory {
functionName := "sort"

if descending {
functionName = "sort_desc"
}

return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args))
}

inner, ok := args[0].(types.InstantVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected an instant vector for 1st argument for %s, got %T", functionName, args[0])
}

if timeRange.StepCount != 1 {
// If this is a range query, sort / sort_desc have no effect, so we might as well just skip straight to the inner operator.
return inner, nil
}

return functions.NewSort(inner, descending, memoryConsumptionTracker, expressionPosition), nil
}
}

// These functions return an instant-vector.
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
Expand Down Expand Up @@ -482,6 +510,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"sgn": InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn),
"sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin),
"sinh": InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh),
"sort": SortOperatorFactory(false),
"sort_desc": SortOperatorFactory(true),
"sqrt": InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt),
"sum_over_time": FunctionOverRangeVectorOperatorFactory("sum_over_time", functions.SumOverTime),
"tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan),
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func TestFunctionDeduplicateAndMerge(t *testing.T) {
"sgn": `sgn({__name__=~"float.*"})`,
"sin": `sin({__name__=~"float.*"})`,
"sinh": `sinh({__name__=~"float.*"})`,
"sort": `<skip>`, // sort() and sort_desc() don't drop the metric name, so this test doesn't apply.
"sort_desc": `<skip>`, // sort() and sort_desc() don't drop the metric name, so this test doesn't apply.
"sqrt": `sqrt({__name__=~"float.*"})`,
"sum_over_time": `sum_over_time({__name__=~"float.*"}[1m])`,
"tan": `tan({__name__=~"float.*"})`,
Expand Down
165 changes: 165 additions & 0 deletions pkg/streamingpromql/operators/functions/sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// SPDX-License-Identifier: AGPL-3.0-only

package functions

import (
"context"
"fmt"
"math"
"sort"

"github.com/prometheus/prometheus/promql/parser/posrange"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type Sort struct {
Inner types.InstantVectorOperator
Descending bool
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

expressionPosition posrange.PositionRange

allData []types.InstantVectorSeriesData // Series data, in the order to be returned
seriesReturned int // Number of series already returned by NextSeries
}

var _ types.InstantVectorOperator = &Sort{}

func NewSort(
inner types.InstantVectorOperator,
descending bool,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
expressionPosition posrange.PositionRange,
) *Sort {
return &Sort{
Inner: inner,
Descending: descending,
MemoryConsumptionTracker: memoryConsumptionTracker,
expressionPosition: expressionPosition,
}
}

func (s *Sort) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
allSeries, err := s.Inner.SeriesMetadata(ctx)
if err != nil {
return nil, err
}

s.allData = make([]types.InstantVectorSeriesData, len(allSeries))

for idx := range allSeries {
d, err := s.Inner.NextSeries(ctx)
if err != nil {
return nil, err
}

pointCount := len(d.Floats) + len(d.Histograms)

if pointCount > 1 {
return nil, fmt.Errorf("expected series %v to have at most one point, but it had %v", allSeries[idx], pointCount)
}

s.allData[idx] = d
}

if s.Descending {
sort.Sort(&sortDescending{data: s.allData, series: allSeries})
} else {
sort.Sort(&sortAscending{data: s.allData, series: allSeries})
}

return allSeries, nil
}

type sortAscending struct {
data []types.InstantVectorSeriesData
series []types.SeriesMetadata
}

func (s *sortAscending) Len() int {
return len(s.data)
}

func (s *sortAscending) Less(idx1, idx2 int) bool {
v1 := getValueForSorting(s.data, idx1)
v2 := getValueForSorting(s.data, idx2)

// NaNs always sort to the end of the list, regardless of the sort order.
if math.IsNaN(v1) {
return false
} else if math.IsNaN(v2) {
return true
}

return v1 < v2
}

func (s *sortAscending) Swap(i, j int) {
s.data[i], s.data[j] = s.data[j], s.data[i]
s.series[i], s.series[j] = s.series[j], s.series[i]
}

type sortDescending struct {
data []types.InstantVectorSeriesData
series []types.SeriesMetadata
}

func (s *sortDescending) Len() int {
return len(s.data)
}

func (s *sortDescending) Less(idx1, idx2 int) bool {
v1 := getValueForSorting(s.data, idx1)
v2 := getValueForSorting(s.data, idx2)

// NaNs always sort to the end of the list, regardless of the sort order.
if math.IsNaN(v1) {
return false
} else if math.IsNaN(v2) {
return true
}

return v1 > v2
}

func (s *sortDescending) Swap(i, j int) {
s.data[i], s.data[j] = s.data[j], s.data[i]
s.series[i], s.series[j] = s.series[j], s.series[i]
}

func getValueForSorting(allData []types.InstantVectorSeriesData, seriesIdx int) float64 {
series := allData[seriesIdx]

if len(series.Floats) == 1 {
return series.Floats[0].F
}

if len(series.Histograms) == 1 {
return series.Histograms[0].H.Sum
}

return 0 // The value we use for empty series doesn't matter, as long as we're consistent: we'll still return an empty set of data in NextSeries().
}

func (s *Sort) NextSeries(_ context.Context) (types.InstantVectorSeriesData, error) {
if s.seriesReturned >= len(s.allData) {
return types.InstantVectorSeriesData{}, types.EOS
}

data := s.allData[s.seriesReturned]
s.seriesReturned++

return data, nil
}

func (s *Sort) ExpressionPosition() posrange.PositionRange {
return s.expressionPosition
}

func (s *Sort) Close() {
s.Inner.Close()

// We don't need to do anything with s.allData here: we passed ownership of the data to the calling operator when we returned it in NextSeries.
}
90 changes: 89 additions & 1 deletion pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,8 @@ eval range from 0 to 7m step 1m last_over_time(some_metric_count[3m])
some_metric_count{env="prod", cluster="us"} _ _ _ 0 2 4 6 8
some_metric_count{env="prod", cluster="au"} _ _ _ {{count:5}} {{count:10}} {{count:15}} {{count:20}} {{count:25}}

clear

# Test time-related functions.
load 5m
histogram_sample {{schema:0 sum:1 count:1}}
Expand Down Expand Up @@ -851,4 +853,90 @@ load 30s
metric 3 20 4 7 80

eval range from 0 to 2m step 1m predict_linear(metric[1m1s], scalar(prediction_time))
{} _ 9.75 100
{} _ 9.75 100

clear

# Test sort / sort_desc.
load 1m
test_metric{case="float 1"} 0+10x4
test_metric{case="float 2"} 0+15x4
test_metric{case="float with NaN"} NaN NaN NaN NaN NaN
test_metric{case="float with +Inf"} +Inf +Inf +Inf +Inf +Inf
test_metric{case="float with -Inf"} -Inf -Inf -Inf -Inf -Inf
test_metric{case="histogram 1"} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}}
test_metric{case="histogram 2"} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}}
test_metric{case="histogram with NaN"} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}}
test_metric{case="histogram with +Inf"} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}}
test_metric{case="histogram with -Inf"} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}}

# Sorting of identical values is not stable, so we exclude those from these test cases and check them below.
eval_ordered instant at 1m sort(test_metric{case!~".*(NaN|Inf)"})
test_metric{case="histogram 2"} {{count:20 sum:5}}
test_metric{case="float 1"} 10
test_metric{case="histogram 1"} {{count:0 sum:12}}
test_metric{case="float 2"} 15

eval_ordered instant at 1m sort_desc(test_metric{case!~".*(NaN|Inf)"})
test_metric{case="float 2"} 15
test_metric{case="histogram 1"} {{count:0 sum:12}}
test_metric{case="float 1"} 10
test_metric{case="histogram 2"} {{count:20 sum:5}}

eval_ordered instant at 1m sort(test_metric{case=~"float.*"})
test_metric{case="float with -Inf"} -Inf
test_metric{case="float 1"} 10
test_metric{case="float 2"} 15
test_metric{case="float with +Inf"} +Inf
test_metric{case="float with NaN"} NaN

eval_ordered instant at 1m sort_desc(test_metric{case=~"float.*"})
test_metric{case="float with +Inf"} +Inf
test_metric{case="float 2"} 15
test_metric{case="float 1"} 10
test_metric{case="float with -Inf"} -Inf
test_metric{case="float with NaN"} NaN

eval_ordered instant at 1m sort(test_metric{case=~"histogram.*"})
test_metric{case="histogram with -Inf"} {{count:0 sum:-Inf}}
test_metric{case="histogram 2"} {{count:20 sum:5}}
test_metric{case="histogram 1"} {{count:0 sum:12}}
test_metric{case="histogram with +Inf"} {{count:0 sum:Inf}}
test_metric{case="histogram with NaN"} {{count:0 sum:NaN}}

eval_ordered instant at 1m sort_desc(test_metric{case=~"histogram.*"})
test_metric{case="histogram with +Inf"} {{count:0 sum:Inf}}
test_metric{case="histogram 1"} {{count:0 sum:12}}
test_metric{case="histogram 2"} {{count:20 sum:5}}
test_metric{case="histogram with -Inf"} {{count:0 sum:-Inf}}
test_metric{case="histogram with NaN"} {{count:0 sum:NaN}}

# Test the case where some series have no sample at all.
eval_ordered instant at 1m sort(test_metric{case=~"float.*"} > 11)
test_metric{case="float 2"} 15
test_metric{case="float with +Inf"} +Inf

# sort / sort_desc do nothing for range queries.
eval range from 0 to 4m step 1m sort(test_metric)
test_metric{case="float 1"} 0+10x4
test_metric{case="float 2"} 0+15x4
test_metric{case="float with NaN"} NaN NaN NaN NaN NaN
test_metric{case="float with +Inf"} +Inf +Inf +Inf +Inf +Inf
test_metric{case="float with -Inf"} -Inf -Inf -Inf -Inf -Inf
test_metric{case="histogram 1"} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}}
test_metric{case="histogram 2"} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}}
test_metric{case="histogram with NaN"} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}}
test_metric{case="histogram with +Inf"} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}}
test_metric{case="histogram with -Inf"} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}}

eval range from 0 to 4m step 1m sort_desc(test_metric)
test_metric{case="float 1"} 0+10x4
test_metric{case="float 2"} 0+15x4
test_metric{case="float with NaN"} NaN NaN NaN NaN NaN
test_metric{case="float with +Inf"} +Inf +Inf +Inf +Inf +Inf
test_metric{case="float with -Inf"} -Inf -Inf -Inf -Inf -Inf
test_metric{case="histogram 1"} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}} {{count:0 sum:12}}
test_metric{case="histogram 2"} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}} {{count:20 sum:5}}
test_metric{case="histogram with NaN"} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}}
test_metric{case="histogram with +Inf"} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}}
test_metric{case="histogram with -Inf"} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}}
44 changes: 21 additions & 23 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -588,29 +588,27 @@ load 5m
http_requests{job="app-server", instance="0", group="canary"} 0+70x10
http_requests{job="app-server", instance="1", group="canary"} 0+80x10

# Unsupported by streaming engine.
# eval_ordered instant at 50m sort(http_requests)
# http_requests{group="production", instance="0", job="api-server"} 100
# http_requests{group="production", instance="1", job="api-server"} 200
# http_requests{group="canary", instance="0", job="api-server"} 300
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="production", instance="0", job="app-server"} 500
# http_requests{group="production", instance="1", job="app-server"} 600
# http_requests{group="canary", instance="0", job="app-server"} 700
# http_requests{group="canary", instance="1", job="app-server"} 800
# http_requests{group="canary", instance="2", job="api-server"} NaN

# Unsupported by streaming engine.
# eval_ordered instant at 50m sort_desc(http_requests)
# http_requests{group="canary", instance="1", job="app-server"} 800
# http_requests{group="canary", instance="0", job="app-server"} 700
# http_requests{group="production", instance="1", job="app-server"} 600
# http_requests{group="production", instance="0", job="app-server"} 500
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="canary", instance="0", job="api-server"} 300
# http_requests{group="production", instance="1", job="api-server"} 200
# http_requests{group="production", instance="0", job="api-server"} 100
# http_requests{group="canary", instance="2", job="api-server"} NaN
eval_ordered instant at 50m sort(http_requests)
http_requests{group="production", instance="0", job="api-server"} 100
http_requests{group="production", instance="1", job="api-server"} 200
http_requests{group="canary", instance="0", job="api-server"} 300
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="production", instance="0", job="app-server"} 500
http_requests{group="production", instance="1", job="app-server"} 600
http_requests{group="canary", instance="0", job="app-server"} 700
http_requests{group="canary", instance="1", job="app-server"} 800
http_requests{group="canary", instance="2", job="api-server"} NaN

eval_ordered instant at 50m sort_desc(http_requests)
http_requests{group="canary", instance="1", job="app-server"} 800
http_requests{group="canary", instance="0", job="app-server"} 700
http_requests{group="production", instance="1", job="app-server"} 600
http_requests{group="production", instance="0", job="app-server"} 500
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="canary", instance="0", job="api-server"} 300
http_requests{group="production", instance="1", job="api-server"} 200
http_requests{group="production", instance="0", job="api-server"} 100
http_requests{group="canary", instance="2", job="api-server"} NaN

# Tests for sort_by_label/sort_by_label_desc.
clear
Expand Down

0 comments on commit d0ed3eb

Please sign in to comment.