From 8d0e0ba565962656f2de0daea3305949d11a7f74 Mon Sep 17 00:00:00 2001 From: Julien Duchesne Date: Wed, 29 Jan 2025 12:13:08 -0500 Subject: [PATCH] Address PR comments: - Use step align code from MQE - Fix up comments and log messages - Return error in case of a wrong --- .../astmapper/subquery_spin_off_test.go | 4 +-- pkg/frontend/querymiddleware/roundtrip.go | 6 ++-- .../querymiddleware/spin_off_subqueries.go | 29 +++++++------------ .../spin_off_subqueries_queryable.go | 29 ++++++++++++------- .../spin_off_subqueries_test.go | 14 ++++----- 5 files changed, 42 insertions(+), 40 deletions(-) diff --git a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go index 05afe50d96..def28cd757 100644 --- a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go +++ b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go @@ -29,7 +29,7 @@ func TestSubquerySpinOffMapper(t *testing.T) { expectedDownstreamQueries: 1, }, { - name: "spin off subquery", + name: "spin-off subquery", in: `avg_over_time((foo * bar)[3d:1m])`, out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d])`, expectedSubqueries: 1, @@ -71,7 +71,7 @@ func TestSubquerySpinOffMapper(t *testing.T) { expectedDownstreamQueries: 1, }, { - name: "spin off multiple subqueries", + name: "spin-off multiple subqueries", in: `avg_over_time((foo * bar)[3d:1m]) * max_over_time((foo * bar)[2d:2m])`, out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d]) * max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="48h0m0s",__step__="2m0s"}[2d])`, diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index 81c808a7b2..57d2a8476c 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -382,11 +382,13 @@ func newQueryMiddlewares( ) if spinOffURL := cfg.SpinOffInstantSubqueriesToURL; spinOffURL != "" { - // Spin off subqueries to a remote URL (or localhost) + // Spin-off subqueries to a remote URL (or localhost) spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log, spinOffURL) if err != nil { - level.Error(log).Log("msg", "failed to create spin off query handler", "error", err) + level.Error(log).Log("msg", "failed to create spin-off query handler", "error", err) } else { + // We're only interested in instant queries for now because their query rate is usually much higher than range queries. + // They are also less optimized than range queries, so we can benefit more from the spin-off. queryInstantMiddleware = append( queryInstantMiddleware, newInstrumentMiddleware("spin_off_subqueries", metrics), diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries.go b/pkg/frontend/querymiddleware/spin_off_subqueries.go index d10f3ed12b..5cf8adbdb6 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries.go @@ -57,7 +57,7 @@ func newSpinOffSubqueriesMetrics(registerer prometheus.Registerer) spinOffSubque m := spinOffSubqueriesMetrics{ spinOffAttempts: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_frontend_subquery_spinoff_attempts_total", - Help: "Total number of queries the query-frontend attempted to spin off subqueries from.", + Help: "Total number of queries the query-frontend attempted to spin-off subqueries from.", }), spinOffSuccesses: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_frontend_subquery_spinoff_successes_total", @@ -65,7 +65,7 @@ func newSpinOffSubqueriesMetrics(registerer prometheus.Registerer) spinOffSubque }), spinOffSkipped: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_frontend_subquery_spinoff_skipped_total", - Help: "Total number of queries the query-frontend skipped or failed to spin off subqueries from.", + Help: "Total number of queries the query-frontend skipped or failed to spin-off subqueries from.", }, []string{"reason"}), spunOffSubqueries: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_frontend_spun_off_subqueries_total", @@ -135,8 +135,7 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe for _, pattern := range patterns { matcher, err := labels.NewFastRegexMatcher(pattern) if err != nil { - level.Error(spanLog).Log("msg", "failed to compile regex pattern", "pattern", pattern, "err", err) - continue + return nil, apierror.New(apierror.TypeBadData, err.Error()) } if matcher.MatchString(req.GetQuery()) { @@ -151,11 +150,11 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe } if !matched { - spanLog.DebugLog("msg", "subquery spin off is disabled for this query") + spanLog.DebugLog("msg", "expression did not match any configured subquery spin-off patterns, so subquery spin-off is disabled for this query") return s.next.Do(ctx, req) } - // Increment total number of instant queries attempted to spin off subqueries from. + // Increment total number of instant queries attempted to spin-off subqueries from. s.metrics.spinOffAttempts.Inc() mapperStats := astmapper.NewSubquerySpinOffMapperStats() @@ -196,7 +195,7 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe return s.next.Do(ctx, req) } - spanLog.DebugLog("msg", "instant query has been rewritten to spin off subqueries", "rewritten", spinOffQuery, "regular_downstream_queries", mapperStats.DownstreamQueries(), "subqueries_spun_off", mapperStats.SpunOffSubqueries()) + spanLog.DebugLog("msg", "instant query has been rewritten to spin-off subqueries", "rewritten", spinOffQuery, "regular_downstream_queries", mapperStats.DownstreamQueries(), "subqueries_spun_off", mapperStats.SpunOffSubqueries()) // Update query stats. queryStats := stats.FromContext(ctx) @@ -259,7 +258,7 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe func newSpinOffQueryHandler(codec Codec, logger log.Logger, sendURL string) (MetricsQueryHandler, error) { rangeQueryURL, err := url.Parse(sendURL) if err != nil { - return nil, fmt.Errorf("invalid spin off URL: %w", err) + return nil, fmt.Errorf("invalid spin-off URL: %w", err) } return &spinOffQueryHandler{ @@ -282,16 +281,10 @@ func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) ( return nil, fmt.Errorf("error encoding request: %w", err) } httpReq.RequestURI = "" // Reset RequestURI to force URL to be used in the request. - // Override the URL with the configured range query URL (only parts that are set). - if s.rangeQueryURL.Scheme != "" { - httpReq.URL.Scheme = s.rangeQueryURL.Scheme - } - if s.rangeQueryURL.Host != "" { - httpReq.URL.Host = s.rangeQueryURL.Host - } - if s.rangeQueryURL.Path != "" { - httpReq.URL.Path = s.rangeQueryURL.Path - } + // Override the URL with the configured range query URL. + httpReq.URL.Scheme = s.rangeQueryURL.Scheme + httpReq.URL.Host = s.rangeQueryURL.Host + httpReq.URL.Path = s.rangeQueryURL.Path if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil { return nil, fmt.Errorf("error injecting org ID into request: %v", err) diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go b/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go index 183705828b..84ad4d295d 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go @@ -117,19 +117,26 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery offset")) } - end := q.req.GetEnd() - queryOffset.Milliseconds() - // Subqueries are aligned on absolute time, while range queries are aligned on relative time. - // To match the same behavior, we can query slightly into the future. - // The extra data points aren't used because the subquery is aggregated with an _over_time-style function. + start := q.req.GetStart() + end := q.req.GetEnd() step := queryStep.Milliseconds() - if end%step != 0 { - end += step - (end % step) + + // The following code only works for instant queries. Supporting subqueries within range queries would + // require lots of changes. It hasnt been tested. + if start != end { + return storage.ErrSeriesSet(errors.New("subqueries spin-off is not supported in range queries")) + } + + // Subqueries are always aligned to absolute time in PromQL, so we need to make the same adjustment here for correctness. + // Find the first timestamp inside the subquery range that is aligned to the step. + // This is taken from MQE: https://github.com/grafana/mimir/blob/266a393379b2c981a83557c5d66e56c97251ffeb/pkg/streamingpromql/query.go#L384-L398 + alignedStart := step * ((start - queryOffset.Milliseconds() - queryRange.Milliseconds()) / step) + if alignedStart < start-queryOffset.Milliseconds()-queryRange.Milliseconds() { + alignedStart += step } - // Calculate the earliest data point we need to query. - start := end - queryRange.Milliseconds() // Split queries into multiple smaller queries if they have more than 11000 datapoints - rangeStart := start + rangeStart := alignedStart var rangeQueries []MetricsQueryRequest for { var rangeEnd int64 @@ -142,7 +149,7 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st headers = append(headers, &PrometheusHeader{Name: "X-Mimir-Spun-Off-Subquery", Values: []string{"true"}}, &PrometheusHeader{Name: "Content-Type", Values: []string{"application/x-www-form-urlencoded"}}, - ) // Downstream is the querier, which is HTTP req. + ) newRangeRequest := NewPrometheusRangeQueryRequest(queryRangePathSuffix, headers, rangeStart, rangeEnd, step, q.req.GetLookbackDelta(), queryExpr, q.req.GetOptions(), q.req.GetHints()) rangeQueries = append(rangeQueries, newRangeRequest) if rangeEnd == end { @@ -171,7 +178,7 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st } return newSeriesSetFromEmbeddedQueriesResults(streams, hints) default: - return storage.ErrSeriesSet(errors.Errorf("invalid metric name for the spin off middleware: %s", name)) + return storage.ErrSeriesSet(errors.Errorf("invalid metric name for the spin-off middleware: %s", name)) } } diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries_test.go b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go index fecf3284f6..78643ba814 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries_test.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go @@ -30,8 +30,9 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { var ( numSeries = 1000 numStaleSeries = 100 - samplesStart = time.Now().Add(-2 * 24 * time.Hour) - samplesEnd = time.Now().Add(30 * time.Minute) + now = time.Now() + samplesStart = now.Add(-2 * 24 * time.Hour) + samplesEnd = now.Add(30 * time.Minute) samplesStep = 30 * time.Second ) @@ -100,11 +101,11 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { expectedSpunOffSubqueries: 1, }, "sum of subquery min with small offset": { - query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[3d:2m] offset 20s))`, + query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[1d:1m] offset 20s))`, expectedSpunOffSubqueries: 1, }, "sum of subquery min with offset": { - query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[3d:2m] offset 1d))`, + query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[1d:1m] offset 1d))`, expectedSpunOffSubqueries: 1, }, "triple subquery": { @@ -161,7 +162,6 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { // Add a special series whose data points start later than the start of the queried time range. series = append(series, newSeries(newTestCounterLabels(seriesID), time.Now().Add(5*time.Minute), samplesEnd, samplesStep, factor(2))) - seriesID++ // Create a queryable on the fixtures. queryable := storageSeriesQueryable(series) @@ -274,10 +274,10 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { # HELP cortex_frontend_spun_off_subqueries_total Total number of subqueries that were spun off. # TYPE cortex_frontend_spun_off_subqueries_total counter cortex_frontend_spun_off_subqueries_total %d -# HELP cortex_frontend_subquery_spinoff_attempts_total Total number of queries the query-frontend attempted to spin off subqueries from. +# HELP cortex_frontend_subquery_spinoff_attempts_total Total number of queries the query-frontend attempted to spin-off subqueries from. # TYPE cortex_frontend_subquery_spinoff_attempts_total counter cortex_frontend_subquery_spinoff_attempts_total 1 -# HELP cortex_frontend_subquery_spinoff_skipped_total Total number of queries the query-frontend skipped or failed to spin off subqueries from. +# HELP cortex_frontend_subquery_spinoff_skipped_total Total number of queries the query-frontend skipped or failed to spin-off subqueries from. # TYPE cortex_frontend_subquery_spinoff_skipped_total counter cortex_frontend_subquery_spinoff_skipped_total{reason="mapping-failed"} 0 cortex_frontend_subquery_spinoff_skipped_total{reason="no-subqueries"} %d