Skip to content

Commit

Permalink
Address PR comments:
Browse files Browse the repository at this point in the history
- Use step align code from MQE
- Fix up comments and log messages
- Return error in case of a wrong regex pattern
  • Loading branch information
julienduchesne committed Jan 29, 2025
1 parent 31d8211 commit 8d0e0ba
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestSubquerySpinOffMapper(t *testing.T) {
expectedDownstreamQueries: 1,
},
{
name: "spin off subquery",
name: "spin-off subquery",
in: `avg_over_time((foo * bar)[3d:1m])`,
out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d])`,
expectedSubqueries: 1,
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestSubquerySpinOffMapper(t *testing.T) {
expectedDownstreamQueries: 1,
},
{
name: "spin off multiple subqueries",
name: "spin-off multiple subqueries",
in: `avg_over_time((foo * bar)[3d:1m]) * max_over_time((foo * bar)[2d:2m])`,
out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d])
* max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="48h0m0s",__step__="2m0s"}[2d])`,
Expand Down
6 changes: 4 additions & 2 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,13 @@ func newQueryMiddlewares(
)

if spinOffURL := cfg.SpinOffInstantSubqueriesToURL; spinOffURL != "" {
// Spin off subqueries to a remote URL (or localhost)
// Spin-off subqueries to a remote URL (or localhost)
spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log, spinOffURL)
if err != nil {
level.Error(log).Log("msg", "failed to create spin off query handler", "error", err)
level.Error(log).Log("msg", "failed to create spin-off query handler", "error", err)
} else {
// We're only interested in instant queries for now because their query rate is usually much higher than range queries.
// They are also less optimized than range queries, so we can benefit more from the spin-off.
queryInstantMiddleware = append(
queryInstantMiddleware,
newInstrumentMiddleware("spin_off_subqueries", metrics),
Expand Down
29 changes: 11 additions & 18 deletions pkg/frontend/querymiddleware/spin_off_subqueries.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func newSpinOffSubqueriesMetrics(registerer prometheus.Registerer) spinOffSubque
m := spinOffSubqueriesMetrics{
spinOffAttempts: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_frontend_subquery_spinoff_attempts_total",
Help: "Total number of queries the query-frontend attempted to spin off subqueries from.",
Help: "Total number of queries the query-frontend attempted to spin-off subqueries from.",
}),
spinOffSuccesses: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_frontend_subquery_spinoff_successes_total",
Help: "Total number of queries the query-frontend successfully spun off subqueries from.",
}),
spinOffSkipped: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_frontend_subquery_spinoff_skipped_total",
Help: "Total number of queries the query-frontend skipped or failed to spin off subqueries from.",
Help: "Total number of queries the query-frontend skipped or failed to spin-off subqueries from.",
}, []string{"reason"}),
spunOffSubqueries: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_frontend_spun_off_subqueries_total",
Expand Down Expand Up @@ -135,8 +135,7 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe
for _, pattern := range patterns {
matcher, err := labels.NewFastRegexMatcher(pattern)
if err != nil {
level.Error(spanLog).Log("msg", "failed to compile regex pattern", "pattern", pattern, "err", err)
continue
return nil, apierror.New(apierror.TypeBadData, err.Error())
}

if matcher.MatchString(req.GetQuery()) {
Expand All @@ -151,11 +150,11 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe
}

if !matched {
spanLog.DebugLog("msg", "subquery spin off is disabled for this query")
spanLog.DebugLog("msg", "expression did not match any configured subquery spin-off patterns, so subquery spin-off is disabled for this query")
return s.next.Do(ctx, req)
}

// Increment total number of instant queries attempted to spin off subqueries from.
// Increment total number of instant queries attempted to spin-off subqueries from.
s.metrics.spinOffAttempts.Inc()

mapperStats := astmapper.NewSubquerySpinOffMapperStats()
Expand Down Expand Up @@ -196,7 +195,7 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe
return s.next.Do(ctx, req)
}

spanLog.DebugLog("msg", "instant query has been rewritten to spin off subqueries", "rewritten", spinOffQuery, "regular_downstream_queries", mapperStats.DownstreamQueries(), "subqueries_spun_off", mapperStats.SpunOffSubqueries())
spanLog.DebugLog("msg", "instant query has been rewritten to spin-off subqueries", "rewritten", spinOffQuery, "regular_downstream_queries", mapperStats.DownstreamQueries(), "subqueries_spun_off", mapperStats.SpunOffSubqueries())

