Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add tests for various mixed data and edge cases #9281

Merged
merged 23 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
78 changes: 2 additions & 76 deletions pkg/streamingpromql/benchmarks/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"math"
"os"
"slices"
"testing"
"time"

Expand All @@ -23,7 +22,6 @@ import (
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -68,7 +66,7 @@ func BenchmarkQuery(b *testing.B) {
prometheusResult, prometheusClose := c.Run(ctx, b, start, end, interval, prometheusEngine, q)
mimirResult, mimirClose := c.Run(ctx, b, start, end, interval, mimirEngine, q)

requireEqualResults(b, c.Expr, prometheusResult, mimirResult)
streamingpromql.RequireEqualResults(b, c.Expr, prometheusResult, mimirResult, true)

prometheusClose()
mimirClose()
Expand Down Expand Up @@ -109,7 +107,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) {
prometheusResult, prometheusClose := c.Run(ctx, t, start, end, interval, prometheusEngine, q)
mimirResult, mimirClose := c.Run(ctx, t, start, end, interval, mimirEngine, q)

requireEqualResults(t, c.Expr, prometheusResult, mimirResult)
streamingpromql.RequireEqualResults(t, c.Expr, prometheusResult, mimirResult, true)

prometheusClose()
mimirClose()
Expand Down Expand Up @@ -175,72 +173,6 @@ func TestBenchmarkSetup(t *testing.T) {
require.Equal(t, 18.4, series.Histograms[0].H.Sum)
}

// requireEqualResults asserts that two query results are equivalent.
//
// Why do we do this rather than require.Equal(t, expected, actual)?
// It's possible that floating point values are slightly different due to imprecision,
// but require.Equal doesn't allow us to set an allowable difference.
func requireEqualResults(t testing.TB, expr string, expected, actual *promql.Result) {
	require.Equal(t, expected.Err, actual.Err)
	require.Equal(t, expected.Value.Type(), actual.Value.Type())

	// Annotations have no guaranteed order, so compare them as unordered collections.
	expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0)
	actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0)
	require.ElementsMatch(t, expectedWarnings, actualWarnings)
	require.ElementsMatch(t, expectedInfos, actualInfos)

	switch expected.Value.Type() {
	case parser.ValueTypeVector:
		expectedVector, err := expected.Vector()
		require.NoError(t, err)
		actualVector, err := actual.Vector()
		require.NoError(t, err)

		// Instant queries don't guarantee any particular sort order, so sort results here so that we can easily compare them.
		sortVector(expectedVector)
		sortVector(actualVector)

		require.Len(t, actualVector, len(expectedVector))

		for i, expectedSample := range expectedVector {
			actualSample := actualVector[i]

			require.Equal(t, expectedSample.Metric, actualSample.Metric)
			require.Equal(t, expectedSample.T, actualSample.T)
			require.Equal(t, expectedSample.H, actualSample.H)
			if expectedSample.F == 0 {
				// InEpsilon cannot be used when the expected value is exactly zero, so require exact equality.
				require.Equal(t, expectedSample.F, actualSample.F)
			} else {
				require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
			}
		}
	case parser.ValueTypeMatrix:
		expectedMatrix, err := expected.Matrix()
		require.NoError(t, err)
		actualMatrix, err := actual.Matrix()
		require.NoError(t, err)

		require.Len(t, actualMatrix, len(expectedMatrix))

		for i, expectedSeries := range expectedMatrix {
			actualSeries := actualMatrix[i]

			require.Equal(t, expectedSeries.Metric, actualSeries.Metric)
			require.Equal(t, expectedSeries.Histograms, actualSeries.Histograms)

			// Check the per-series float point counts match before indexing into actualSeries.Floats:
			// without this, a shorter actual series panics with an index-out-of-range error,
			// and a longer actual series would pass despite having extra points.
			require.Lenf(t, actualSeries.Floats, len(expectedSeries.Floats), "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)

			for j, expectedPoint := range expectedSeries.Floats {
				actualPoint := actualSeries.Floats[j]

				require.Equal(t, expectedPoint.T, actualPoint.T)
				if expectedPoint.F == 0 {
					// Same zero-value caveat as above: InEpsilon is undefined for an expected value of zero.
					require.Equal(t, expectedPoint.F, actualPoint.F)
				} else {
					require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
				}
			}
		}
	default:
		require.Fail(t, "unexpected value type", "type: %v", expected.Value.Type())
	}
}

func createBenchmarkQueryable(t testing.TB, metricSizes []int) storage.Queryable {
addr := os.Getenv("MIMIR_PROMQL_ENGINE_BENCHMARK_INGESTER_ADDR")

Expand Down Expand Up @@ -317,9 +249,3 @@ type alwaysQueryIngestersConfigProvider struct{}
// QueryIngestersWithin returns the largest representable duration, so that
// ingesters are always queried regardless of how far back a query reaches.
func (a alwaysQueryIngestersConfigProvider) QueryIngestersWithin(string) time.Duration {
	const always = time.Duration(math.MaxInt64)
	return always
}

// sortVector orders the samples in v by their metric labels, so that two
// vectors can be compared element-wise regardless of the order an engine
// produced them in.
func sortVector(v promql.Vector) {
	byMetric := func(a, b promql.Sample) int {
		return labels.Compare(a.Metric, b.Metric)
	}

	slices.SortFunc(v, byMetric)
}
150 changes: 150 additions & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,6 +1628,26 @@ func TestAnnotations(t *testing.T) {
`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "other_float_metric" (1:99)`,
},
},
"no rate annotation when sample ends with histogram": {
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
data: `
series 3 1 {{schema:3 sum:12 count:7 buckets:[2 2 3]}}
`,
expr: "rate(series[45s])",
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
},
"no rate annotation when only 1 point in range vector": {
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
data: `
series 1
`,
expr: "rate(series[1m])",
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
},
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
}

