Address PR comments + fixes
- Support offsets
- Disable `@`
- Improve tests. Run each query with a different offset each time
- Add new test cases with `offset x`
- Add new test case with a long range (more than 11000 steps). It has to be split into multiple range queries
- Allow setting query path in frontend arg (instead of hardcoding `/prometheus/api/v1/query_range`)
julienduchesne committed Jan 28, 2025
1 parent 3849ab9 commit bedc55e
Showing 6 changed files with 186 additions and 521 deletions.
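The effect of the mapper change is easiest to see in the updated test expectation below: a subquery with an offset is now spun off rather than pushed down as a whole, with the offset carried both on the selector and as an `__offset__` label:

    avg_over_time((foo * bar)[3d:1m] offset 3d) * 2

is now mapped to

    avg_over_time(__subquery_spinoff__{__offset__="72h0m0s",__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d] offset 3d) * 2

while `@`-modified subqueries (`@ start()`, `@ end()`, `@ <timestamp>`) keep falling back to a plain downstream query.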
9 changes: 7 additions & 2 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
@@ -90,8 +90,8 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f
 		// The last argument will typically contain the subquery in an aggregation function
 		// Examples: last_over_time(<subquery>[5m:]) or quantile_over_time(0.5, <subquery>[5m:])
 		if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok {
-			// Filter out subqueries with offsets, not supported yet
-			if sq.OriginalOffset > 0 {
+			// @ is not supported
+			if sq.StartOrEnd != 0 || sq.Timestamp != nil {
				return downstreamQuery(expr)
			}
 
@@ -135,6 +135,11 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f
 				},
 			}
 
+			if sq.OriginalOffset != 0 {
+				selector.LabelMatchers = append(selector.LabelMatchers, labels.MustNewMatcher(labels.MatchEqual, SubqueryOffsetLabelName, sq.OriginalOffset.String()))
+				selector.OriginalOffset = sq.OriginalOffset
+			}
+
 			e.Args[lastArgIdx] = &parser.MatrixSelector{
 				VectorSelector: selector,
 				Range:          sq.Range,
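The `__offset__` label value is produced by `sq.OriginalOffset.String()` and parsed back with `time.ParseDuration` in the querier (see spin_off_subqueries_queryable.go below). A minimal standalone sketch of that round trip, using only the Go standard library:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// The mapper encodes a 3d subquery offset as a label value: 72h0m0s.
		offset := 72 * time.Hour
		encoded := offset.String()

		// The querier decodes the label value; the round trip is lossless.
		decoded, err := time.ParseDuration(encoded)
		if err != nil {
			panic(err)
		}
		fmt.Println(encoded, decoded == offset) // 72h0m0s true
	}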
29 changes: 25 additions & 4 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go
@@ -35,6 +35,27 @@ func TestSubquerySpinOffMapper(t *testing.T) {
 			expectedSubqueries:        1,
 			expectedDownstreamQueries: 0,
 		},
+		{
+			name:                      "@ end ignored",
+			in:                        `avg_over_time((foo * bar)[3d:1m] @ end())`,
+			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] @ end())"}`,
+			expectedSubqueries:        0,
+			expectedDownstreamQueries: 1,
+		},
+		{
+			name:                      "@ start ignored",
+			in:                        `avg_over_time((foo * bar)[3d:1m] @ start())`,
+			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] @ start())"}`,
+			expectedSubqueries:        0,
+			expectedDownstreamQueries: 1,
+		},
+		{
+			name:                      "@ <timestamp> ignored",
+			in:                        `avg_over_time((foo * bar)[3d:1m] @ 1738086610)`,
+			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] @ 1738086610.000)"}`,
+			expectedSubqueries:        0,
+			expectedDownstreamQueries: 1,
+		},
 		{
 			name: "range too short",
 			in:   `avg_over_time((foo * bar)[30m:1m])`,
@@ -72,11 +93,11 @@ func TestSubquerySpinOffMapper(t *testing.T) {
 			expectedDownstreamQueries: 0,
 		},
 		{
-			name:                      "offsets aren't supported",
+			name:                      "with an offset",
 			in:                        `avg_over_time((foo * bar)[3d:1m] offset 3d) * 2`,
-			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] offset 3d)"} * 2`,
-			expectedSubqueries:        0,
-			expectedDownstreamQueries: 1,
+			out:                       `avg_over_time(__subquery_spinoff__{__offset__="72h0m0s",__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d] offset 3d) * 2`,
+			expectedSubqueries:        1,
+			expectedDownstreamQueries: 0,
 		},
 		{
 			name: "aggregated query",
4 changes: 3 additions & 1 deletion pkg/frontend/querymiddleware/codec.go
@@ -56,6 +56,8 @@ var (
 	prometheusCodecPropagateHeadersLabels = []string{api.ReadConsistencyOffsetsHeader}
 )
 
+const maxResolutionPoints = 11000
+
 const (
 	// statusSuccess Prometheus success result.
 	statusSuccess = "success"
@@ -532,7 +534,7 @@ func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64,
 
 	// For safety, limit the number of returned points per timeseries.
 	// This is sufficient for 60s resolution for a week or 1h resolution for a year.
-	if (end-start)/step > 11000 {
+	if (end-start)/step > maxResolutionPoints {
 		return 0, 0, 0, errStepTooSmall
 	}
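As a sanity check on the comment above: a week at 60s resolution is 7 × 24 × 60 = 10,080 points, and a year at 1h resolution is 365 × 24 = 8,760 points, both under the 11,000-point cap, so those queries pass while anything finer-grained over the same windows is rejected with errStepTooSmall.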
25 changes: 17 additions & 8 deletions pkg/frontend/querymiddleware/spin_off_subqueries.go
@@ -257,23 +257,23 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe
 }
 
 func newSpinOffQueryHandler(codec Codec, logger log.Logger, sendURL string) (MetricsQueryHandler, error) {
-	baseURL, err := url.Parse(sendURL)
+	rangeQueryURL, err := url.Parse(sendURL)
 	if err != nil {
 		return nil, fmt.Errorf("invalid spin off URL: %w", err)
 	}
 
 	return &spinOffQueryHandler{
-		codec:   codec,
-		logger:  logger,
-		baseURL: baseURL,
+		codec:         codec,
+		logger:        logger,
+		rangeQueryURL: rangeQueryURL,
 	}, nil
 }
 
 // spinOffQueryHandler is a query handler that takes a request and sends it to a remote endpoint.
 type spinOffQueryHandler struct {
-	codec   Codec
-	logger  log.Logger
-	baseURL *url.URL
+	codec         Codec
+	logger        log.Logger
+	rangeQueryURL *url.URL
 }
 
 func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) {
@@ -282,7 +282,16 @@ func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) (
 		return nil, fmt.Errorf("error encoding request: %w", err)
 	}
 	httpReq.RequestURI = "" // Reset RequestURI to force URL to be used in the request.
-	httpReq.URL = s.baseURL.ResolveReference(httpReq.URL)
+	// Override the URL with the configured range query URL (only parts that are set).
+	if s.rangeQueryURL.Scheme != "" {
+		httpReq.URL.Scheme = s.rangeQueryURL.Scheme
+	}
+	if s.rangeQueryURL.Host != "" {
+		httpReq.URL.Host = s.rangeQueryURL.Host
+	}
+	if s.rangeQueryURL.Path != "" {
+		httpReq.URL.Path = s.rangeQueryURL.Path
+	}
 
 	if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil {
 		return nil, fmt.Errorf("error injecting org ID into request: %v", err)
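Since only the non-empty components of the configured URL are copied over, the frontend argument can now supply just a path (replacing the previously hardcoded /prometheus/api/v1/query_range) while the scheme and host of the encoded request are preserved. A small sketch of the same override behavior against net/url (the example URLs are illustrative, not taken from the PR):

	package main

	import (
		"fmt"
		"net/url"
	)

	// applyOverride copies only the set components of override onto target,
	// mirroring how spinOffQueryHandler rewrites the request URL.
	func applyOverride(target, override *url.URL) {
		if override.Scheme != "" {
			target.Scheme = override.Scheme
		}
		if override.Host != "" {
			target.Host = override.Host
		}
		if override.Path != "" {
			target.Path = override.Path
		}
	}

	func main() {
		req, _ := url.Parse("http://localhost/prometheus/api/v1/query_range")
		pathOnly, _ := url.Parse("/custom/api/v1/query_range")
		applyOverride(req, pathOnly)
		fmt.Println(req) // http://localhost/custom/api/v1/query_range
	}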
46 changes: 24 additions & 22 deletions pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go
@@ -6,7 +6,6 @@ import (
 	"context"
 	"time"
 
-	"github.com/grafana/dskit/concurrency"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql/parser"
@@ -93,6 +92,7 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st
 		expr := values[astmapper.SubqueryQueryLabelName]
 		rangeStr := values[astmapper.SubqueryRangeLabelName]
 		stepStr := values[astmapper.SubqueryStepLabelName]
+		offsetStr := values[astmapper.SubqueryOffsetLabelName]
 		if expr == "" || rangeStr == "" || stepStr == "" {
 			return storage.ErrSeriesSet(errors.New("missing required labels for subquery"))
 		}
@@ -110,62 +110,64 @@ func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *st
 		if err != nil {
 			return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery step"))
 		}
+		var queryOffset time.Duration
+		if offsetStr == "" {
+			queryOffset = 0
+		} else if queryOffset, err = time.ParseDuration(offsetStr); err != nil {
+			return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery offset"))
+		}
 
-		end := q.req.GetEnd()
-		// Align query to absolute time by querying slightly into the future.
+		end := q.req.GetEnd() - queryOffset.Milliseconds()
+		// Subqueries are aligned on absolute time, while range queries are aligned on relative time.
+		// To match the same behavior, we can query slightly into the future.
 		// The extra data points aren't used because the subquery is aggregated with an _over_time-style function.
 		step := queryStep.Milliseconds()
 		if end%step != 0 {
 			end += step - (end % step)
 		}
-		reqRange := q.req.GetEnd() - q.req.GetStart()
 		// Calculate the earliest data point we need to query.
-		start := end - reqRange - queryRange.Milliseconds()
-		// Split queries into multiple smaller queries if they have more than 10000 datapoints
+		start := end - queryRange.Milliseconds()
+
+		// Split queries into multiple smaller queries if they have more than 11000 datapoints
 		rangeStart := start
 		var rangeQueries []MetricsQueryRequest
 		for {
 			var rangeEnd int64
-			if remainingPoints := (end - start) / step; remainingPoints > 10000 {
-				rangeEnd = start + 10000*step
+			if remainingPoints := (end - rangeStart) / step; remainingPoints > maxResolutionPoints {
+				rangeEnd = rangeStart + maxResolutionPoints*step
			} else {
 				rangeEnd = end
 			}
 			headers := q.req.GetHeaders()
 			headers = append(headers,
-				&PrometheusHeader{Name: "Content-Type", Values: []string{"application/json"}},
+				&PrometheusHeader{Name: "X-Mimir-Spun-Off-Subquery", Values: []string{"true"}},
+				&PrometheusHeader{Name: "Content-Type", Values: []string{"application/x-www-form-urlencoded"}},
 			) // Downstream is the querier, which is HTTP req.
-			newRangeRequest := NewPrometheusRangeQueryRequest("/prometheus"+queryRangePathSuffix, headers, rangeStart, rangeEnd, step, q.req.GetLookbackDelta(), queryExpr, q.req.GetOptions(), q.req.GetHints())
+			newRangeRequest := NewPrometheusRangeQueryRequest(queryRangePathSuffix, headers, rangeStart, rangeEnd, step, q.req.GetLookbackDelta(), queryExpr, q.req.GetOptions(), q.req.GetHints())
 			rangeQueries = append(rangeQueries, newRangeRequest)
 			if rangeEnd == end {
 				break
 			}
+			rangeStart = rangeEnd // Move the start to the end of the previous range.
 		}
 
-		// Concurrently run each query. It breaks and cancels each worker context on first error.
 		streams := make([][]SampleStream, len(rangeQueries))
-		err = concurrency.ForEachJob(ctx, len(rangeQueries), len(rangeQueries), func(ctx context.Context, idx int) error {
-			req := rangeQueries[idx]
-
+		for idx, req := range rangeQueries {
 			resp, err := q.upstreamRangeHandler.Do(ctx, req)
 			if err != nil {
-				return err
+				return storage.ErrSeriesSet(err)
 			}
 			promRes, ok := resp.(*PrometheusResponse)
 			if !ok {
-				return errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})
+				return storage.ErrSeriesSet(errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{}))
 			}
 			resStreams, err := ResponseToSamples(promRes)
 			if err != nil {
-				return err
+				return storage.ErrSeriesSet(err)
 			}
-			streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables.
+			streams[idx] = resStreams
 			q.annotationAccumulator.addInfos(promRes.Infos)
 			q.annotationAccumulator.addWarnings(promRes.Warnings)
-			return nil
-		})
-		if err != nil {
-			return storage.ErrSeriesSet(err)
 		}
 		return newSeriesSetFromEmbeddedQueriesResults(streams, hints)
 	default:
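The splitting loop above is what the new long-range test case exercises: once a spun-off range query would exceed maxResolutionPoints datapoints, it is cut into consecutive chunks. A self-contained sketch of the same logic, with timestamps in milliseconds as in the handler:

	package main

	import "fmt"

	const maxResolutionPoints = 11000

	// splitRange cuts [start, end] into consecutive range queries of at most
	// maxResolutionPoints steps each, mirroring the loop in Select above.
	func splitRange(start, end, step int64) [][2]int64 {
		var ranges [][2]int64
		rangeStart := start
		for {
			var rangeEnd int64
			if remaining := (end - rangeStart) / step; remaining > maxResolutionPoints {
				rangeEnd = rangeStart + maxResolutionPoints*step
			} else {
				rangeEnd = end
			}
			ranges = append(ranges, [2]int64{rangeStart, rangeEnd})
			if rangeEnd == end {
				break
			}
			rangeStart = rangeEnd
		}
		return ranges
	}

	func main() {
		// 3 days at a 10s step is 25,920 points, so the query is split in three.
		fmt.Println(len(splitRange(0, 3*24*3600*1000, 10_000))) // 3
	}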