From bedc55ecc3da6ea1d8d08425f3eb80aa572ba301 Mon Sep 17 00:00:00 2001 From: Julien Duchesne Date: Tue, 28 Jan 2025 16:06:05 -0500 Subject: [PATCH] Address PR comments + fixes - Support offsets - Disable `@` - Improve tests. Run each query with a different offset each time - Add new test cases with `offset x` - Add new test case with a long range (more than 11000 steps). It has to be split into multiple range queries - Allow setting query path in frontend arg (instead of hardcoding `/prometheus/api/v1/query_range`) --- .../astmapper/subquery_spin_off.go | 9 +- .../astmapper/subquery_spin_off_test.go | 29 +- pkg/frontend/querymiddleware/codec.go | 4 +- .../querymiddleware/spin_off_subqueries.go | 25 +- .../spin_off_subqueries_queryable.go | 46 +- .../spin_off_subqueries_test.go | 594 ++++-------------- 6 files changed, 186 insertions(+), 521 deletions(-) diff --git a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go index 7c9d3943fe..e076c28355 100644 --- a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go +++ b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go @@ -90,8 +90,8 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f // The last argument will typically contain the subquery in an aggregation function // Examples: last_over_time([5m:]) or quantile_over_time(0.5, [5m:]) if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok { - // Filter out subqueries with offsets, not supported yet - if sq.OriginalOffset > 0 { + // @ is not supported + if sq.StartOrEnd != 0 || sq.Timestamp != nil { return downstreamQuery(expr) } @@ -135,6 +135,11 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f }, } + if sq.OriginalOffset != 0 { + selector.LabelMatchers = append(selector.LabelMatchers, labels.MustNewMatcher(labels.MatchEqual, SubqueryOffsetLabelName, sq.OriginalOffset.String())) + selector.OriginalOffset = 
sq.OriginalOffset + } + e.Args[lastArgIdx] = &parser.MatrixSelector{ VectorSelector: selector, Range: sq.Range, diff --git a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go index df8db7f59e..05afe50d96 100644 --- a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go +++ b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go @@ -35,6 +35,27 @@ func TestSubquerySpinOffMapper(t *testing.T) { expectedSubqueries: 1, expectedDownstreamQueries: 0, }, + { + name: "@ end ignored", + in: `avg_over_time((foo * bar)[3d:1m] @ end())`, + out: `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] @ end())"}`, + expectedSubqueries: 0, + expectedDownstreamQueries: 1, + }, + { + name: "@ start ignored", + in: `avg_over_time((foo * bar)[3d:1m] @ start())`, + out: `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] @ start())"}`, + expectedSubqueries: 0, + expectedDownstreamQueries: 1, + }, + { + name: "@ ignored", + in: `avg_over_time((foo * bar)[3d:1m] @ 1738086610)`, + out: `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] @ 1738086610.000)"}`, + expectedSubqueries: 0, + expectedDownstreamQueries: 1, + }, { name: "range too short", in: `avg_over_time((foo * bar)[30m:1m])`, @@ -72,11 +93,11 @@ func TestSubquerySpinOffMapper(t *testing.T) { expectedDownstreamQueries: 0, }, { - name: "offsets aren't supported", + name: "with an offset", in: `avg_over_time((foo * bar)[3d:1m] offset 3d) * 2`, - out: `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] offset 3d)"} * 2`, - expectedSubqueries: 0, - expectedDownstreamQueries: 1, + out: `avg_over_time(__subquery_spinoff__{__offset__="72h0m0s",__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d] offset 3d) * 2`, + expectedSubqueries: 1, + expectedDownstreamQueries: 0, }, { name: "aggregated query", diff --git a/pkg/frontend/querymiddleware/codec.go 
b/pkg/frontend/querymiddleware/codec.go index 4563a4ad23..63e7f7ae22 100644 --- a/pkg/frontend/querymiddleware/codec.go +++ b/pkg/frontend/querymiddleware/codec.go @@ -56,6 +56,8 @@ var ( prometheusCodecPropagateHeadersLabels = []string{api.ReadConsistencyOffsetsHeader} ) +const maxResolutionPoints = 11000 + const ( // statusSuccess Prometheus success result. statusSuccess = "success" @@ -532,7 +534,7 @@ func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64, // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. - if (end-start)/step > 11000 { + if (end-start)/step > maxResolutionPoints { return 0, 0, 0, errStepTooSmall } diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries.go b/pkg/frontend/querymiddleware/spin_off_subqueries.go index 2d21791b3d..d10f3ed12b 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries.go @@ -257,23 +257,23 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe } func newSpinOffQueryHandler(codec Codec, logger log.Logger, sendURL string) (MetricsQueryHandler, error) { - baseURL, err := url.Parse(sendURL) + rangeQueryURL, err := url.Parse(sendURL) if err != nil { return nil, fmt.Errorf("invalid spin off URL: %w", err) } return &spinOffQueryHandler{ - codec: codec, - logger: logger, - baseURL: baseURL, + codec: codec, + logger: logger, + rangeQueryURL: rangeQueryURL, }, nil } // spinOffQueryHandler is a query handler that takes a request and sends it to a remote endpoint. 
type spinOffQueryHandler struct { - codec Codec - logger log.Logger - baseURL *url.URL + codec Codec + logger log.Logger + rangeQueryURL *url.URL } func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) { @@ -282,7 +282,16 @@ func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) ( return nil, fmt.Errorf("error encoding request: %w", err) } httpReq.RequestURI = "" // Reset RequestURI to force URL to be used in the request. - httpReq.URL = s.baseURL.ResolveReference(httpReq.URL) + // Override the URL with the configured range query URL (only parts that are set). + if s.rangeQueryURL.Scheme != "" { + httpReq.URL.Scheme = s.rangeQueryURL.Scheme + } + if s.rangeQueryURL.Host != "" { + httpReq.URL.Host = s.rangeQueryURL.Host + } + if s.rangeQueryURL.Path != "" { + httpReq.URL.Path = s.rangeQueryURL.Path + } if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil { return nil, fmt.Errorf("error injecting org ID into request: %v", err) diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go b/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go index cc086af871..183705828b 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go @@ -6,7 +6,6 @@ import ( "context" "time" - "github.com/grafana/dskit/concurrency" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" @@ -93,6 +92,7 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st expr := values[astmapper.SubqueryQueryLabelName] rangeStr := values[astmapper.SubqueryRangeLabelName] stepStr := values[astmapper.SubqueryStepLabelName] + offsetStr := values[astmapper.SubqueryOffsetLabelName] if expr == "" || rangeStr == "" || stepStr == "" { return storage.ErrSeriesSet(errors.New("missing required labels for subquery")) } @@ -110,62 +110,64 @@ 
func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st if err != nil { return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery step")) } + var queryOffset time.Duration + if offsetStr == "" { + queryOffset = 0 + } else if queryOffset, err = time.ParseDuration(offsetStr); err != nil { + return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery offset")) + } - end := q.req.GetEnd() - // Align query to absolute time by querying slightly into the future. + end := q.req.GetEnd() - queryOffset.Milliseconds() + // Subqueries are aligned on absolute time, while range queries are aligned on relative time. + // To match the same behavior, we can query slightly into the future. + // The extra data points aren't used because the subquery is aggregated with an _over_time-style function. step := queryStep.Milliseconds() if end%step != 0 { end += step - (end % step) } - reqRange := q.req.GetEnd() - q.req.GetStart() // Calculate the earliest data point we need to query. 
- start := end - reqRange - queryRange.Milliseconds() - // Split queries into multiple smaller queries if they have more than 10000 datapoints + start := end - queryRange.Milliseconds() + + // Split queries into multiple smaller queries if they have more than 11000 datapoints rangeStart := start var rangeQueries []MetricsQueryRequest for { var rangeEnd int64 - if remainingPoints := (end - start) / step; remainingPoints > 10000 { - rangeEnd = start + 10000*step + if remainingPoints := (end - rangeStart) / step; remainingPoints > maxResolutionPoints { + rangeEnd = rangeStart + maxResolutionPoints*step } else { rangeEnd = end } headers := q.req.GetHeaders() headers = append(headers, - &PrometheusHeader{Name: "Content-Type", Values: []string{"application/json"}}, &PrometheusHeader{Name: "X-Mimir-Spun-Off-Subquery", Values: []string{"true"}}, + &PrometheusHeader{Name: "Content-Type", Values: []string{"application/x-www-form-urlencoded"}}, ) // Downstream is the querier, which is HTTP req. - newRangeRequest := NewPrometheusRangeQueryRequest("/prometheus"+queryRangePathSuffix, headers, rangeStart, rangeEnd, step, q.req.GetLookbackDelta(), queryExpr, q.req.GetOptions(), q.req.GetHints()) + newRangeRequest := NewPrometheusRangeQueryRequest(queryRangePathSuffix, headers, rangeStart, rangeEnd, step, q.req.GetLookbackDelta(), queryExpr, q.req.GetOptions(), q.req.GetHints()) rangeQueries = append(rangeQueries, newRangeRequest) if rangeEnd == end { break } + rangeStart = rangeEnd // Move the start to the end of the previous range. } - // Concurrently run each query. It breaks and cancels each worker context on first error. 
streams := make([][]SampleStream, len(rangeQueries)) - err = concurrency.ForEachJob(ctx, len(rangeQueries), len(rangeQueries), func(ctx context.Context, idx int) error { - req := rangeQueries[idx] - + for idx, req := range rangeQueries { resp, err := q.upstreamRangeHandler.Do(ctx, req) if err != nil { - return err + return storage.ErrSeriesSet(err) } promRes, ok := resp.(*PrometheusResponse) if !ok { - return errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{}) + return storage.ErrSeriesSet(errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})) } resStreams, err := ResponseToSamples(promRes) if err != nil { - return err + return storage.ErrSeriesSet(err) } - streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables. + streams[idx] = resStreams q.annotationAccumulator.addInfos(promRes.Infos) q.annotationAccumulator.addWarnings(promRes.Warnings) - return nil - }) - if err != nil { - return storage.ErrSeriesSet(err) } return newSeriesSetFromEmbeddedQueriesResults(streams, hints) default: diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries_test.go b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go index 1c8806d0e9..adb34fe80a 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries_test.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go @@ -44,196 +44,26 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { expectedSpunOffSubqueries int expectedDownstreamQueries int expectSpecificOrder bool + expectEmptyResult bool }{ - "sum() no grouping": { - query: `sum(metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - "sum() offset": { - query: `sum(metric_counter offset 5s)`, - expectedSkippedReason: "no-subquery", - }, - "sum() negative offset": { - query: `sum(metric_counter offset -5s)`, - expectedSkippedReason: "no-subquery", - }, - "sum() grouping 'by'": { - query: `sum 
by(group_1) (metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - "sum() grouping 'without'": { - query: `sum without(unique) (metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - "sum(rate()) no grouping": { - query: `sum(rate(metric_counter[1m]))`, - expectedSkippedReason: "no-subquery", - }, - "sum(rate()) grouping 'by'": { - query: `sum by(group_1) (rate(metric_counter[1m]))`, - expectedSkippedReason: "no-subquery", - }, - "sum(rate()) grouping 'without'": { - query: `sum without(unique) (rate(metric_counter[1m]))`, - expectedSkippedReason: "no-subquery", - }, - "sum(rate()) with no effective grouping because all groups have 1 series": { - query: `sum by(unique) (rate(metric_counter{group_1="0"}[1m]))`, - expectedSkippedReason: "no-subquery", - }, - `group by (group_1) (metric_counter)`: { - query: `group by (group_1) (metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - `group by (group_1) (group by (group_1, group_2) (metric_counter))`: { - query: `group by (group_1) (group by (group_1, group_2) (metric_counter))`, - expectedSkippedReason: "no-subquery", - }, - `count by (group_1) (group by (group_1, group_2) (metric_counter))`: { - query: `count by (group_1) (group by (group_1, group_2) (metric_counter))`, - expectedSkippedReason: "no-subquery", - }, - "histogram_quantile() grouping only 'by' le": { - query: `histogram_quantile(0.5, sum by(le) (rate(metric_histogram_bucket[1m])))`, - expectedSkippedReason: "no-subquery", - }, - "histogram_quantile() grouping 'by'": { - query: `histogram_quantile(0.5, sum by(group_1, le) (rate(metric_histogram_bucket[1m])))`, - expectedSkippedReason: "no-subquery", - }, - "histogram_quantile() grouping 'without'": { - query: `histogram_quantile(0.5, sum without(group_1, group_2, unique) (rate(metric_histogram_bucket[1m])))`, - expectedSkippedReason: "no-subquery", - }, - "histogram_quantile() with no effective grouping because all groups have 1 series": { - query: `histogram_quantile(0.5, 
sum by(unique, le) (rate(metric_histogram_bucket{group_1="0"}[1m])))`, - expectedSkippedReason: "no-subquery", - }, - "min() no grouping": { - query: `min(metric_counter{group_1="0"})`, - expectedSkippedReason: "no-subquery", - }, - "min() grouping 'by'": { - query: `min by(group_2) (metric_counter{group_1="0"})`, - expectedSkippedReason: "no-subquery", - }, - "min() grouping 'without'": { - query: `min without(unique) (metric_counter{group_1="0"})`, - expectedSkippedReason: "no-subquery", - }, - "max() no grouping": { - query: `max(metric_counter{group_1="0"})`, - expectedSkippedReason: "no-subquery", - }, - "max() grouping 'by'": { - query: `max by(group_2) (metric_counter{group_1="0"})`, - expectedSkippedReason: "no-subquery", - }, - "max() grouping 'without'": { - query: `max without(unique) (metric_counter{group_1="0"})`, - expectedSkippedReason: "no-subquery", - }, - "count() no grouping": { - query: `count(metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - "count() grouping 'by'": { - query: `count by(group_2) (metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - "count() grouping 'without'": { - query: `count without(unique) (metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - "sum(count())": { - query: `sum(count by(group_1) (metric_counter))`, - expectedSkippedReason: "no-subquery", - }, - "avg() no grouping": { - query: `avg(metric_counter)`, - expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). - }, - "avg() grouping 'by'": { - query: `avg by(group_2) (metric_counter)`, - expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). - }, - "avg() grouping 'without'": { - query: `avg without(unique) (metric_counter)`, - expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). 
- }, - "sum(min_over_time())": { - query: `sum by (group_1, group_2) (min_over_time(metric_counter{const="fixed"}[2m]))`, - expectedSkippedReason: "no-subquery", - }, - "sum(max_over_time())": { - query: `sum by (group_1, group_2) (max_over_time(metric_counter{const="fixed"}[2m]))`, - expectedSkippedReason: "no-subquery", - }, - "sum(avg_over_time())": { - query: `sum by (group_1, group_2) (avg_over_time(metric_counter{const="fixed"}[2m]))`, - expectedSkippedReason: "no-subquery", - }, - "or": { - query: `sum(rate(metric_counter{group_1="0"}[1m])) or sum(rate(metric_counter{group_1="1"}[1m]))`, - expectedSkippedReason: "no-subquery", - }, - "and": { - query: ` - sum without(unique) (rate(metric_counter{group_1="0"}[1m])) - and - max without(unique) (metric_counter) > 0`, - expectedSkippedReason: "no-subquery", - }, - "sum(rate()) > avg(rate())": { - query: ` - sum(rate(metric_counter[1m])) - > - avg(rate(metric_counter[1m]))`, - expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). - }, - "sum by(unique) * on (unique) group_left (group_1) avg by (unique, group_1)": { - // ensure that avg transformation into sum/count does not break label matching in previous binop. 
- query: ` - sum by(unique) (metric_counter) - * - on (unique) group_left (group_1) - avg by (unique, group_1) (metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - "sum by (rate()) / 2 ^ 2": { - query: ` - sum by (group_1) (rate(metric_counter[1m])) / 2 ^ 2`, - expectedSkippedReason: "no-subquery", - }, - "sum by (rate()) / time() *2": { - query: ` - sum by (group_1) (rate(metric_counter[1m])) / time() *2`, - expectedSkippedReason: "no-subquery", - }, - "sum(rate()) / vector(3) ^ month()": { - query: `sum(rate(metric_counter[1m])) / vector(3) ^ month()`, - expectedSkippedReason: "no-subquery", - }, - "sum(rate(metric_counter[1m])) / vector(3) ^ vector(2) + sum(ln(metric_counter))": { - query: `sum(rate(metric_counter[1m])) / vector(3) ^ vector(2) + sum(ln(metric_counter))`, - expectedSkippedReason: "no-subquery", - }, - "nested count()": { + "skipped: no subquery": { query: `sum( count( count(metric_counter) by (group_1, group_2) ) by (group_1) )`, expectedSkippedReason: "no-subquery", + expectEmptyResult: true, }, - "subquery max: too short range": { + "skipped: subquery max: too short range": { query: `max_over_time( rate(metric_counter[1m]) [30m:1m] )`, expectedSkippedReason: "no-subquery", + expectEmptyResult: true, }, - "subquery max: too few steps": { + "skipped: subquery max: too few steps": { query: `max_over_time( rate(metric_counter[1m]) [2h:15m] @@ -247,6 +77,20 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { )`, expectedSpunOffSubqueries: 1, }, + "subquery max: multiple range queries": { + query: `max_over_time( + rate(metric_counter[1m]) + [30d:1m] + )`, + expectedSpunOffSubqueries: 1, + }, + "subquery max with offset": { + query: `max_over_time( + rate(metric_counter[1m]) + [3d:1m] offset 1d + )`, + expectedSpunOffSubqueries: 1, + }, "subquery min": { query: `min_over_time( rate(metric_counter[1m]) @@ -258,6 +102,14 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { query: `sum by(group_1) 
(min_over_time((changes(metric_counter[5m]))[3d:2m]))`, expectedSpunOffSubqueries: 1, }, + "sum of subquery min with small offset": { + query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[3d:2m] offset 20s))`, + expectedSpunOffSubqueries: 1, + }, + "sum of subquery min with offset": { + query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[3d:2m] offset 1d))`, + expectedSpunOffSubqueries: 1, + }, "triple subquery": { query: `max_over_time( stddev_over_time( @@ -272,69 +124,6 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { query: `max_over_time( deriv( rate(metric_counter[10m])[5m:1m] )[3d:] )`, expectedSpunOffSubqueries: 1, }, - "@ modifier": { - query: `sum by (group_1)(rate(metric_counter[1h] @ end())) + sum by (group_1)(rate(metric_counter[1h] @ start()))`, - expectedSkippedReason: "no-subquery", - }, - "@ modifier and offset": { - query: `sum by (group_1)(rate(metric_counter[1h] @ end() offset 1m))`, - expectedSkippedReason: "no-subquery", - }, - "@ modifier and negative offset": { - query: `sum by (group_1)(rate(metric_counter[1h] @ start() offset -1m))`, - expectedSkippedReason: "no-subquery", - }, - "label_replace": { - query: `sum by (foo)( - label_replace( - rate(metric_counter{group_1="0"}[1m]), - "foo", "bar$1", "group_2", "(.*)" - ) - )`, - expectedSkippedReason: "no-subquery", - }, - "label_join": { - query: `sum by (foo)( - label_join( - rate(metric_counter{group_1="0"}[1m]), - "foo", ",", "group_1", "group_2", "const" - ) - )`, - expectedSkippedReason: "no-subquery", - }, - `query with sort() expects specific order`: { - query: `sort(sum(metric_histogram_bucket) by (le))`, - expectedSkippedReason: "no-subquery", - expectSpecificOrder: true, - }, - "scalar(aggregation)": { - query: `scalar(sum(metric_counter))`, - expectedSkippedReason: "no-subquery", - }, - `filtering binary operation with constant scalar`: { - query: `count(metric_counter > 0)`, - expectedSkippedReason: "no-subquery", - }, - `filtering binary 
operation of a function result with scalar`: { - query: `max_over_time(metric_counter[5m]) > 0`, - expectedSkippedReason: "no-subquery", - }, - `binary operation with an aggregation on one hand`: { - query: `sum(metric_counter) > 1`, - expectedSkippedReason: "no-subquery", - }, - `binary operation with an aggregation on the other hand`: { - query: `0 < sum(metric_counter)`, - expectedSkippedReason: "no-subquery", - }, - `binary operation with an aggregation by some label on one hand`: { - query: `count by (unique) (metric_counter) > 0`, - expectedSkippedReason: "no-subquery", - }, - `filtering binary operation with non constant`: { - query: `max by(unique) (max_over_time(metric_counter[5m])) > scalar(min(metric_counter))`, - expectedSkippedReason: "no-subquery", - }, "subquery min_over_time with aggr": { query: `min_over_time( sum by(group_1) ( @@ -343,181 +132,12 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { )`, expectedSpunOffSubqueries: 1, }, - "outer subquery on top of sum": { - query: `sum(metric_counter) by (group_1)[5m:1m]`, - expectedSkippedReason: "no-subquery", - }, - "outer subquery on top of avg": { - query: `avg(metric_counter) by (group_1)[5m:1m]`, - expectedSkippedReason: "no-subquery", - }, - "stddev()": { - query: `stddev(metric_counter{const="fixed"})`, - expectedSkippedReason: "no-subquery", - }, - "stdvar()": { - query: `stdvar(metric_counter{const="fixed"})`, - expectedSkippedReason: "no-subquery", - }, - "topk()": { - query: `topk(2, metric_counter{const="fixed"})`, - expectedSkippedReason: "no-subquery", - }, - "bottomk()": { - query: `bottomk(2, metric_counter{const="fixed"})`, - expectedSkippedReason: "no-subquery", - }, - "vector()": { - query: `vector(1)`, - expectedSkippedReason: "no-subquery", - }, - "scalar(single metric)": { - query: `scalar(metric_counter{unique="1"})`, // Select a single metric. 
- expectedSkippedReason: "no-subquery", - }, - "histogram_quantile no grouping": { - query: fmt.Sprintf(`histogram_quantile(0.99, metric_histogram_bucket{unique="%d"})`, numSeries+10), // Select a single histogram metric. - expectedSkippedReason: "no-subquery", - }, - "histogram_quantile with inner aggregation": { - query: `sum by (group_1) (histogram_quantile(0.9, rate(metric_histogram_bucket[1m])))`, - expectedSkippedReason: "no-subquery", - }, - "histogram_quantile without aggregation": { - query: `histogram_quantile(0.5, rate(metric_histogram_bucket{group_1="0"}[1m]))`, - expectedSkippedReason: "no-subquery", - }, - `subqueries with non parallelizable function in children`: { - query: `max_over_time( - absent_over_time( - deriv( - rate(metric_counter[1m]) - [5m:1m]) - [2m:1m]) - [10m:1m] offset 25m)`, - expectedSkippedReason: "no-subquery", - }, - "string literal": { - query: `"test"`, - expectedSkippedReason: "no-subquery", - }, - "day_of_month() >= 1 and day_of_month()": { - query: `day_of_month() >= 1 and day_of_month()`, - expectedSkippedReason: "no-subquery", - }, - "month() >= 1 and month()": { - query: `month() >= 1 and month()`, - expectedSkippedReason: "no-subquery", - }, - "vector(1) > 0 and vector(1)": { - query: `vector(1) > 0 and vector(1)`, - expectedSkippedReason: "no-subquery", - }, - "sum(metric_counter) > 0 and vector(1)": { - query: `sum(metric_counter) > 0 and vector(1)`, - expectedSkippedReason: "no-subquery", - }, - "vector(1)": { - query: `vector(1)`, - expectedSkippedReason: "no-subquery", - }, - "time()": { - query: `time()`, - expectedSkippedReason: "no-subquery", - }, - "month(sum(metric_counter))": { - query: `month(sum(metric_counter))`, - expectedSkippedReason: "no-subquery", // Sharded because the contents of `sum()` is sharded. 
- }, - "month(sum(metric_counter)) > 0 and vector(1)": { - query: `month(sum(metric_counter)) > 0 and vector(1)`, - expectedSkippedReason: "no-subquery", // Sharded because the contents of `sum()` is sharded. - }, - "0 < bool 1": { - query: `0 < bool 1`, - expectedSkippedReason: "no-subquery", - }, - "scalar(metric_counter{const=\"fixed\"}) < bool 1": { - query: `scalar(metric_counter{const="fixed"}) < bool 1`, - expectedSkippedReason: "no-subquery", - }, - "scalar(sum(metric_counter)) < bool 1": { - query: `scalar(sum(metric_counter)) < bool 1`, - expectedSkippedReason: "no-subquery", - }, - // Summing floats and native histograms together makes no sense, see - // https://prometheus.io/docs/prometheus/latest/querying/operators/#operators-for-native-histograms - // so we exclude native histograms here and in some subsequent tests - `sum({__name__!=""}) excluding native histograms`: { - query: `sum({__name__!="",__name__!="metric_native_histogram"})`, - expectedSkippedReason: "no-subquery", - }, - `sum by (group_1) ({__name__!=""}) excluding native histograms`: { - query: `sum by (group_1) ({__name__!="",__name__!="metric_native_histogram"})`, - expectedSkippedReason: "no-subquery", - }, - `sum by (group_1) (count_over_time({__name__!=""}[1m])) excluding native histograms`: { - query: `sum by (group_1) (count_over_time({__name__!="",__name__!="metric_native_histogram"}[1m]))`, - expectedSkippedReason: "no-subquery", - }, - `sum(metric_native_histogram)`: { - query: `sum(metric_native_histogram)`, - expectedSkippedReason: "no-subquery", - }, - `sum by (group_1) (metric_native_histogram)`: { - query: `sum by (group_1) (metric_native_histogram)`, - expectedSkippedReason: "no-subquery", - }, - `sum by (group_1) (count_over_time(metric_native_histogram[1m]))`: { - query: `sum by (group_1) (count_over_time(metric_native_histogram[1m]))`, - expectedSkippedReason: "no-subquery", - }, - `count(metric_native_histogram)`: { - query: `count(metric_native_histogram)`, - 
expectedSkippedReason: "no-subquery", - }, - `count by (group_1) (metric_native_histogram)`: { - query: `count by (group_1) (metric_native_histogram)`, - expectedSkippedReason: "no-subquery", - }, - `count by (group_1) (count_over_time(metric_native_histogram[1m]))`: { - query: `count by (group_1) (count_over_time(metric_native_histogram[1m]))`, - expectedSkippedReason: "no-subquery", - }, - `histogram_sum(sum(metric_native_histogram))`: { - query: `histogram_sum(sum(metric_native_histogram))`, - expectedSkippedReason: "no-subquery", - }, - `histogram_count(sum(metric_native_histogram))`: { - query: `histogram_count(sum(metric_native_histogram))`, - expectedSkippedReason: "no-subquery", - }, - `histogram_quantile(0.5, sum(metric_native_histogram))`: { - query: `histogram_quantile(0.5, sum(metric_native_histogram))`, - expectedSkippedReason: "no-subquery", - }, - `histogram_fraction(0, 0.5, sum(metric_native_histogram))`: { - query: `histogram_fraction(0, 0.5, sum(metric_native_histogram))`, - expectedSkippedReason: "no-subquery", - }, - `histogram_stdvar`: { - query: `histogram_stdvar(metric_native_histogram)`, - expectedSkippedReason: "no-subquery", - }, - `histogram_stdvar on sum of metrics`: { - query: `histogram_stdvar(sum(metric_native_histogram))`, - expectedSkippedReason: "no-subquery", - }, - `histogram_stddev`: { - query: `histogram_stddev(metric_native_histogram)`, - expectedSkippedReason: "no-subquery", - }, - `histogram_stddev on sum of metrics`: { - query: `histogram_stddev(sum(metric_native_histogram))`, - expectedSkippedReason: "no-subquery", - }, } + samplesStart := time.Now().Add(-3 * 24 * time.Hour) + samplesEnd := time.Now().Add(30 * time.Minute) + samplesStep := 30 * time.Second + series := make([]storage.Series, 0, numSeries+(numConvHistograms*len(histogramBuckets))+numNativeHistograms) seriesID := 0 @@ -526,28 +146,28 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { gen := factor(float64(i) * 0.1) if i >= numSeries-numStaleSeries { // 
Wrap the generator to inject the staleness marker between minute 10 and 20. - gen = stale(start.Add(10*time.Minute), start.Add(20*time.Minute), gen) + gen = stale(time.Now().Add(10*time.Minute), time.Now().Add(20*time.Minute), gen) } - series = append(series, newSeries(newTestCounterLabels(seriesID), start.Add(-lookbackDelta), end, step, gen)) + series = append(series, newSeries(newTestCounterLabels(seriesID), samplesStart, samplesEnd, samplesStep, gen)) seriesID++ } // Add a special series whose data points end earlier than the end of the queried time range // and has NO stale marker. series = append(series, newSeries(newTestCounterLabels(seriesID), - start.Add(-lookbackDelta), end.Add(-5*time.Minute), step, factor(2))) + samplesStart, time.Now().Add(-5*time.Minute), samplesStep, factor(2))) seriesID++ // Add a special series whose data points end earlier than the end of the queried time range // and HAS a stale marker at the end. series = append(series, newSeries(newTestCounterLabels(seriesID), - start.Add(-lookbackDelta), end.Add(-5*time.Minute), step, stale(end.Add(-6*time.Minute), end.Add(-4*time.Minute), factor(2)))) + samplesStart, time.Now().Add(-5*time.Minute), samplesStep, stale(time.Now().Add(-6*time.Minute), time.Now().Add(-4*time.Minute), factor(2)))) seriesID++ // Add a special series whose data points start later than the start of the queried time range. series = append(series, newSeries(newTestCounterLabels(seriesID), - start.Add(5*time.Minute), end, step, factor(2))) + time.Now().Add(5*time.Minute), samplesEnd, samplesStep, factor(2))) seriesID++ // Add conventional histogram series. @@ -557,11 +177,11 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { gen := factor(float64(i) * float64(bucketIdx) * 0.1) if i >= numConvHistograms-numStaleConvHistograms { // Wrap the generator to inject the staleness marker between minute 10 and 20. 
- gen = stale(start.Add(10*time.Minute), start.Add(20*time.Minute), gen) + gen = stale(time.Now().Add(10*time.Minute), time.Now().Add(20*time.Minute), gen) } series = append(series, newSeries(newTestConventionalHistogramLabels(seriesID, bucketLe), - start.Add(-lookbackDelta), end, step, gen)) + samplesStart, samplesEnd, samplesStep, gen)) } // Increase the series ID after all per-bucket series have been created. @@ -573,10 +193,10 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { gen := factor(float64(i) * 0.1) if i >= numNativeHistograms-numStaleNativeHistograms { // Wrap the generator to inject the staleness marker between minute 10 and 20. - gen = stale(start.Add(10*time.Minute), start.Add(20*time.Minute), gen) + gen = stale(time.Now().Add(10*time.Minute), time.Now().Add(20*time.Minute), gen) } - series = append(series, newNativeHistogramSeries(newTestNativeHistogramLabels(seriesID), start.Add(-lookbackDelta), end, step, gen)) + series = append(series, newNativeHistogramSeries(newTestNativeHistogramLabels(seriesID), samplesStart, samplesEnd, samplesStep, gen)) seriesID++ } @@ -625,65 +245,69 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - t.Parallel() - - req := &PrometheusInstantQueryRequest{ - path: "/query", - time: util.TimeToMillis(end), - queryExpr: parseQuery(t, testData.query), - } - - // Run the query without subquery spin-off. - expectedRes, err := downstream.Do(context.Background(), req) - require.Nil(t, err) - expectedPrometheusRes := expectedRes.(*PrometheusResponse) - if !testData.expectSpecificOrder { - sort.Sort(byLabels(expectedPrometheusRes.Data.Result)) - } - - // Ensure the query produces some results. 
- require.NotEmpty(t, expectedPrometheusRes.Data.Result) - requireValidSamples(t, expectedPrometheusRes.Data.Result) - - if testData.expectedSpunOffSubqueries > 0 { - // Remove position information from annotations, to mirror what we expect from the sharded queries below. - removeAllAnnotationPositionInformation(expectedPrometheusRes.Infos) - removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings) - } - - spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL) - require.NoError(t, err) - - reg := prometheus.NewPedanticRegistry() - spinoffMiddleware := newSpinOffSubqueriesMiddleware( - mockLimits{ - instantQueriesWithSubquerySpinOff: []string{".*"}, - }, - log.NewNopLogger(), - engine, - spinOffQueryHandler, - reg, - defaultStepFunc, - ) - - ctx := user.InjectOrgID(context.Background(), "test") - spinoffRes, err := spinoffMiddleware.Wrap(downstream).Do(ctx, req) - require.Nil(t, err) - - // Ensure the two results matches (float precision can slightly differ, there's no guarantee in PromQL engine too - // if you rerun the same query twice). - shardedPrometheusRes := spinoffRes.(*PrometheusResponse) - if !testData.expectSpecificOrder { - sort.Sort(byLabels(shardedPrometheusRes.Data.Result)) - } - approximatelyEquals(t, expectedPrometheusRes, shardedPrometheusRes) - - var noSubqueries int - if testData.expectedSkippedReason == "no-subquery" { - noSubqueries = 1 - } - - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` + for _, queryOffset := range []time.Duration{-10 * time.Minute, -33 * time.Second, 0, 33 * time.Second, 10 * time.Minute, 1 * time.Hour} { + t.Run(fmt.Sprintf("queryOffset=%s", queryOffset), func(t *testing.T) { + t.Parallel() + + req := &PrometheusInstantQueryRequest{ + path: "/query", + time: util.TimeToMillis(time.Now().Add(queryOffset)), + queryExpr: parseQuery(t, testData.query), + } + + // Run the query without subquery spin-off. 
+ expectedRes, err := downstream.Do(context.Background(), req) + require.Nil(t, err) + expectedPrometheusRes := expectedRes.(*PrometheusResponse) + if !testData.expectSpecificOrder { + sort.Sort(byLabels(expectedPrometheusRes.Data.Result)) + } + + // Ensure the query produces some results. + if !testData.expectEmptyResult { + require.NotEmpty(t, expectedPrometheusRes.Data.Result) + requireValidSamples(t, expectedPrometheusRes.Data.Result) + } + + if testData.expectedSpunOffSubqueries > 0 { + // Remove position information from annotations, to mirror what we expect from the sharded queries below. + removeAllAnnotationPositionInformation(expectedPrometheusRes.Infos) + removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings) + } + + spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL+"/prometheus/api/v1/query_range") + require.NoError(t, err) + + reg := prometheus.NewPedanticRegistry() + spinoffMiddleware := newSpinOffSubqueriesMiddleware( + mockLimits{ + instantQueriesWithSubquerySpinOff: []string{".*"}, + }, + log.NewNopLogger(), + engine, + spinOffQueryHandler, + reg, + defaultStepFunc, + ) + + ctx := user.InjectOrgID(context.Background(), "test") + spinoffRes, err := spinoffMiddleware.Wrap(downstream).Do(ctx, req) + require.Nil(t, err) + + // Ensure the two results match (float precision can slightly differ, there's no guarantee in PromQL engine too + // if you rerun the same query twice). + shardedPrometheusRes := spinoffRes.(*PrometheusResponse) + if !testData.expectSpecificOrder { + sort.Sort(byLabels(shardedPrometheusRes.Data.Result)) + } + approximatelyEquals(t, expectedPrometheusRes, shardedPrometheusRes) + + var noSubqueries int + if testData.expectedSkippedReason == "no-subquery" { + noSubqueries = 1 + } + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_frontend_spun_off_subqueries_total Total number of subqueries that were spun off.
# TYPE cortex_frontend_spun_off_subqueries_total counter cortex_frontend_spun_off_subqueries_total %d @@ -700,10 +324,12 @@ cortex_frontend_subquery_spinoff_skipped_total{reason="too-many-downstream-queri # TYPE cortex_frontend_subquery_spinoff_successes_total counter cortex_frontend_subquery_spinoff_successes_total %d `, testData.expectedSpunOffSubqueries, noSubqueries, testData.expectedSpunOffSubqueries)), - "cortex_frontend_subquery_spinoff_attempts_total", - "cortex_frontend_subquery_spinoff_successes_total", - "cortex_frontend_subquery_spinoff_skipped_total", - "cortex_frontend_spun_off_subqueries_total")) + "cortex_frontend_subquery_spinoff_attempts_total", + "cortex_frontend_subquery_spinoff_successes_total", + "cortex_frontend_subquery_spinoff_skipped_total", + "cortex_frontend_spun_off_subqueries_total")) + }) + } }) }