opts := NewTestEngineOpts()
Expand Down Expand Up @@ -1682,3 +1702,133 @@ func TestAnnotations(t *testing.T) {
})
}
}

func TestCompareVariousMixedMetrics(t *testing.T) {
// Although most tests are covered with the promql test files (both ours and upstream),
// there is a lot of repetition around a few edge cases.
// This is not intended to be comprehensive, but instead checks for some common edge cases
// ensuring MQE and Prometheus' engines return the same result when querying:
// - Series with mixed floats and histograms
// - Aggregations with mixed data types
// - Points with NaN
// - Stale markers
// - Look backs

opts := NewTestEngineOpts()
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts.CommonOpts)

// We're loading series with the following combinations of values. This is difficult to visually see in the actual
// data loaded, so it is represented in a table here.
// f = float value, h = native histogram, _ = no value, N = NaN, s = stale
// {a} f f f f f f
// {b} h h h h h h
// {c} f h f h N h
// {d} f _ _ s f f
// {e} h h _ s h N
// {f} f N _ f f N
// {g} N N N N N N
// {h} N N N _ N s
// {i} f h _ N h s
// {j} f f s s s s
// {k} 0 0 0 N s 0
// {l} h _ f _ s N
// {m} s s N _ _ f
// {n} _ _ _ _ _ _

// Number of points loaded for each series above (one per load step).
pointsPerSeries := 6
samples := `
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
series{label="a", group="a"} 1 2 3 4 5 -50
series{label="b", group="a"} {{schema:1 sum:15 count:10 buckets:[3 2 5 7 9]}} {{schema:2 sum:20 count:15 buckets:[4]}} {{schema:3 sum:25 count:20 buckets:[5 8]}} {{schema:4 sum:30 count:25 buckets:[6 9 10 11]}} {{schema:5 sum:35 count:30 buckets:[7 10 13]}} {{schema:6 sum:40 count:35 buckets:[8 11 14]}}
series{label="c", group="a"} 1 {{schema:3 sum:5 count:3 buckets:[1 1 1]}} 3 {{schema:3 sum:10 count:6 buckets:[2 2 2]}} NaN {{schema:3 sum:12 count:7 buckets:[2 2 3]}}
series{label="d", group="a"} 1 _ _ stale 5 6
series{label="e", group="b"} {{schema:4 sum:12 count:8 buckets:[2 3 3]}} {{schema:4 sum:14 count:9 buckets:[3 3 3]}} _ stale {{schema:4 sum:18 count:11 buckets:[4 4 3]}} NaN
series{label="f", group="b"} 1 NaN _ 4 5 NaN
series{label="g", group="b"} NaN NaN NaN NaN NaN NaN
series{label="h", group="b"} NaN NaN NaN _ NaN stale
series{label="i", group="c"} 1 {{schema:5 sum:15 count:10 buckets:[3 2 5]}} _ NaN {{schema:2 sum:30 count:25 buckets:[6 9 10 9 1]}} stale
series{label="j", group="c"} 1 -20 stale stale stale stale
series{label="k", group="c"} 0 0 0 NaN stale 0
series{label="l", group="d"} {{schema:1 sum:10 count:5 buckets:[1 2]}} _ 3 _ stale NaN
series{label="m", group="d"} stale stale NaN _ _ 4
series{label="n", group="d"}
`

// Labels for generating combinations
labels := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"}

// Generate combinations of 2 and 3 labels. (e.g., "a,b", "e,f", "c,d,e" etc)
// These will be used for binary operations, so we can add up to 3 series.
labelCombinations := combinations(labels, 2)
labelCombinations = append(labelCombinations, combinations(labels, 3)...)

expressions := []string{}

// Binary operations: chain each combination's series together with the same operator,
// e.g. series{label="a"} + series{label="b"} + series{label="c"}.
for _, labels := range labelCombinations {
if len(labels) >= 2 {
for _, op := range []string{"+", "-", "*", "/"} {
binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0])
for _, label := range labels[1:] {
binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label)
}
expressions = append(expressions, binaryExpr)
}
}
}