// Update query stats.
queryStats := stats.FromContext(ctx)
Expand Down Expand Up @@ -259,7 +258,7 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe
func newSpinOffQueryHandler(codec Codec, logger log.Logger, sendURL string) (MetricsQueryHandler, error) {
rangeQueryURL, err := url.Parse(sendURL)
if err != nil {
return nil, fmt.Errorf("invalid spin off URL: %w", err)
return nil, fmt.Errorf("invalid spin-off URL: %w", err)
}

return &spinOffQueryHandler{
Expand All @@ -282,16 +281,10 @@ func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) (
return nil, fmt.Errorf("error encoding request: %w", err)
}
httpReq.RequestURI = "" // Reset RequestURI to force URL to be used in the request.
// Override the URL with the configured range query URL (only parts that are set).
if s.rangeQueryURL.Scheme != "" {
httpReq.URL.Scheme = s.rangeQueryURL.Scheme
}
if s.rangeQueryURL.Host != "" {
httpReq.URL.Host = s.rangeQueryURL.Host
}
if s.rangeQueryURL.Path != "" {
httpReq.URL.Path = s.rangeQueryURL.Path
}
// Override the URL with the configured range query URL.
httpReq.URL.Scheme = s.rangeQueryURL.Scheme
httpReq.URL.Host = s.rangeQueryURL.Host
httpReq.URL.Path = s.rangeQueryURL.Path

if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil {
return nil, fmt.Errorf("error injecting org ID into request: %v", err)
Expand Down
29 changes: 18 additions & 11 deletions pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,26 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st
return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery offset"))
}

end := q.req.GetEnd() - queryOffset.Milliseconds()
// Subqueries are aligned on absolute time, while range queries are aligned on relative time.
// To match the same behavior, we can query slightly into the future.
// The extra data points aren't used because the subquery is aggregated with an _over_time-style function.
start := q.req.GetStart()
end := q.req.GetEnd()
step := queryStep.Milliseconds()
if end%step != 0 {
end += step - (end % step)

// The following code only works for instant queries. Supporting subqueries within range queries would
// require lots of changes. It hasn't been tested.
if start != end {
return storage.ErrSeriesSet(errors.New("subqueries spin-off is not supported in range queries"))
}

// Subqueries are always aligned to absolute time in PromQL, so we need to make the same adjustment here for correctness.
// Find the first timestamp inside the subquery range that is aligned to the step.
// This is taken from MQE: https://github.com/grafana/mimir/blob/266a393379b2c981a83557c5d66e56c97251ffeb/pkg/streamingpromql/query.go#L384-L398
alignedStart := step * ((start - queryOffset.Milliseconds() - queryRange.Milliseconds()) / step)
if alignedStart < start-queryOffset.Milliseconds()-queryRange.Milliseconds() {
alignedStart += step
}
// Calculate the earliest data point we need to query.
start := end - queryRange.Milliseconds()

// Split queries into multiple smaller queries if they have more than 11000 datapoints
rangeStart := start
rangeStart := alignedStart
var rangeQueries []MetricsQueryRequest
for {
var rangeEnd int64
Expand All @@ -142,7 +149,7 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st
headers = append(headers,
&PrometheusHeader{Name: "X-Mimir-Spun-Off-Subquery", Values: []string{"true"}},
&PrometheusHeader{Name: "Content-Type", Values: []string{"application/x-www-form-urlencoded"}},
) // Downstream is the querier, which is HTTP req.
)
newRangeRequest := NewPrometheusRangeQueryRequest(queryRangePathSuffix, headers, rangeStart, rangeEnd, step, q.req.GetLookbackDelta(), queryExpr, q.req.GetOptions(), q.req.GetHints())
rangeQueries = append(rangeQueries, newRangeRequest)
if rangeEnd == end {
Expand Down Expand Up @@ -171,7 +178,7 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st
}
return newSeriesSetFromEmbeddedQueriesResults(streams, hints)
default:
return storage.ErrSeriesSet(errors.Errorf("invalid metric name for the spin off middleware: %s", name))
return storage.ErrSeriesSet(errors.Errorf("invalid metric name for the spin-off middleware: %s", name))
}
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/frontend/querymiddleware/spin_off_subqueries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
var (
numSeries = 1000
numStaleSeries = 100
samplesStart = time.Now().Add(-2 * 24 * time.Hour)
samplesEnd = time.Now().Add(30 * time.Minute)
now = time.Now()
samplesStart = now.Add(-2 * 24 * time.Hour)
samplesEnd = now.Add(30 * time.Minute)
samplesStep = 30 * time.Second
)

Expand Down Expand Up @@ -100,11 +101,11 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
expectedSpunOffSubqueries: 1,
},
"sum of subquery min with small offset": {
query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[3d:2m] offset 20s))`,
query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[1d:1m] offset 20s))`,
expectedSpunOffSubqueries: 1,
},
"sum of subquery min with offset": {
query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[3d:2m] offset 1d))`,
query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[1d:1m] offset 1d))`,
expectedSpunOffSubqueries: 1,
},
"triple subquery": {
Expand Down Expand Up @@ -161,7 +162,6 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
// Add a special series whose data points start later than the start of the queried time range.
series = append(series, newSeries(newTestCounterLabels(seriesID),
time.Now().Add(5*time.Minute), samplesEnd, samplesStep, factor(2)))
seriesID++

// Create a queryable on the fixtures.
queryable := storageSeriesQueryable(series)
Expand Down Expand Up @@ -274,10 +274,10 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
# HELP cortex_frontend_spun_off_subqueries_total Total number of subqueries that were spun off.
# TYPE cortex_frontend_spun_off_subqueries_total counter
cortex_frontend_spun_off_subqueries_total %d
# HELP cortex_frontend_subquery_spinoff_attempts_total Total number of queries the query-frontend attempted to spin off subqueries from.
# HELP cortex_frontend_subquery_spinoff_attempts_total Total number of queries the query-frontend attempted to spin-off subqueries from.
# TYPE cortex_frontend_subquery_spinoff_attempts_total counter
cortex_frontend_subquery_spinoff_attempts_total 1
# HELP cortex_frontend_subquery_spinoff_skipped_total Total number of queries the query-frontend skipped or failed to spin off subqueries from.
# HELP cortex_frontend_subquery_spinoff_skipped_total Total number of queries the query-frontend skipped or failed to spin-off subqueries from.
# TYPE cortex_frontend_subquery_spinoff_skipped_total counter
cortex_frontend_subquery_spinoff_skipped_total{reason="mapping-failed"} 0
cortex_frontend_subquery_spinoff_skipped_total{reason="no-subqueries"} %d
Expand Down

0 comments on commit 8d0e0ba

Please sign in to comment.