From aff930a4a0453693622eb246a2b5460423220dd8 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 12 Sep 2024 15:30:00 +1000 Subject: [PATCH 01/23] Move RequireEqualResults and export for other tests to use --- .../benchmarks/comparison_test.go | 78 +------------------ pkg/streamingpromql/testing.go | 77 ++++++++++++++++++ 2 files changed, 79 insertions(+), 76 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index f22e10bea0e..26644aab975 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -9,7 +9,6 @@ import ( "context" "math" "os" - "slices" "testing" "time" @@ -23,7 +22,6 @@ import ( "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" @@ -68,7 +66,7 @@ func BenchmarkQuery(b *testing.B) { prometheusResult, prometheusClose := c.Run(ctx, b, start, end, interval, prometheusEngine, q) mimirResult, mimirClose := c.Run(ctx, b, start, end, interval, mimirEngine, q) - requireEqualResults(b, c.Expr, prometheusResult, mimirResult) + streamingpromql.RequireEqualResults(b, c.Expr, prometheusResult, mimirResult) prometheusClose() mimirClose() @@ -109,7 +107,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) { prometheusResult, prometheusClose := c.Run(ctx, t, start, end, interval, prometheusEngine, q) mimirResult, mimirClose := c.Run(ctx, t, start, end, interval, mimirEngine, q) - requireEqualResults(t, c.Expr, prometheusResult, mimirResult) + streamingpromql.RequireEqualResults(t, c.Expr, prometheusResult, mimirResult) prometheusClose() mimirClose() @@ -175,72 +173,6 @@ func TestBenchmarkSetup(t *testing.T) { require.Equal(t, 18.4, series.Histograms[0].H.Sum) } -// Why do we do this rather than require.Equal(t, expected, actual)? -// It's possible that floating point values are slightly different due to imprecision, but require.Equal doesn't allow us to set an allowable difference. -func requireEqualResults(t testing.TB, expr string, expected, actual *promql.Result) { - require.Equal(t, expected.Err, actual.Err) - require.Equal(t, expected.Value.Type(), actual.Value.Type()) - - expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0) - actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0) - require.ElementsMatch(t, expectedWarnings, actualWarnings) - require.ElementsMatch(t, expectedInfos, actualInfos) - - switch expected.Value.Type() { - case parser.ValueTypeVector: - expectedVector, err := expected.Vector() - require.NoError(t, err) - actualVector, err := actual.Vector() - require.NoError(t, err) - - // Instant queries don't guarantee any particular sort order, so sort results here so that we can easily compare them. - sortVector(expectedVector) - sortVector(actualVector) - - require.Len(t, actualVector, len(expectedVector)) - - for i, expectedSample := range expectedVector { - actualSample := actualVector[i] - - require.Equal(t, expectedSample.Metric, actualSample.Metric) - require.Equal(t, expectedSample.T, actualSample.T) - require.Equal(t, expectedSample.H, actualSample.H) - if expectedSample.F == 0 { - require.Equal(t, expectedSample.F, actualSample.F) - } else { - require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10) - } - } - case parser.ValueTypeMatrix: - expectedMatrix, err := expected.Matrix() - require.NoError(t, err) - actualMatrix, err := actual.Matrix() - require.NoError(t, err) - - require.Len(t, actualMatrix, len(expectedMatrix)) - - for i, expectedSeries := range expectedMatrix { - actualSeries := actualMatrix[i] - - require.Equal(t, expectedSeries.Metric, actualSeries.Metric) - require.Equal(t, expectedSeries.Histograms, actualSeries.Histograms) - - for j, expectedPoint := range expectedSeries.Floats { - actualPoint := actualSeries.Floats[j] - - require.Equal(t, expectedPoint.T, actualPoint.T) - if expectedPoint.F == 0 { - require.Equal(t, expectedPoint.F, actualPoint.F) - } else { - require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) - } - } - } - default: - require.Fail(t, "unexpected value type", "type: %v", expected.Value.Type()) - } -} - func createBenchmarkQueryable(t testing.TB, metricSizes []int) storage.Queryable { addr := os.Getenv("MIMIR_PROMQL_ENGINE_BENCHMARK_INGESTER_ADDR") @@ -317,9 +249,3 @@ type alwaysQueryIngestersConfigProvider struct{} func (a alwaysQueryIngestersConfigProvider) QueryIngestersWithin(string) time.Duration { return time.Duration(math.MaxInt64) } - -func sortVector(v promql.Vector) { - slices.SortFunc(v, func(a, b promql.Sample) int { - return labels.Compare(a.Metric, b.Metric) - }) -} diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index 01ff956bb8d..caaf0eb2298 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -4,9 +4,14 @@ package streamingpromql import ( "math" + "testing" "time" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" ) func NewTestEngineOpts() EngineOpts { @@ -24,3 +29,75 @@ func NewTestEngineOpts() EngineOpts { Pedantic: true, } } + +func sortVector(v promql.Vector) { + slices.SortFunc(v, func(a, b promql.Sample) int { + return labels.Compare(a.Metric, b.Metric) + }) +} + +// Why do we do this rather than require.Equal(t, expected, actual)? +// It's possible that floating point values are slightly different due to imprecision, but require.Equal doesn't allow us to set an allowable difference. +func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Result) { + require.Equal(t, expected.Err, actual.Err) + require.Equal(t, expected.Value.Type(), actual.Value.Type()) + + expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0) + actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0) + require.ElementsMatch(t, expectedWarnings, actualWarnings) + require.ElementsMatch(t, expectedInfos, actualInfos) + + switch expected.Value.Type() { + case parser.ValueTypeVector: + expectedVector, err := expected.Vector() + require.NoError(t, err) + actualVector, err := actual.Vector() + require.NoError(t, err) + + // Instant queries don't guarantee any particular sort order, so sort results here so that we can easily compare them. + sortVector(expectedVector) + sortVector(actualVector) + + require.Len(t, actualVector, len(expectedVector)) + + for i, expectedSample := range expectedVector { + actualSample := actualVector[i] + + require.Equal(t, expectedSample.Metric, actualSample.Metric) + require.Equal(t, expectedSample.T, actualSample.T) + require.Equal(t, expectedSample.H, actualSample.H) + if expectedSample.F == 0 { + require.Equal(t, expectedSample.F, actualSample.F) + } else { + require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10) + } + } + case parser.ValueTypeMatrix: + expectedMatrix, err := expected.Matrix() + require.NoError(t, err) + actualMatrix, err := actual.Matrix() + require.NoError(t, err) + + require.Len(t, actualMatrix, len(expectedMatrix)) + + for i, expectedSeries := range expectedMatrix { + actualSeries := actualMatrix[i] + + require.Equal(t, expectedSeries.Metric, actualSeries.Metric) + require.Equal(t, expectedSeries.Histograms, actualSeries.Histograms) + + for j, expectedPoint := range expectedSeries.Floats { + actualPoint := actualSeries.Floats[j] + + require.Equal(t, expectedPoint.T, actualPoint.T) + if expectedPoint.F == 0 { + require.Equal(t, expectedPoint.F, actualPoint.F) + } else { + require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) + } + } + } + default: + require.Fail(t, "unexpected value type", "type: %v", expected.Value.Type()) + } +} From 19fe732d7276cb18c5f7404b4095c771f3d810c3 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 12 Sep 2024 15:31:12 +1000 Subject: [PATCH 02/23] Expand RequireEqualResults to check histogram floats --- pkg/streamingpromql/testing.go | 131 +++++++++++++++++++++++++++++---- 1 file changed, 116 insertions(+), 15 deletions(-) diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index caaf0eb2298..7655801ec9d 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" @@ -30,12 +31,6 @@ func NewTestEngineOpts() EngineOpts { } } -func sortVector(v promql.Vector) { - slices.SortFunc(v, func(a, b promql.Sample) int { - return labels.Compare(a.Metric, b.Metric) - }) -} - // Why do we do this rather than require.Equal(t, expected, actual)? // It's possible that floating point values are slightly different due to imprecision, but require.Equal doesn't allow us to set an allowable difference. func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Result) { @@ -66,11 +61,7 @@ func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Res require.Equal(t, expectedSample.Metric, actualSample.Metric) require.Equal(t, expectedSample.T, actualSample.T) require.Equal(t, expectedSample.H, actualSample.H) - if expectedSample.F == 0 { - require.Equal(t, expectedSample.F, actualSample.F) - } else { - require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10) - } + requireInEpsilonIfNotZero(t, expectedSample.F, actualSample.F) } case parser.ValueTypeMatrix: expectedMatrix, err := expected.Matrix() @@ -84,16 +75,40 @@ func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Res actualSeries := actualMatrix[i] require.Equal(t, expectedSeries.Metric, actualSeries.Metric) - require.Equal(t, expectedSeries.Histograms, actualSeries.Histograms) for j, expectedPoint := range expectedSeries.Floats { actualPoint := actualSeries.Floats[j] require.Equal(t, expectedPoint.T, actualPoint.T) - if expectedPoint.F == 0 { - require.Equal(t, expectedPoint.F, actualPoint.F) + requireInEpsilonIfNotZero(t, expectedPoint.F, actualPoint.F, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) + } + + for j, expectedPoint := range actualSeries.Histograms { + actualPoint := actualSeries.Histograms[j] + + require.Equal(t, expectedPoint.T, actualPoint.T) + if expectedPoint.H == nil { + require.Equal(t, expectedPoint.H, actualPoint.H) } else { - require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) + h1 := expectedPoint.H + h2 := actualPoint.H + + require.Equal(t, h1.Schema, h2.Schema, "histogram schemas match") + requireInEpsilonIfNotZero(t, h1.Count, h2.Count, "histogram counts match") + requireInEpsilonIfNotZero(t, h1.Sum, h2.Sum, "histogram sums match") + + if h1.UsesCustomBuckets() { + requireFloatBucketsMatch(t, h1.CustomValues, h2.CustomValues) + } + + requireInEpsilonIfNotZero(t, h1.ZeroThreshold, h2.ZeroThreshold, "histogram thresholds match") + requireInEpsilonIfNotZero(t, h1.ZeroCount, h2.ZeroCount, "histogram zero counts match") + + require.True(t, spansMatch(h1.NegativeSpans, h2.NegativeSpans), "spans match") + requireFloatBucketsMatch(t, h1.NegativeBuckets, h2.NegativeBuckets) + + require.True(t, spansMatch(h1.PositiveSpans, h2.PositiveSpans)) + requireFloatBucketsMatch(t, h1.PositiveBuckets, h2.PositiveBuckets) } } } @@ -101,3 +116,89 @@ func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Res require.Fail(t, "unexpected value type", "type: %v", expected.Value.Type()) } } + +func requireInEpsilonIfNotZero(t testing.TB, expected, actual float64, msgAndArgs ...interface{}) { + if expected == 0 { + require.Equal(t, expected, actual, msgAndArgs...) + } else { + require.InEpsilon(t, expected, actual, 1e-10, msgAndArgs...) + } +} + +func requireFloatBucketsMatch(t testing.TB, b1, b2 []float64) { + require.Equal(t, len(b1), len(b2), "bucket lengths match") + for i, b := range b1 { + require.InEpsilon(t, b, b2[i], 1e-10, "bucket values match") + } +} + +// Copied from prometheus as it is not exported +// spansMatch returns true if both spans represent the same bucket layout +// after combining zero length spans with the next non-zero length span. +func spansMatch(s1, s2 []histogram.Span) bool { + if len(s1) == 0 && len(s2) == 0 { + return true + } + + s1idx, s2idx := 0, 0 + for { + if s1idx >= len(s1) { + return allEmptySpans(s2[s2idx:]) + } + if s2idx >= len(s2) { + return allEmptySpans(s1[s1idx:]) + } + + currS1, currS2 := s1[s1idx], s2[s2idx] + s1idx++ + s2idx++ + if currS1.Length == 0 { + // This span is zero length, so we add consecutive such spans + // until we find a non-zero span. + for ; s1idx < len(s1) && s1[s1idx].Length == 0; s1idx++ { + currS1.Offset += s1[s1idx].Offset + } + if s1idx < len(s1) { + currS1.Offset += s1[s1idx].Offset + currS1.Length = s1[s1idx].Length + s1idx++ + } + } + if currS2.Length == 0 { + // This span is zero length, so we add consecutive such spans + // until we find a non-zero span. + for ; s2idx < len(s2) && s2[s2idx].Length == 0; s2idx++ { + currS2.Offset += s2[s2idx].Offset + } + if s2idx < len(s2) { + currS2.Offset += s2[s2idx].Offset + currS2.Length = s2[s2idx].Length + s2idx++ + } + } + + if currS1.Length == 0 && currS2.Length == 0 { + // The last spans of both set are zero length. Previous spans match. + return true + } + + if currS1.Offset != currS2.Offset || currS1.Length != currS2.Length { + return false + } + } +} + +func allEmptySpans(s []histogram.Span) bool { + for _, ss := range s { + if ss.Length > 0 { + return false + } + } + return true +} + +func sortVector(v promql.Vector) { + slices.SortFunc(v, func(a, b promql.Sample) int { + return labels.Compare(a.Metric, b.Metric) + }) +} From 173e7abc291b6663bad2c590a91aeabd0ec7ae73 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 12 Sep 2024 16:21:07 +1000 Subject: [PATCH 03/23] Add test to check various edge cases --- pkg/streamingpromql/engine_test.go | 126 +++++++++++++++++++++++++++++ pkg/streamingpromql/testing.go | 21 +++++ 2 files changed, 147 insertions(+) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 4bbe2ecdabe..65fe2b0b577 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1682,3 +1682,129 @@ func TestAnnotations(t *testing.T) { }) } } + +func TestCompareVariousMixedMetrics(t *testing.T) { + // Although most tests are covered with the promql test files (both ours and upstream), + // there is a lot of repetition around a few edge cases. + // This is not intended to be comprehensive, but instead check for some common edge cases + // ensuring MQE and Prometheus' engines return the same result when querying: + // - Series with mixed floats and histograms + // - Aggregations with mixed data types + // - Points with NaN + // - Stale markers + // - Look backs + + opts := NewTestEngineOpts() + mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger()) + require.NoError(t, err) + + prometheusEngine := promql.NewEngine(opts.CommonOpts) + + // We're loading series with the follow combinations of values. This is difficult to visually see in the actual + // data loaded, so it is represented in a table here. + // f = float value, h = native histogram, _ = no value, N = NaN, s = stale + // {a} f f f f f f + // {b} h h h h h h + // {c} f h f h N h + // {d} f _ _ s f f + // {f} h h _ s h N + // {g} f N _ f f N + // {h} N N N N N N + // {i} N N N _ N s + // {j} f h _ N h s + // {k} f f s s s s + // {l} 0 0 0 N s 0 + + pointsPerSeries := 6 + samples := ` + series{label="a", group="a"} 1 2 3 4 5 -50 + series{label="b", group="a"} {{schema:5 sum:15 count:10 buckets:[3 2 5]}} {{schema:5 sum:20 count:15 buckets:[4 5 6]}} {{schema:5 sum:25 count:20 buckets:[5 7 8]}} {{schema:5 sum:30 count:25 buckets:[6 9 10]}} {{schema:5 sum:35 count:30 buckets:[7 10 13]}} {{schema:5 sum:40 count:35 buckets:[8 11 14]}} + series{label="c", group="a"} 1 {{schema:3 sum:5 count:3 buckets:[1 1 1]}} 3 {{schema:3 sum:10 count:6 buckets:[2 2 2]}} NaN {{schema:3 sum:12 count:7 buckets:[2 2 3]}} + series{label="d", group="a"} 1 _ _ stale 5 6 + series{label="e", group="b"} {{schema:4 sum:12 count:8 buckets:[2 3 3]}} {{schema:4 sum:14 count:9 buckets:[3 3 3]}} _ stale {{schema:4 sum:18 count:11 buckets:[4 4 3]}} NaN + series{label="f", group="b"} 1 NaN _ 4 5 NaN + series{label="g", group="b"} NaN NaN NaN NaN NaN NaN + series{label="h", group="b"} NaN NaN NaN _ NaN stale + series{label="i", group="c"} 1 {{schema:5 sum:15 count:10 buckets:[3 2 5]}} _ NaN {{schema:5 sum:30 count:25 buckets:[6 9 10]}} stale + series{label="j", group="c"} 1 -20 stale stale stale stale + series{label="k", group="c"} 0 0 0 NaN stale 0 + ` + + // Labels for generating combinations + labels := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"} + + // Generate combinations of 2 and 3 labels. (e.g., "a,b", "e,f", "c,d,e" etc) + // These will be used for binary operations, so we can add up to 3 series. + labelCombinations := combinations(labels, 2) + labelCombinations = append(labelCombinations, combinations(labels, 3)...) + + expressions := []string{} + + // Binary operations + for _, labels := range labelCombinations { + if len(labels) >= 2 { + for _, op := range []string{"+", "-", "*", "/"} { + binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0]) + for _, label := range labels[1:] { + binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label) + } + expressions = append(expressions, binaryExpr) + } + } + } + + // For aggregations, also add combinations of 4 labels. (e.g., "a,b,c,d", "c,d,e,f" etc) + labelCombinations = append(labelCombinations, combinations(labels, 4)...) + + // Aggregations + for _, labels := range labelCombinations { + labelRegex := strings.Join(labels, "|") + for _, aggFunc := range []string{"sum", "avg", "min", "max"} { + expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex)) + expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) + expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) + } + } + + timeRanges := []struct { + loadStep int + interval time.Duration + }{ + {loadStep: 1, interval: 1 * time.Minute}, + {loadStep: 6, interval: 6 * time.Minute}, + {loadStep: 6, interval: 5 * time.Minute}, + } + + // Total tests: + // Binary operation labels: 11C2 + 11C3 = 220 + // * 4 operations = 880 + // Aggregation labels: 220 + 11C4 = 550 + // * 4 aggregations * 3 groups = 6600 + // Total = 7480 + // * 3 time ranges = 22440 + + for _, tr := range timeRanges { + start := timestamp.Time(0) + end := start.Add(time.Duration(pointsPerSeries) * time.Duration(tr.loadStep) * time.Minute) // Deliberately queries 1 step past the final loaded point + + storage := promqltest.LoadedStorage(t, fmt.Sprintf("load %dm", tr.loadStep)+samples) + t.Cleanup(func() { require.NoError(t, storage.Close()) }) + + for _, expr := range expressions { + testName := fmt.Sprintf("Expr: %s, Start: %d, End: %d, Interval: %s", expr, start.Unix(), end.Unix(), tr.interval) + t.Run(testName, func(t *testing.T) { + q, err := prometheusEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval) + require.NoError(t, err) + defer q.Close() + expectedResults := q.Exec(context.Background()) + + q, err = mimirEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval) + require.NoError(t, err) + defer q.Close() + mimirResults := q.Exec(context.Background()) + + RequireEqualResults(t, expr, expectedResults, mimirResults) + }) + } + } +} diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index 7655801ec9d..087a593aede 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -202,3 +202,24 @@ func sortVector(v promql.Vector) { return labels.Compare(a.Metric, b.Metric) }) } + +// combinations generates all combinations of a given length from a slice of strings. +func combinations(arr []string, length int) [][]string { + if length == 0 || length > len(arr) { + return nil + } + var combine func([]string, int, int) [][]string + combine = func(arr []string, length, start int) [][]string { + if length == 0 { + return [][]string{{}} + } + result := [][]string{} + for i := start; i <= len(arr)-length; i++ { + for _, suffix := range combine(arr, length-1, i+1) { + result = append(result, append([]string{arr[i]}, suffix...)) + } + } + return result + } + return combine(arr, length, 0) +} From 879ef2919098a4212162fd3e55708f4b2e5492d2 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 12 Sep 2024 16:22:28 +1000 Subject: [PATCH 04/23] Return nil values where promql does --- pkg/streamingpromql/query.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 54033bbbe93..3563b053c46 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -493,7 +493,10 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { return &promql.Result{Err: compat.NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))} } - q.result.Warnings = *q.annotations + // To make comparing to Prometheus' engine easier, only return Annotations if there are some, otherwise, return nil. + if len(*q.annotations) > 0 { + q.result.Warnings = *q.annotations + } return q.result } @@ -547,6 +550,10 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o t } func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o types.InstantVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) { + if len(series) == 0 { + return nil, nil + } + m := types.GetMatrix(len(series)) for i, s := range series { From bc0f1758dafc0c96bac63c7729dac5b84df5f064 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 12 Sep 2024 16:28:11 +1000 Subject: [PATCH 05/23] Add some extra time combinations --- pkg/streamingpromql/engine_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 65fe2b0b577..11f4d4c2912 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1771,6 +1771,8 @@ func TestCompareVariousMixedMetrics(t *testing.T) { interval time.Duration }{ {loadStep: 1, interval: 1 * time.Minute}, + {loadStep: 1, interval: 6 * time.Minute}, + {loadStep: 1, interval: 5 * time.Minute}, {loadStep: 6, interval: 6 * time.Minute}, {loadStep: 6, interval: 5 * time.Minute}, } @@ -1781,7 +1783,7 @@ func TestCompareVariousMixedMetrics(t *testing.T) { // Aggregation labels: 220 + 11C4 = 550 // * 4 aggregations * 3 groups = 6600 // Total = 7480 - // * 3 time ranges = 22440 + // * 5 time ranges = 37400 for _, tr := range timeRanges { start := timestamp.Time(0) From a9a5eba7c7296193de6e53c800fe6732f9f3b6c5 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 12 Sep 2024 16:28:27 +1000 Subject: [PATCH 06/23] Add test for combinations --- pkg/streamingpromql/testing_test.go | 63 +++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 pkg/streamingpromql/testing_test.go diff --git a/pkg/streamingpromql/testing_test.go b/pkg/streamingpromql/testing_test.go new file mode 100644 index 00000000000..08cf7e032ea --- /dev/null +++ b/pkg/streamingpromql/testing_test.go @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package streamingpromql + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCombinations(t *testing.T) { + tests := map[string]struct { + name string + input []string + length int + expected [][]string + }{ + "combinations of 2 from 3 items": { + + input: []string{"a", "b", "c"}, + length: 2, + expected: [][]string{{"a", "b"}, {"a", "c"}, {"b", "c"}}, + }, + "combinations of 3 from 4 items": { + + input: []string{"a", "b", "c", "d"}, + length: 3, + expected: [][]string{{"a", "b", "c"}, {"a", "b", "d"}, {"a", "c", "d"}, {"b", "c", "d"}}, + }, + "combinations of 1 from 3 items": { + input: []string{"a", "b", "c"}, + length: 1, + expected: [][]string{{"a"}, {"b"}, {"c"}}, + }, + "combinations of 3 from 3 items": { + input: []string{"a", "b", "c"}, + length: 3, + expected: [][]string{{"a", "b", "c"}}, + }, + "combinations of 0 length": { + input: []string{"a", "b", "c"}, + length: 0, + expected: nil, + }, + "length greater than array size": { + input: []string{"a", "b"}, + length: 3, + expected: nil, // nil because length is greater than the input size + }, + "empty array": { + input: []string{}, + length: 2, + expected: nil, + }, + } + + for tName, test := range tests { + t.Run(tName, func(t *testing.T) { + output := combinations(test.input, test.length) + require.Equal(t, test.expected, output) + }) + } +} From 77928e191b21fd02851e2494ad7b829893de421a Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 12 Sep 2024 16:48:24 +1000 Subject: [PATCH 07/23] Fix comment after reordering --- pkg/streamingpromql/engine_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 11f4d4c2912..58e12b866a2 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1707,13 +1707,13 @@ func TestCompareVariousMixedMetrics(t *testing.T) { // {b} h h h h h h // {c} f h f h N h // {d} f _ _ s f f - // {f} h h _ s h N - // {g} f N _ f f N - // {h} N N N N N N - // {i} N N N _ N s - // {j} f h _ N h s - // {k} f f s s s s - // {l} 0 0 0 N s 0 + // {e} h h _ s h N + // {f} f N _ f f N + // {g} N N N N N N + // {h} N N N _ N s + // {i} f h _ N h s + // {j} f f s s s s + // {k} 0 0 0 N s 0 pointsPerSeries := 6 samples := ` From 9fa4f715376ece74cf58bafb28809bcce85746d9 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 16:53:53 +1000 Subject: [PATCH 08/23] Test different size buckets --- pkg/streamingpromql/engine_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 58e12b866a2..fd7f8944b18 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1718,14 +1718,14 @@ func TestCompareVariousMixedMetrics(t *testing.T) { pointsPerSeries := 6 samples := ` series{label="a", group="a"} 1 2 3 4 5 -50 - series{label="b", group="a"} {{schema:5 sum:15 count:10 buckets:[3 2 5]}} {{schema:5 sum:20 count:15 buckets:[4 5 6]}} {{schema:5 sum:25 count:20 buckets:[5 7 8]}} {{schema:5 sum:30 count:25 buckets:[6 9 10]}} {{schema:5 sum:35 count:30 buckets:[7 10 13]}} {{schema:5 sum:40 count:35 buckets:[8 11 14]}} + series{label="b", group="a"} {{schema:1 sum:15 count:10 buckets:[3 2 5 7 9]}} {{schema:2 sum:20 count:15 buckets:[4]}} {{schema:3 sum:25 count:20 buckets:[5 8]}} {{schema:4 sum:30 count:25 buckets:[6 9 10 11]}} {{schema:5 sum:35 count:30 buckets:[7 10 13]}} {{schema:6 sum:40 count:35 buckets:[8 11 14]}} series{label="c", group="a"} 1 {{schema:3 sum:5 count:3 buckets:[1 1 1]}} 3 {{schema:3 sum:10 count:6 buckets:[2 2 2]}} NaN {{schema:3 sum:12 count:7 buckets:[2 2 3]}} series{label="d", group="a"} 1 _ _ stale 5 6 series{label="e", group="b"} {{schema:4 sum:12 count:8 buckets:[2 3 3]}} {{schema:4 sum:14 count:9 buckets:[3 3 3]}} _ stale {{schema:4 sum:18 count:11 buckets:[4 4 3]}} NaN series{label="f", group="b"} 1 NaN _ 4 5 NaN series{label="g", group="b"} NaN NaN NaN NaN NaN NaN series{label="h", group="b"} NaN NaN NaN _ NaN stale - series{label="i", group="c"} 1 {{schema:5 sum:15 count:10 buckets:[3 2 5]}} _ NaN {{schema:5 sum:30 count:25 buckets:[6 9 10]}} stale + series{label="i", group="c"} 1 {{schema:5 sum:15 count:10 buckets:[3 2 5]}} _ NaN {{schema:2 sum:30 count:25 buckets:[6 9 10 9 1]}} stale series{label="j", group="c"} 1 -20 stale stale stale stale series{label="k", group="c"} 0 0 0 NaN stale 0 ` From 47817f12e27b1e3790b8c7c3992531d0cd497b0f Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 16:55:52 +1000 Subject: [PATCH 09/23] Simplify span matching --- pkg/streamingpromql/testing.go | 71 ++++------------------------------ 1 file changed, 7 insertions(+), 64 deletions(-) diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index 087a593aede..be363583522 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -104,10 +104,10 @@ func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Res requireInEpsilonIfNotZero(t, h1.ZeroThreshold, h2.ZeroThreshold, "histogram thresholds match") requireInEpsilonIfNotZero(t, h1.ZeroCount, h2.ZeroCount, "histogram zero counts match") - require.True(t, spansMatch(h1.NegativeSpans, h2.NegativeSpans), "spans match") + requireSpansMatch(t, h1.NegativeSpans, h2.NegativeSpans) requireFloatBucketsMatch(t, h1.NegativeBuckets, h2.NegativeBuckets) - require.True(t, spansMatch(h1.PositiveSpans, h2.PositiveSpans)) + requireSpansMatch(t, h1.PositiveSpans, h2.PositiveSpans) requireFloatBucketsMatch(t, h1.PositiveBuckets, h2.PositiveBuckets) } } @@ -132,69 +132,12 @@ func requireFloatBucketsMatch(t testing.TB, b1, b2 []float64) { } } -// Copied from prometheus as it is not exported -// spansMatch returns true if both spans represent the same bucket layout -// after combining zero length spans with the next non-zero length span. -func spansMatch(s1, s2 []histogram.Span) bool { - if len(s1) == 0 && len(s2) == 0 { - return true +func requireSpansMatch(t testing.TB, s1, s2 []histogram.Span) { + require.Equal(t, len(s1), len(s2), "number of spans") + for i, _ := range s1 { + require.Equal(t, s1[i].Length, s2[i].Length, "Span lengths match") + require.Equal(t, s1[i].Offset, s2[i].Offset, "Span offsets match") } - - s1idx, s2idx := 0, 0 - for { - if s1idx >= len(s1) { - return allEmptySpans(s2[s2idx:]) - } - if s2idx >= len(s2) { - return allEmptySpans(s1[s1idx:]) - } - - currS1, currS2 := s1[s1idx], s2[s2idx] - s1idx++ - s2idx++ - if currS1.Length == 0 { - // This span is zero length, so we add consecutive such spans - // until we find a non-zero span. - for ; s1idx < len(s1) && s1[s1idx].Length == 0; s1idx++ { - currS1.Offset += s1[s1idx].Offset - } - if s1idx < len(s1) { - currS1.Offset += s1[s1idx].Offset - currS1.Length = s1[s1idx].Length - s1idx++ - } - } - if currS2.Length == 0 { - // This span is zero length, so we add consecutive such spans - // until we find a non-zero span. - for ; s2idx < len(s2) && s2[s2idx].Length == 0; s2idx++ { - currS2.Offset += s2[s2idx].Offset - } - if s2idx < len(s2) { - currS2.Offset += s2[s2idx].Offset - currS2.Length = s2[s2idx].Length - s2idx++ - } - } - - if currS1.Length == 0 && currS2.Length == 0 { - // The last spans of both set are zero length. Previous spans match. - return true - } - - if currS1.Offset != currS2.Offset || currS1.Length != currS2.Length { - return false - } - } -} - -func allEmptySpans(s []histogram.Span) bool { - for _, ss := range s { - if ss.Length > 0 { - return false - } - } - return true } func sortVector(v promql.Vector) { From 6596f615df772fef5400ef2607bbc5c84b5006a3 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 16:56:37 +1000 Subject: [PATCH 10/23] Return nil when query has no results --- pkg/streamingpromql/query.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 3563b053c46..3eb1d42ae53 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -578,6 +578,10 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o t }) } + if len(m) == 0 { + return nil, nil + } + slices.SortFunc(m, func(a, b promql.Series) int { return labels.Compare(a.Metric, b.Metric) }) From d6eb49f19be91f4a5f1253798aeef599a9b3fbbf Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 16:56:53 +1000 Subject: [PATCH 11/23] Simplify combinations helper --- pkg/streamingpromql/testing.go | 29 +++++++++++++++-------------- pkg/streamingpromql/testing_test.go | 20 ++++++++++++++------ 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index be363583522..f9b4748c39a 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -148,21 +148,22 @@ func sortVector(v promql.Vector) { // combinations generates all combinations of a given length from a slice of strings. func combinations(arr []string, length int) [][]string { - if length == 0 || length > len(arr) { - return nil + if length < 0 || length > len(arr) { + panic("Invalid length requested") } - var combine func([]string, int, int) [][]string - combine = func(arr []string, length, start int) [][]string { - if length == 0 { - return [][]string{{}} - } - result := [][]string{} - for i := start; i <= len(arr)-length; i++ { - for _, suffix := range combine(arr, length-1, i+1) { - result = append(result, append([]string{arr[i]}, suffix...)) - } + return combine(arr, length, 0) +} + +func combine(arr []string, length int, start int) [][]string { + if length == 0 { + return [][]string{{}} + } + result := [][]string{} + for i := start; i <= len(arr)-length; i++ { + for _, suffix := range combine(arr, length-1, i+1) { + combination := append([]string{arr[i]}, suffix...) + result = append(result, combination) } - return result } - return combine(arr, length, 0) + return result } diff --git a/pkg/streamingpromql/testing_test.go b/pkg/streamingpromql/testing_test.go index 08cf7e032ea..1576126705d 100644 --- a/pkg/streamingpromql/testing_test.go +++ b/pkg/streamingpromql/testing_test.go @@ -14,6 +14,7 @@ func TestCombinations(t *testing.T) { input []string length int expected [][]string + panics bool }{ "combinations of 2 from 3 items": { @@ -40,22 +41,29 @@ func TestCombinations(t *testing.T) { "combinations of 0 length": { input: []string{"a", "b", "c"}, length: 0, - expected: nil, + expected: [][]string{{}}, }, "length greater than array size": { - input: []string{"a", "b"}, - length: 3, - expected: nil, // nil because length is greater than the input size + input: []string{"a", "b"}, + length: 3, + panics: true, }, "empty array": { input: []string{}, - length: 2, - expected: nil, + length: 0, + expected: [][]string{{}}, }, } for tName, test := range tests { t.Run(tName, func(t *testing.T) { + if test.panics { + f := func() { + combinations(test.input, test.length) + } + require.Panics(t, f) + return + } output := combinations(test.input, test.length) require.Equal(t, test.expected, output) }) From 138adf7066b22902241743f9252df03b576a40bd Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 16:58:02 +1000 Subject: [PATCH 12/23] Move MetricsNames to types package --- pkg/streamingpromql/operators/aggregation.go | 4 ++-- pkg/streamingpromql/operators/aggregation_test.go | 2 +- pkg/streamingpromql/operators/function_over_range_vector.go | 4 ++-- pkg/streamingpromql/{operators => types}/metric_names.go | 6 ++---- 4 files changed, 7 insertions(+), 9 deletions(-) rename pkg/streamingpromql/{operators => types}/metric_names.go (81%) diff --git a/pkg/streamingpromql/operators/aggregation.go b/pkg/streamingpromql/operators/aggregation.go index 359d84152d5..709139a8437 100644 --- a/pkg/streamingpromql/operators/aggregation.go +++ b/pkg/streamingpromql/operators/aggregation.go @@ -36,7 +36,7 @@ type Aggregation struct { Annotations *annotations.Annotations - metricNames *MetricNames + metricNames *types.MetricNames currentSeriesIndex int expressionPosition posrange.PositionRange @@ -79,7 +79,7 @@ func NewAggregation( Without: without, MemoryConsumptionTracker: memoryConsumptionTracker, Annotations: annotations, - metricNames: &MetricNames{}, + metricNames: &types.MetricNames{}, expressionPosition: expressionPosition, aggregationGroupFactory: opGroupFactory, } diff --git a/pkg/streamingpromql/operators/aggregation_test.go b/pkg/streamingpromql/operators/aggregation_test.go index beaf605ea69..e7aa3e09645 100644 --- a/pkg/streamingpromql/operators/aggregation_test.go +++ b/pkg/streamingpromql/operators/aggregation_test.go @@ -81,7 +81,7 @@ func TestAggregation_ReturnsGroupsFinishedFirstEarliest(t *testing.T) { aggregator := &Aggregation{ Inner: &testOperator{series: testCase.inputSeries}, Grouping: testCase.grouping, - metricNames: &MetricNames{}, + metricNames: &types.MetricNames{}, aggregationGroupFactory: func() aggregations.AggregationGroup { return &aggregations.SumAggregationGroup{} }, } diff --git a/pkg/streamingpromql/operators/function_over_range_vector.go b/pkg/streamingpromql/operators/function_over_range_vector.go index 9ebce607d41..eea7d7d9c29 100644 --- a/pkg/streamingpromql/operators/function_over_range_vector.go +++ b/pkg/streamingpromql/operators/function_over_range_vector.go @@ -26,7 +26,7 @@ type FunctionOverRangeVector struct { Annotations *annotations.Annotations - metricNames *MetricNames + metricNames *types.MetricNames currentSeriesIndex int numSteps int @@ -61,7 +61,7 @@ func NewFunctionOverRangeVector( } if f.NeedsSeriesNamesForAnnotations { - o.metricNames = &MetricNames{} + o.metricNames = &types.MetricNames{} } o.emitAnnotationFunc = o.emitAnnotation // This is an optimisation to avoid creating the EmitAnnotationFunc instance on every usage. diff --git a/pkg/streamingpromql/operators/metric_names.go b/pkg/streamingpromql/types/metric_names.go similarity index 81% rename from pkg/streamingpromql/operators/metric_names.go rename to pkg/streamingpromql/types/metric_names.go index 9980a406221..352ba459713 100644 --- a/pkg/streamingpromql/operators/metric_names.go +++ b/pkg/streamingpromql/types/metric_names.go @@ -1,11 +1,9 @@ // SPDX-License-Identifier: AGPL-3.0-only -package operators +package types import ( "github.com/prometheus/prometheus/model/labels" - - "github.com/grafana/mimir/pkg/streamingpromql/types" ) // MetricNames captures and stores the metric names of each series for later use in an operator. @@ -16,7 +14,7 @@ type MetricNames struct { names []string } -func (n *MetricNames) CaptureMetricNames(metadata []types.SeriesMetadata) { +func (n *MetricNames) CaptureMetricNames(metadata []SeriesMetadata) { n.names = make([]string, len(metadata)) for i, series := range metadata { From 4125e72325ad08559dd2d661fb8e42f1f349f0b7 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 22:06:14 +1000 Subject: [PATCH 13/23] Also check range vectors and rate function --- pkg/streamingpromql/engine_test.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index fd7f8944b18..71969d6188d 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1756,14 +1756,16 @@ func TestCompareVariousMixedMetrics(t *testing.T) { // For aggregations, also add combinations of 4 labels. (e.g., "a,b,c,d", "c,d,e,f" etc) labelCombinations = append(labelCombinations, combinations(labels, 4)...) - // Aggregations for _, labels := range labelCombinations { labelRegex := strings.Join(labels, "|") + // Aggregations for _, aggFunc := range []string{"sum", "avg", "min", "max"} { expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) } + expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[45s])`, labelRegex)) + expressions = append(expressions, fmt.Sprintf(`avg(rate(series{label=~"(%s)"}[2m15s]))`, labelRegex)) } timeRanges := []struct { @@ -1777,17 +1779,9 @@ func TestCompareVariousMixedMetrics(t *testing.T) { {loadStep: 6, interval: 5 * time.Minute}, } - // Total tests: - // Binary operation labels: 11C2 + 11C3 = 220 - // * 4 operations = 880 - // Aggregation labels: 220 + 11C4 = 550 - // * 4 aggregations * 3 groups = 6600 - // Total = 7480 - // * 5 time ranges = 37400 - for _, tr := range timeRanges { start := timestamp.Time(0) - end := start.Add(time.Duration(pointsPerSeries) * time.Duration(tr.loadStep) * time.Minute) // Deliberately queries 1 step past the final loaded point + end := start.Add(time.Duration(pointsPerSeries*tr.loadStep) * time.Minute) // Deliberately queries 1 step past the final loaded point storage := promqltest.LoadedStorage(t, fmt.Sprintf("load %dm", tr.loadStep)+samples) t.Cleanup(func() { require.NoError(t, storage.Close()) }) From 1346a57b07c83788b03103980e1d24c99b794067 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 22:06:36 +1000 Subject: [PATCH 14/23] Temporarily work around different annotations --- .../benchmarks/comparison_test.go | 4 ++-- pkg/streamingpromql/engine_test.go | 24 ++++++++++++++++++- pkg/streamingpromql/testing.go | 12 ++++++---- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index 26644aab975..18ef6922e29 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -66,7 +66,7 @@ func BenchmarkQuery(b *testing.B) { prometheusResult, prometheusClose := c.Run(ctx, b, start, end, interval, prometheusEngine, q) mimirResult, mimirClose := c.Run(ctx, b, start, end, interval, mimirEngine, q) - streamingpromql.RequireEqualResults(b, c.Expr, prometheusResult, mimirResult) + streamingpromql.RequireEqualResults(b, c.Expr, prometheusResult, mimirResult, true) prometheusClose() mimirClose() @@ -107,7 +107,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) { prometheusResult, prometheusClose := c.Run(ctx, t, start, end, interval, prometheusEngine, q) mimirResult, mimirClose := c.Run(ctx, t, start, end, interval, mimirEngine, q) - streamingpromql.RequireEqualResults(t, c.Expr, prometheusResult, mimirResult) + streamingpromql.RequireEqualResults(t, c.Expr, prometheusResult, mimirResult, true) prometheusClose() mimirClose() diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 71969d6188d..79867e75a98 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1628,6 +1628,26 @@ func TestAnnotations(t *testing.T) { `PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "other_float_metric" (1:99)`, }, }, + "no rate annotation when sample ends with histogram": { + data: ` + series 3 1 {{schema:3 sum:12 count:7 buckets:[2 2 3]}} + `, + expr: "rate(series[45s])", + expectedWarningAnnotations: []string{}, + expectedInfoAnnotations: []string{}, + // This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored. + skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value", + }, + "no rate annotation when only 1 point in range vector": { + data: ` + series 1 + `, + expr: "rate(series[1m])", + expectedWarningAnnotations: []string{}, + expectedInfoAnnotations: []string{}, + // This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored. + skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value", + }, } opts := NewTestEngineOpts() @@ -1799,7 +1819,9 @@ func TestCompareVariousMixedMetrics(t *testing.T) { defer q.Close() mimirResults := q.Exec(context.Background()) - RequireEqualResults(t, expr, expectedResults, mimirResults) + // We currently omit checking the annotations due to a difference between the engines. + // This can be re-enabled once https://github.com/prometheus/prometheus/pull/14910 is vendored. + RequireEqualResults(t, expr, expectedResults, mimirResults, false) }) } } diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index f9b4748c39a..fbe3bb6b7fb 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -33,14 +33,16 @@ func NewTestEngineOpts() EngineOpts { // Why do we do this rather than require.Equal(t, expected, actual)? // It's possible that floating point values are slightly different due to imprecision, but require.Equal doesn't allow us to set an allowable difference. -func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Result) { +func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Result, checkAnnotations bool) { require.Equal(t, expected.Err, actual.Err) require.Equal(t, expected.Value.Type(), actual.Value.Type()) - expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0) - actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0) - require.ElementsMatch(t, expectedWarnings, actualWarnings) - require.ElementsMatch(t, expectedInfos, actualInfos) + if checkAnnotations { + expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0) + actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0) + require.ElementsMatch(t, expectedWarnings, actualWarnings) + require.ElementsMatch(t, expectedInfos, actualInfos) + } switch expected.Value.Type() { case parser.ValueTypeVector: From 39c7314bb7ab80a792eb8d111495243c6d021996 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 22:07:31 +1000 Subject: [PATCH 15/23] Update CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4560403436..6006dfbf105 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,7 @@ * [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210 * [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173 * [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. From c7c3db465b9c895daac03d272836d83c504fca42 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 16 Sep 2024 23:48:00 +1000 Subject: [PATCH 16/23] Fix linting --- pkg/streamingpromql/testing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index fbe3bb6b7fb..f0774892654 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -136,7 +136,7 @@ func requireFloatBucketsMatch(t testing.TB, b1, b2 []float64) { func requireSpansMatch(t testing.TB, s1, s2 []histogram.Span) { require.Equal(t, len(s1), len(s2), "number of spans") - for i, _ := range s1 { + for i := range s1 { require.Equal(t, s1[i].Length, s2[i].Length, "Span lengths match") require.Equal(t, s1[i].Offset, s2[i].Offset, "Span offsets match") } From 54a52b3b87f3792c44aa61484e64e73cd227b7a3 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 17 Sep 2024 14:19:33 +1000 Subject: [PATCH 17/23] Add a few extra combinations --- pkg/streamingpromql/engine_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 79867e75a98..181eb30d07d 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1734,6 +1734,9 @@ func TestCompareVariousMixedMetrics(t *testing.T) { // {i} f h _ N h s // {j} f f s s s s // {k} 0 0 0 N s 0 + // {l} h _ f _ s N + // {m} s s N _ _ f + // {n} _ _ _ _ _ _ pointsPerSeries := 6 samples := ` @@ -1748,6 +1751,9 @@ func TestCompareVariousMixedMetrics(t *testing.T) { series{label="i", group="c"} 1 {{schema:5 sum:15 count:10 buckets:[3 2 5]}} _ NaN {{schema:2 sum:30 count:25 buckets:[6 9 10 9 1]}} stale series{label="j", group="c"} 1 -20 stale stale stale stale series{label="k", group="c"} 0 0 0 NaN stale 0 + series{label="l", group="d"} {{schema:1 sum:10 count:5 buckets:[1 2]}} _ 3 _ stale NaN + series{label="m", group="d"} stale stale NaN _ _ 4 + series{label="n", group="d"} ` // Labels for generating combinations From 8eec96f43bec16568e6c6882bdb185f9805351dc Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 20 Sep 2024 10:41:13 +1000 Subject: [PATCH 18/23] Address review feedback --- pkg/streamingpromql/engine_test.go | 40 +++++++++++++++--------------- pkg/streamingpromql/query.go | 6 +---- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 181eb30d07d..4176bc414c3 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1555,6 +1555,26 @@ func TestAnnotations(t *testing.T) { `PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:6)`, }, }, + "rate() over metric without counter suffix with single float or histogram in range": { + data: ` + series 3 1 {{schema:3 sum:12 count:7 buckets:[2 2 3]}} + `, + expr: "rate(series[45s])", + expectedWarningAnnotations: []string{}, + expectedInfoAnnotations: []string{}, + // This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored. + skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value", + }, + "rate() over one point in range": { + data: ` + series 1 + `, + expr: "rate(series[1m])", + expectedWarningAnnotations: []string{}, + expectedInfoAnnotations: []string{}, + // This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored. + skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value", + }, "sum_over_time() over series with both floats and histograms": { data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`, @@ -1628,26 +1648,6 @@ func TestAnnotations(t *testing.T) { `PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "other_float_metric" (1:99)`, }, }, - "no rate annotation when sample ends with histogram": { - data: ` - series 3 1 {{schema:3 sum:12 count:7 buckets:[2 2 3]}} - `, - expr: "rate(series[45s])", - expectedWarningAnnotations: []string{}, - expectedInfoAnnotations: []string{}, - // This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored. - skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value", - }, - "no rate annotation when only 1 point in range vector": { - data: ` - series 1 - `, - expr: "rate(series[1m])", - expectedWarningAnnotations: []string{}, - expectedInfoAnnotations: []string{}, - // This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored. - skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value", - }, } opts := NewTestEngineOpts() diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 3eb1d42ae53..e97da911489 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -493,7 +493,7 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { return &promql.Result{Err: compat.NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))} } - // To make comparing to Prometheus' engine easier, only return Annotations if there are some, otherwise, return nil. + // To make comparing to Prometheus' engine easier, only return the annotations if there are some, otherwise, return nil. if len(*q.annotations) > 0 { q.result.Warnings = *q.annotations } @@ -550,10 +550,6 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o t } func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o types.InstantVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) { - if len(series) == 0 { - return nil, nil - } - m := types.GetMatrix(len(series)) for i, s := range series { From 4b6b749071f5109bab9964133a20342021c14615 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 20 Sep 2024 10:50:42 +1000 Subject: [PATCH 19/23] Create testutils package --- .../benchmarks/comparison_test.go | 5 +- pkg/streamingpromql/engine_test.go | 9 +- pkg/streamingpromql/testing.go | 145 ------------------ 3 files changed, 8 insertions(+), 151 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index 18ef6922e29..2678f0d5c46 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -31,6 +31,7 @@ import ( "github.com/grafana/mimir/pkg/querier" "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/streamingpromql" + "github.com/grafana/mimir/pkg/streamingpromql/testutils" "github.com/grafana/mimir/pkg/util/validation" ) @@ -66,7 +67,7 @@ func BenchmarkQuery(b *testing.B) { prometheusResult, prometheusClose := c.Run(ctx, b, start, end, interval, prometheusEngine, q) mimirResult, mimirClose := c.Run(ctx, b, start, end, interval, mimirEngine, q) - streamingpromql.RequireEqualResults(b, c.Expr, prometheusResult, mimirResult, true) + testutils.RequireEqualResults(b, c.Expr, prometheusResult, mimirResult, true) prometheusClose() mimirClose() @@ -107,7 +108,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) { prometheusResult, prometheusClose := c.Run(ctx, t, start, end, interval, prometheusEngine, q) mimirResult, mimirClose := c.Run(ctx, t, start, end, interval, mimirEngine, q) - streamingpromql.RequireEqualResults(t, c.Expr, prometheusResult, mimirResult, true) + testutils.RequireEqualResults(t, c.Expr, prometheusResult, mimirResult, true) prometheusClose() mimirClose() diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 4176bc414c3..1ee2b156b6c 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -31,6 +31,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/streamingpromql/compat" + "github.com/grafana/mimir/pkg/streamingpromql/testutils" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/globalerror" ) @@ -1761,8 +1762,8 @@ func TestCompareVariousMixedMetrics(t *testing.T) { // Generate combinations of 2 and 3 labels. (e.g., "a,b", "e,f", "c,d,e" etc) // These will be used for binary operations, so we can add up to 3 series. - labelCombinations := combinations(labels, 2) - labelCombinations = append(labelCombinations, combinations(labels, 3)...) + labelCombinations := testutils.Combinations(labels, 2) + labelCombinations = append(labelCombinations, testutils.Combinations(labels, 3)...) expressions := []string{} @@ -1780,7 +1781,7 @@ func TestCompareVariousMixedMetrics(t *testing.T) { } // For aggregations, also add combinations of 4 labels. (e.g., "a,b,c,d", "c,d,e,f" etc) - labelCombinations = append(labelCombinations, combinations(labels, 4)...) + labelCombinations = append(labelCombinations, testutils.Combinations(labels, 4)...) for _, labels := range labelCombinations { labelRegex := strings.Join(labels, "|") @@ -1827,7 +1828,7 @@ func TestCompareVariousMixedMetrics(t *testing.T) { // We currently omit checking the annotations due to a difference between the engines. // This can be re-enabled once https://github.com/prometheus/prometheus/pull/14910 is vendored. - RequireEqualResults(t, expr, expectedResults, mimirResults, false) + testutils.RequireEqualResults(t, expr, expectedResults, mimirResults, false) }) } } diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index f0774892654..01ff956bb8d 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -4,15 +4,9 @@ package streamingpromql import ( "math" - "testing" "time" - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/promql/parser" - "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" ) func NewTestEngineOpts() EngineOpts { @@ -30,142 +24,3 @@ func NewTestEngineOpts() EngineOpts { Pedantic: true, } } - -// Why do we do this rather than require.Equal(t, expected, actual)? -// It's possible that floating point values are slightly different due to imprecision, but require.Equal doesn't allow us to set an allowable difference. -func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Result, checkAnnotations bool) { - require.Equal(t, expected.Err, actual.Err) - require.Equal(t, expected.Value.Type(), actual.Value.Type()) - - if checkAnnotations { - expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0) - actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0) - require.ElementsMatch(t, expectedWarnings, actualWarnings) - require.ElementsMatch(t, expectedInfos, actualInfos) - } - - switch expected.Value.Type() { - case parser.ValueTypeVector: - expectedVector, err := expected.Vector() - require.NoError(t, err) - actualVector, err := actual.Vector() - require.NoError(t, err) - - // Instant queries don't guarantee any particular sort order, so sort results here so that we can easily compare them. - sortVector(expectedVector) - sortVector(actualVector) - - require.Len(t, actualVector, len(expectedVector)) - - for i, expectedSample := range expectedVector { - actualSample := actualVector[i] - - require.Equal(t, expectedSample.Metric, actualSample.Metric) - require.Equal(t, expectedSample.T, actualSample.T) - require.Equal(t, expectedSample.H, actualSample.H) - requireInEpsilonIfNotZero(t, expectedSample.F, actualSample.F) - } - case parser.ValueTypeMatrix: - expectedMatrix, err := expected.Matrix() - require.NoError(t, err) - actualMatrix, err := actual.Matrix() - require.NoError(t, err) - - require.Len(t, actualMatrix, len(expectedMatrix)) - - for i, expectedSeries := range expectedMatrix { - actualSeries := actualMatrix[i] - - require.Equal(t, expectedSeries.Metric, actualSeries.Metric) - - for j, expectedPoint := range expectedSeries.Floats { - actualPoint := actualSeries.Floats[j] - - require.Equal(t, expectedPoint.T, actualPoint.T) - requireInEpsilonIfNotZero(t, expectedPoint.F, actualPoint.F, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) - } - - for j, expectedPoint := range actualSeries.Histograms { - actualPoint := actualSeries.Histograms[j] - - require.Equal(t, expectedPoint.T, actualPoint.T) - if expectedPoint.H == nil { - require.Equal(t, expectedPoint.H, actualPoint.H) - } else { - h1 := expectedPoint.H - h2 := actualPoint.H - - require.Equal(t, h1.Schema, h2.Schema, "histogram schemas match") - requireInEpsilonIfNotZero(t, h1.Count, h2.Count, "histogram counts match") - requireInEpsilonIfNotZero(t, h1.Sum, h2.Sum, "histogram sums match") - - if h1.UsesCustomBuckets() { - requireFloatBucketsMatch(t, h1.CustomValues, h2.CustomValues) - } - - requireInEpsilonIfNotZero(t, h1.ZeroThreshold, h2.ZeroThreshold, "histogram thresholds match") - requireInEpsilonIfNotZero(t, h1.ZeroCount, h2.ZeroCount, "histogram zero counts match") - - requireSpansMatch(t, h1.NegativeSpans, h2.NegativeSpans) - requireFloatBucketsMatch(t, h1.NegativeBuckets, h2.NegativeBuckets) - - requireSpansMatch(t, h1.PositiveSpans, h2.PositiveSpans) - requireFloatBucketsMatch(t, h1.PositiveBuckets, h2.PositiveBuckets) - } - } - } - default: - require.Fail(t, "unexpected value type", "type: %v", expected.Value.Type()) - } -} - -func requireInEpsilonIfNotZero(t testing.TB, expected, actual float64, msgAndArgs ...interface{}) { - if expected == 0 { - require.Equal(t, expected, actual, msgAndArgs...) - } else { - require.InEpsilon(t, expected, actual, 1e-10, msgAndArgs...) - } -} - -func requireFloatBucketsMatch(t testing.TB, b1, b2 []float64) { - require.Equal(t, len(b1), len(b2), "bucket lengths match") - for i, b := range b1 { - require.InEpsilon(t, b, b2[i], 1e-10, "bucket values match") - } -} - -func requireSpansMatch(t testing.TB, s1, s2 []histogram.Span) { - require.Equal(t, len(s1), len(s2), "number of spans") - for i := range s1 { - require.Equal(t, s1[i].Length, s2[i].Length, "Span lengths match") - require.Equal(t, s1[i].Offset, s2[i].Offset, "Span offsets match") - } -} - -func sortVector(v promql.Vector) { - slices.SortFunc(v, func(a, b promql.Sample) int { - return labels.Compare(a.Metric, b.Metric) - }) -} - -// combinations generates all combinations of a given length from a slice of strings. -func combinations(arr []string, length int) [][]string { - if length < 0 || length > len(arr) { - panic("Invalid length requested") - } - return combine(arr, length, 0) -} - -func combine(arr []string, length int, start int) [][]string { - if length == 0 { - return [][]string{{}} - } - result := [][]string{} - for i := start; i <= len(arr)-length; i++ { - for _, suffix := range combine(arr, length-1, i+1) { - combination := append([]string{arr[i]}, suffix...) - result = append(result, combination) - } - } - return result -} From d9c17de1a3f1379ba52adedb4f73d6049bd9f110 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 20 Sep 2024 10:53:43 +1000 Subject: [PATCH 20/23] Actually add files --- pkg/streamingpromql/testutils/utils.go | 153 ++++++++++++++++++ .../utils_test.go} | 6 +- 2 files changed, 156 insertions(+), 3 deletions(-) create mode 100644 pkg/streamingpromql/testutils/utils.go rename pkg/streamingpromql/{testing_test.go => testutils/utils_test.go} (92%) diff --git a/pkg/streamingpromql/testutils/utils.go b/pkg/streamingpromql/testutils/utils.go new file mode 100644 index 00000000000..18d00639a77 --- /dev/null +++ b/pkg/streamingpromql/testutils/utils.go @@ -0,0 +1,153 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package testutils + +import ( + "testing" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +// Why do we do this rather than require.Equal(t, expected, actual)? +// It's possible that floating point values are slightly different due to imprecision, but require.Equal doesn't allow us to set an allowable difference. +func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Result, checkAnnotations bool) { + require.Equal(t, expected.Err, actual.Err) + require.Equal(t, expected.Value.Type(), actual.Value.Type()) + + if checkAnnotations { + expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0) + actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0) + require.ElementsMatch(t, expectedWarnings, actualWarnings) + require.ElementsMatch(t, expectedInfos, actualInfos) + } + + switch expected.Value.Type() { + case parser.ValueTypeVector: + expectedVector, err := expected.Vector() + require.NoError(t, err) + actualVector, err := actual.Vector() + require.NoError(t, err) + + // Instant queries don't guarantee any particular sort order, so sort results here so that we can easily compare them. + sortVector(expectedVector) + sortVector(actualVector) + + require.Len(t, actualVector, len(expectedVector)) + + for i, expectedSample := range expectedVector { + actualSample := actualVector[i] + + require.Equal(t, expectedSample.Metric, actualSample.Metric) + require.Equal(t, expectedSample.T, actualSample.T) + require.Equal(t, expectedSample.H, actualSample.H) + requireInEpsilonIfNotZero(t, expectedSample.F, actualSample.F) + } + case parser.ValueTypeMatrix: + expectedMatrix, err := expected.Matrix() + require.NoError(t, err) + actualMatrix, err := actual.Matrix() + require.NoError(t, err) + + require.Len(t, actualMatrix, len(expectedMatrix)) + + for i, expectedSeries := range expectedMatrix { + actualSeries := actualMatrix[i] + + require.Equal(t, expectedSeries.Metric, actualSeries.Metric) + + for j, expectedPoint := range expectedSeries.Floats { + actualPoint := actualSeries.Floats[j] + + require.Equal(t, expectedPoint.T, actualPoint.T) + requireInEpsilonIfNotZero(t, expectedPoint.F, actualPoint.F, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) + } + + for j, expectedPoint := range actualSeries.Histograms { + actualPoint := actualSeries.Histograms[j] + + require.Equal(t, expectedPoint.T, actualPoint.T) + if expectedPoint.H == nil { + require.Equal(t, expectedPoint.H, actualPoint.H) + } else { + h1 := expectedPoint.H + h2 := actualPoint.H + + require.Equal(t, h1.Schema, h2.Schema, "histogram schemas match") + requireInEpsilonIfNotZero(t, h1.Count, h2.Count, "histogram counts match") + requireInEpsilonIfNotZero(t, h1.Sum, h2.Sum, "histogram sums match") + + if h1.UsesCustomBuckets() { + requireFloatBucketsMatch(t, h1.CustomValues, h2.CustomValues) + } + + requireInEpsilonIfNotZero(t, h1.ZeroThreshold, h2.ZeroThreshold, "histogram thresholds match") + requireInEpsilonIfNotZero(t, h1.ZeroCount, h2.ZeroCount, "histogram zero counts match") + + requireSpansMatch(t, h1.NegativeSpans, h2.NegativeSpans) + requireFloatBucketsMatch(t, h1.NegativeBuckets, h2.NegativeBuckets) + + requireSpansMatch(t, h1.PositiveSpans, h2.PositiveSpans) + requireFloatBucketsMatch(t, h1.PositiveBuckets, h2.PositiveBuckets) + } + } + } + default: + require.Fail(t, "unexpected value type", "type: %v", expected.Value.Type()) + } +} + +func requireInEpsilonIfNotZero(t testing.TB, expected, actual float64, msgAndArgs ...interface{}) { + if expected == 0 { + require.Equal(t, expected, actual, msgAndArgs...) + } else { + require.InEpsilon(t, expected, actual, 1e-10, msgAndArgs...) + } +} + +func requireFloatBucketsMatch(t testing.TB, b1, b2 []float64) { + require.Equal(t, len(b1), len(b2), "bucket lengths match") + for i, b := range b1 { + require.InEpsilon(t, b, b2[i], 1e-10, "bucket values match") + } +} + +func requireSpansMatch(t testing.TB, s1, s2 []histogram.Span) { + require.Equal(t, len(s1), len(s2), "number of spans") + for i := range s1 { + require.Equal(t, s1[i].Length, s2[i].Length, "Span lengths match") + require.Equal(t, s1[i].Offset, s2[i].Offset, "Span offsets match") + } +} + +func sortVector(v promql.Vector) { + slices.SortFunc(v, func(a, b promql.Sample) int { + return labels.Compare(a.Metric, b.Metric) + }) +} + +// Combinations generates all Combinations of a given length from a slice of strings. +func Combinations(arr []string, length int) [][]string { + if length < 0 || length > len(arr) { + panic("Invalid length requested") + } + return combine(arr, length, 0) +} + +func combine(arr []string, length int, start int) [][]string { + if length == 0 { + return [][]string{{}} + } + result := [][]string{} + for i := start; i <= len(arr)-length; i++ { + for _, suffix := range combine(arr, length-1, i+1) { + combination := append([]string{arr[i]}, suffix...) + result = append(result, combination) + } + } + return result +} diff --git a/pkg/streamingpromql/testing_test.go b/pkg/streamingpromql/testutils/utils_test.go similarity index 92% rename from pkg/streamingpromql/testing_test.go rename to pkg/streamingpromql/testutils/utils_test.go index 1576126705d..77f4b2a3a13 100644 --- a/pkg/streamingpromql/testing_test.go +++ b/pkg/streamingpromql/testutils/utils_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package streamingpromql +package testutils import ( "testing" @@ -59,12 +59,12 @@ func TestCombinations(t *testing.T) { t.Run(tName, func(t *testing.T) { if test.panics { f := func() { - combinations(test.input, test.length) + Combinations(test.input, test.length) } require.Panics(t, f) return } - output := combinations(test.input, test.length) + output := Combinations(test.input, test.length) require.Equal(t, test.expected, output) }) } From 61a8b8a4e1faa01dac14ae70298150fcc1d48c2e Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 20 Sep 2024 10:58:30 +1000 Subject: [PATCH 21/23] Move metric names back --- pkg/streamingpromql/operators/aggregation.go | 4 ++-- pkg/streamingpromql/operators/aggregation_test.go | 2 +- pkg/streamingpromql/operators/function_over_range_vector.go | 4 ++-- pkg/streamingpromql/{types => operators}/metric_names.go | 5 +++-- 4 files changed, 8 insertions(+), 7 deletions(-) rename pkg/streamingpromql/{types => operators}/metric_names.go (81%) diff --git a/pkg/streamingpromql/operators/aggregation.go b/pkg/streamingpromql/operators/aggregation.go index 709139a8437..359d84152d5 100644 --- a/pkg/streamingpromql/operators/aggregation.go +++ b/pkg/streamingpromql/operators/aggregation.go @@ -36,7 +36,7 @@ type Aggregation struct { Annotations *annotations.Annotations - metricNames *types.MetricNames + metricNames *MetricNames currentSeriesIndex int expressionPosition posrange.PositionRange @@ -79,7 +79,7 @@ func NewAggregation( Without: without, MemoryConsumptionTracker: memoryConsumptionTracker, Annotations: annotations, - metricNames: &types.MetricNames{}, + metricNames: &MetricNames{}, expressionPosition: expressionPosition, aggregationGroupFactory: opGroupFactory, } diff --git a/pkg/streamingpromql/operators/aggregation_test.go b/pkg/streamingpromql/operators/aggregation_test.go index e7aa3e09645..beaf605ea69 100644 --- a/pkg/streamingpromql/operators/aggregation_test.go +++ b/pkg/streamingpromql/operators/aggregation_test.go @@ -81,7 +81,7 @@ func TestAggregation_ReturnsGroupsFinishedFirstEarliest(t *testing.T) { aggregator := &Aggregation{ Inner: &testOperator{series: testCase.inputSeries}, Grouping: testCase.grouping, - metricNames: &types.MetricNames{}, + metricNames: &MetricNames{}, aggregationGroupFactory: func() aggregations.AggregationGroup { return &aggregations.SumAggregationGroup{} }, } diff --git a/pkg/streamingpromql/operators/function_over_range_vector.go b/pkg/streamingpromql/operators/function_over_range_vector.go index eea7d7d9c29..9ebce607d41 100644 --- a/pkg/streamingpromql/operators/function_over_range_vector.go +++ b/pkg/streamingpromql/operators/function_over_range_vector.go @@ -26,7 +26,7 @@ type FunctionOverRangeVector struct { Annotations *annotations.Annotations - metricNames *types.MetricNames + metricNames *MetricNames currentSeriesIndex int numSteps int @@ -61,7 +61,7 @@ func NewFunctionOverRangeVector( } if f.NeedsSeriesNamesForAnnotations { - o.metricNames = &types.MetricNames{} + o.metricNames = &MetricNames{} } o.emitAnnotationFunc = o.emitAnnotation // This is an optimisation to avoid creating the EmitAnnotationFunc instance on every usage. diff --git a/pkg/streamingpromql/types/metric_names.go b/pkg/streamingpromql/operators/metric_names.go similarity index 81% rename from pkg/streamingpromql/types/metric_names.go rename to pkg/streamingpromql/operators/metric_names.go index 352ba459713..c6d84c0ce2f 100644 --- a/pkg/streamingpromql/types/metric_names.go +++ b/pkg/streamingpromql/operators/metric_names.go @@ -1,8 +1,9 @@ // SPDX-License-Identifier: AGPL-3.0-only -package types +package operators import ( + "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/prometheus/prometheus/model/labels" ) @@ -14,7 +15,7 @@ type MetricNames struct { names []string } -func (n *MetricNames) CaptureMetricNames(metadata []SeriesMetadata) { +func (n *MetricNames) CaptureMetricNames(metadata []types.SeriesMetadata) { n.names = make([]string, len(metadata)) for i, series := range metadata { From f23e4a9d507769fec3df3b8144736e85dd9936a1 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 20 Sep 2024 11:05:04 +1000 Subject: [PATCH 22/23] Add comment --- pkg/streamingpromql/engine_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 1ee2b156b6c..5b431a2792e 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1791,8 +1791,11 @@ func TestCompareVariousMixedMetrics(t *testing.T) { expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) } + // Multiple range-vector times are used to check lookbacks that only select single points, multiple points, and boundaries. expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[45s])`, labelRegex)) + expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[1m])`, labelRegex)) expressions = append(expressions, fmt.Sprintf(`avg(rate(series{label=~"(%s)"}[2m15s]))`, labelRegex)) + expressions = append(expressions, fmt.Sprintf(`avg(rate(series{label=~"(%s)"}[5m]))`, labelRegex)) } timeRanges := []struct { From 1b4497a4631152579bc0eed656c574a81ce63c93 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 20 Sep 2024 11:09:17 +1000 Subject: [PATCH 23/23] Fix lint --- pkg/streamingpromql/operators/metric_names.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operators/metric_names.go b/pkg/streamingpromql/operators/metric_names.go index c6d84c0ce2f..9980a406221 100644 --- a/pkg/streamingpromql/operators/metric_names.go +++ b/pkg/streamingpromql/operators/metric_names.go @@ -3,8 +3,9 @@ package operators import ( - "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/mimir/pkg/streamingpromql/types" ) // MetricNames captures and stores the metric names of each series for later use in an operator.