// For aggregations, also add combinations of 4 labels. (e.g., "a,b,c,d", "c,d,e,f" etc)
labelCombinations = append(labelCombinations, combinations(labels, 4)...)

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
// Aggregations: plain, grouped by, and grouped without, for each aggregation function.
for _, aggFunc := range []string{"sum", "avg", "min", "max"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
}
expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[45s])`, labelRegex))
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
expressions = append(expressions, fmt.Sprintf(`avg(rate(series{label=~"(%s)"}[2m15s]))`, labelRegex))
}

// Run every expression over a variety of load step / query interval combinations,
// so that query steps land both exactly on loaded points and between them.
timeRanges := []struct {
loadStep int // Minutes between loaded points (used in the "load Xm" directive below).
interval time.Duration // Query resolution step.
}{
{loadStep: 1, interval: 1 * time.Minute},
{loadStep: 1, interval: 6 * time.Minute},
{loadStep: 1, interval: 5 * time.Minute},
{loadStep: 6, interval: 6 * time.Minute},
{loadStep: 6, interval: 5 * time.Minute},
}

for _, tr := range timeRanges {
start := timestamp.Time(0)
end := start.Add(time.Duration(pointsPerSeries*tr.loadStep) * time.Minute) // Deliberately queries 1 step past the final loaded point

storage := promqltest.LoadedStorage(t, fmt.Sprintf("load %dm", tr.loadStep)+samples)
t.Cleanup(func() { require.NoError(t, storage.Close()) })

for _, expr := range expressions {
testName := fmt.Sprintf("Expr: %s, Start: %d, End: %d, Interval: %s", expr, start.Unix(), end.Unix(), tr.interval)
t.Run(testName, func(t *testing.T) {
// Run the same range query through both engines; Prometheus' result is the expected one.
q, err := prometheusEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
expectedResults := q.Exec(context.Background())

q, err = mimirEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
mimirResults := q.Exec(context.Background())

// We currently omit checking the annotations due to a difference between the engines.
// This can be re-enabled once https://github.com/prometheus/prometheus/pull/14910 is vendored.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned offline - given prometheus/prometheus#14910 has been merged, could we cherry-pick it into mimir-prometheus and then vendor that into Mimir so we don't need these workarounds without needing to do the pending Prometheus 3.0 merge?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets deal with this separately as I think it's a bigger and more complicated task and we have a way forward here for now.

RequireEqualResults(t, expr, expectedResults, mimirResults, false)
})
}
}
}
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Aggregation struct {

Annotations *annotations.Annotations

metricNames *MetricNames
metricNames *types.MetricNames
currentSeriesIndex int

expressionPosition posrange.PositionRange
Expand Down Expand Up @@ -79,7 +79,7 @@ func NewAggregation(
Without: without,
MemoryConsumptionTracker: memoryConsumptionTracker,
Annotations: annotations,
metricNames: &MetricNames{},
metricNames: &types.MetricNames{},
expressionPosition: expressionPosition,
aggregationGroupFactory: opGroupFactory,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/operators/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestAggregation_ReturnsGroupsFinishedFirstEarliest(t *testing.T) {
aggregator := &Aggregation{
Inner: &testOperator{series: testCase.inputSeries},
Grouping: testCase.grouping,
metricNames: &MetricNames{},
metricNames: &types.MetricNames{},
aggregationGroupFactory: func() aggregations.AggregationGroup { return &aggregations.SumAggregationGroup{} },
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/function_over_range_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type FunctionOverRangeVector struct {

Annotations *annotations.Annotations

metricNames *MetricNames
metricNames *types.MetricNames
currentSeriesIndex int

numSteps int
Expand Down Expand Up @@ -61,7 +61,7 @@ func NewFunctionOverRangeVector(
}

if f.NeedsSeriesNamesForAnnotations {
o.metricNames = &MetricNames{}
o.metricNames = &types.MetricNames{}
}

o.emitAnnotationFunc = o.emitAnnotation // This is an optimisation to avoid creating the EmitAnnotationFunc instance on every usage.
Expand Down
13 changes: 12 additions & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,10 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
return &promql.Result{Err: compat.NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))}
}

q.result.Warnings = *q.annotations
// To make comparing to Prometheus' engine easier, only return Annotations if there are some, otherwise, return nil.
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
if len(*q.annotations) > 0 {
q.result.Warnings = *q.annotations
}

return q.result
}
Expand Down Expand Up @@ -547,6 +550,10 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o t
}

func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o types.InstantVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) {
if len(series) == 0 {
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}

m := types.GetMatrix(len(series))

for i, s := range series {
Expand All @@ -571,6 +578,10 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o t
})
}

if len(m) == 0 {
return nil, nil
}

slices.SortFunc(m, func(a, b promql.Series) int {
return labels.Compare(a.Metric, b.Metric)
})
Expand Down
Loading
Loading