Subquery Spin-off POC
julienduchesne committed Jan 21, 2025
1 parent aa0bf13 commit fc11f73
Showing 17 changed files with 1,573 additions and 51 deletions.
129 changes: 129 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
@@ -0,0 +1,129 @@
package astmapper

import (
    "context"

    "github.com/go-kit/log"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/promql/parser"
)

const (
    SubqueryMetricName      = "__subquery_spinoff__"
    SubqueryQueryLabelName  = "__query__"
    SubqueryRangeLabelName  = "__range__"
    SubqueryStepLabelName   = "__step__"
    SubqueryOffsetLabelName = "__offset__"

    DownstreamQueryMetricName = "__downstream_query__"
    DownstreamQueryLabelName  = "__query__"
)
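
// As an illustration (taken from the tests in subquery_spin_off_test.go), the
// mapper rewrites
//
//	avg_over_time((foo * bar)[5m:1m])
//
// into
//
//	avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m])
//
// so the inner subquery can be evaluated as a separate range query and its
// result re-injected through the synthetic __subquery_spinoff__ selector.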

type subquerySpinOffMapper struct {
    ctx context.Context

    logger log.Logger
    stats  *SubquerySpinOffMapperStats
}

// NewSubquerySpinOffMapper creates a mapper that extracts subqueries from an instant query
// so that they can be spun off into separate range queries.
func NewSubquerySpinOffMapper(ctx context.Context, logger log.Logger, stats *SubquerySpinOffMapperStats) ASTMapper {
    queryMapper := NewASTExprMapper(
        &subquerySpinOffMapper{
            ctx:    ctx,
            logger: logger,
            stats:  stats,
        },
    )

    return NewMultiMapper(
        queryMapper,
    )
}

func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
    if err := m.ctx.Err(); err != nil {
        return nil, false, err
    }

    // Immediately clone the expr to avoid mutating the original.
    expr, err = cloneExpr(expr)
    if err != nil {
        return nil, false, err
    }

    downstreamQuery := func(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
        if !usesMetricsData(expr) {
            return expr, false, nil
        }
        selector := &parser.VectorSelector{
            Name: DownstreamQueryMetricName,
            LabelMatchers: []*labels.Matcher{
                labels.MustNewMatcher(labels.MatchEqual, DownstreamQueryLabelName, expr.String()),
            },
        }
        m.stats.AddDownstreamQuery()
        return selector, false, nil
    }

    switch e := expr.(type) {
    case *parser.Call:
        if len(e.Args) == 0 {
            return expr, false, nil
        }
        lastArgIdx := len(e.Args) - 1
        if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok {
            // Filter out subqueries that are just selectors; they aren't worth spinning off.
            if _, ok := sq.Expr.(*parser.VectorSelector); ok {
                return downstreamQuery(expr)
            }

            // Filter out subqueries with offsets; they aren't supported yet.
            if sq.OriginalOffset > 0 {
                return downstreamQuery(expr)
            }

            // Subqueries that don't touch metrics data (constants, vector(), etc.)
            // are left for the frontend engine to evaluate.
            if !usesMetricsData(sq.Expr) {
                return expr, false, nil
            }

            selector := &parser.VectorSelector{
                Name: SubqueryMetricName,
                LabelMatchers: []*labels.Matcher{
                    labels.MustNewMatcher(labels.MatchEqual, SubqueryQueryLabelName, sq.Expr.String()),
                    labels.MustNewMatcher(labels.MatchEqual, SubqueryRangeLabelName, sq.Range.String()),
                    labels.MustNewMatcher(labels.MatchEqual, SubqueryStepLabelName, sq.Step.String()),
                },
            }

            e.Args[lastArgIdx] = &parser.MatrixSelector{
                VectorSelector: selector,
                Range:          sq.Range,
            }
            m.stats.AddSpunOffSubquery()
            return e, true, nil
        }

        return downstreamQuery(expr)
    default:
        return expr, false, nil
    }
}

func usesMetricsData(expr parser.Node) bool {
    switch expr.(type) {
    case *parser.VectorSelector:
        return true
    case *parser.MatrixSelector:
        return true
    default:
        for _, child := range parser.Children(expr) {
            if usesMetricsData(child) {
                return true
            }
        }
        return false
    }
}
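
For reference, a minimal sketch of driving the mapper end to end, mirroring the tests below (the `fmt` import and the printed output are illustrative only):

    stats := NewSubquerySpinOffMapperStats()
    mapper := NewSubquerySpinOffMapper(context.Background(), log.NewNopLogger(), stats)

    expr, err := parser.ParseExpr(`avg_over_time((foo * bar)[5m:1m])`)
    if err != nil {
        panic(err)
    }

    mapped, err := mapper.Map(expr)
    if err != nil {
        panic(err)
    }

    // Prints the rewritten query:
    // avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m])
    fmt.Println(mapped.String())
    fmt.Println(stats.SpunOffSubqueries()) // 1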
28 changes: 28 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_stats.go
@@ -0,0 +1,28 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

type SubquerySpinOffMapperStats struct {
    spunOffSubqueries int // counter of subqueries extracted
    downstreamQueries int // counter of downstream queries extracted
}

func NewSubquerySpinOffMapperStats() *SubquerySpinOffMapperStats {
    return &SubquerySpinOffMapperStats{}
}

func (s *SubquerySpinOffMapperStats) AddSpunOffSubquery() {
    s.spunOffSubqueries++
}

func (s *SubquerySpinOffMapperStats) AddDownstreamQuery() {
    s.downstreamQueries++
}

func (s *SubquerySpinOffMapperStats) SpunOffSubqueries() int {
    return s.spunOffSubqueries
}

func (s *SubquerySpinOffMapperStats) DownstreamQueries() int {
    return s.downstreamQueries
}
159 changes: 159 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go
@@ -0,0 +1,159 @@
package astmapper

import (
    "context"
    "testing"

    "github.com/go-kit/log"
    "github.com/prometheus/prometheus/promql/parser"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestSubquerySpinOffMapper(t *testing.T) {
    for _, tt := range []struct {
        name                      string
        in                        string
        out                       string
        expectedSubqueries        int
        expectedDownstreamQueries int
    }{
        {
            name:                      "subquery too simple",
            in:                        `avg_over_time(foo[5m:1m])`,
            out:                       `__downstream_query__{__query__="avg_over_time(foo[5m:1m])"}`,
            expectedSubqueries:        0,
            expectedDownstreamQueries: 1,
        },
        {
            name:                      "spin off subquery",
            in:                        `avg_over_time((foo * bar)[5m:1m])`,
            out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m])`,
            expectedSubqueries:        1,
            expectedDownstreamQueries: 0,
        },
        {
            name: "spin off multiple subqueries",
            in:   `avg_over_time((foo * bar)[5m:1m]) * max_over_time((foo * bar)[10m:2m])`,
            out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m])
                * max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="10m0s",__step__="2m0s"}[10m])`,
            expectedSubqueries:        2,
            expectedDownstreamQueries: 0,
        },
        {
            name:                      "downstream query",
            in:                        `avg_over_time((foo * bar)[5m:1m]) * avg_over_time(foo[5m])`,
            out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * __downstream_query__{__query__="avg_over_time(foo[5m])"}`,
            expectedSubqueries:        1,
            expectedDownstreamQueries: 1,
        },
        {
            name:                      "scalars",
            in:                        `avg_over_time((foo * bar)[5m:1m]) * 2`,
            out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * 2`,
            expectedSubqueries:        1,
            expectedDownstreamQueries: 0,
        },
        {
            name:                      "offsets aren't supported",
            in:                        `avg_over_time((foo * bar)[5m:1m] offset 5m) * 2`,
            out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[5m:1m] offset 5m)"} * 2`,
            expectedSubqueries:        0,
            expectedDownstreamQueries: 1,
        },
        {
            name:                      "aggregated query",
            in:                        `sum(avg_over_time((foo * bar)[5m:1m]) * avg_over_time(foo[5m]))`,
            out:                       `sum(avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * __downstream_query__{__query__="avg_over_time(foo[5m])"})`,
            expectedSubqueries:        1,
            expectedDownstreamQueries: 1,
        },
        {
            name:               "real-world query",
            in:                 `(((sum(count_over_time(((((sum(count_over_time((kafka_consumergroup_lag > 12738)[1m:]))) or vector(0)) / ((sum(count_over_time(kafka_consumergroup_lag[1m:]))) > 0)) > 0.05)[3d:]))) or vector(0))) / (count_over_time(vector(1)[3d:]))`,
            out:                `(((sum(count_over_time(__subquery_spinoff__{__query__="((((sum(count_over_time((kafka_consumergroup_lag > 12738)[1m:]))) or vector(0)) / ((sum(count_over_time(kafka_consumergroup_lag[1m:]))) > 0)) > 0.05)",__range__="72h0m0s",__step__="0s"}[3d]))) or vector(0))) / (count_over_time(vector(1)[3d:]))`,
            expectedSubqueries: 1,
        },
        {
            name: "complex query",
            in: `
(
(
(
sum(
count_over_time(
(
(
(
(
sum(
increase(
kafka_event_processed_failure{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
)
)
)
or
vector(0)
)
/
(
(
sum(
increase(
kafka_event_handled{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
)
)
)
>
0
)
)
>
0.01
)[3d:]
)
)
)
or
vector(0)
)
)
/
(count_over_time(vector(1)[3d:]))`,
            out: ` (
(
(
sum(
count_over_time(
__subquery_spinoff__{__query__="((((sum(increase(kafka_event_processed_failure{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) or vector(0)) / ((sum(increase(kafka_event_handled{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) > 0)) > 0.01)",__range__="72h0m0s",__step__="0s"}[3d]
)
)
)
or
vector(0)
)
)
/
(count_over_time(vector(1)[3d:]))`,
            expectedSubqueries:        1,
            expectedDownstreamQueries: 0,
        },
    } {
        tt := tt

        t.Run(tt.name, func(t *testing.T) {
            stats := NewSubquerySpinOffMapperStats()
            mapper := NewSubquerySpinOffMapper(context.Background(), log.NewNopLogger(), stats)
            expr, err := parser.ParseExpr(tt.in)
            require.NoError(t, err)
            out, err := parser.ParseExpr(tt.out)
            require.NoError(t, err)

            mapped, err := mapper.Map(expr)
            require.NoError(t, err)
            require.Equal(t, out.String(), mapped.String())
            assert.Equal(t, tt.expectedSubqueries, stats.SpunOffSubqueries())
            assert.Equal(t, tt.expectedDownstreamQueries, stats.DownstreamQueries())
        })
    }
}
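
These table-driven cases double as a reference for the rewrite behaviour; they can be run with the standard Go tooling, for example `go test -run TestSubquerySpinOffMapper ./pkg/frontend/querymiddleware/astmapper/`.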
1 change: 1 addition & 0 deletions pkg/frontend/querymiddleware/codec.go
@@ -152,6 +152,7 @@ type MetricsQueryRequest interface {
 	WithEstimatedSeriesCountHint(uint64) (MetricsQueryRequest, error)
 	// AddSpanTags writes information about this request to an OpenTracing span
 	AddSpanTags(opentracing.Span)
+	GetLookbackDelta() time.Duration
 }

 // LabelsSeriesQueryRequest represents a label names, label values, or series query request that can be processed by middlewares.
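
The middlewares can now read the engine's lookback delta off the decoded request. A rough sketch of a conforming implementation follows; the receiver type and the `lookbackDelta` field are assumptions for illustration, not shown in this commit:

    // Hypothetical sketch: a concrete MetricsQueryRequest implementation
    // returns the lookback delta it was decoded with (field name assumed).
    func (r *someMetricsQueryRequest) GetLookbackDelta() time.Duration {
        return r.lookbackDelta
    }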
35 changes: 19 additions & 16 deletions pkg/frontend/querymiddleware/limits.go
@@ -204,8 +204,8 @@ type limitedParallelismRoundTripper struct
 	middleware MetricsQueryMiddleware
 }

-// NewLimitedParallelismRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`.
-func NewLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limits Limits, middlewares ...MetricsQueryMiddleware) http.RoundTripper {
+// newLimitedParallelismRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`.
+func newLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limits Limits, middlewares ...MetricsQueryMiddleware) limitedParallelismRoundTripper {
 	return limitedParallelismRoundTripper{
 		downstream: roundTripperHandler{
 			next:  next,
@@ -217,18 +217,7 @@ func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
 	}
 }

-func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
-	ctx, cancel := context.WithCancelCause(r.Context())
-	defer cancel(errExecutingParallelQueriesFinished)
-
-	request, err := rt.codec.DecodeMetricsQueryRequest(ctx, r)
-	if err != nil {
-		return nil, err
-	}
-
-	if span := opentracing.SpanFromContext(ctx); span != nil {
-		request.AddSpanTags(span)
-	}
+func (rt limitedParallelismRoundTripper) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) {
 	tenantIDs, err := tenant.TenantIDs(ctx)
 	if err != nil {
 		return nil, apierror.New(apierror.TypeBadData, err.Error())
@@ -241,15 +230,29 @@ func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
 	// Wraps middlewares with a final handler, which will receive sub-requests in
 	// parallel from upstream handlers and ensure that no more than MaxQueryParallelism
 	// sub-requests run in parallel.
-	response, err := rt.middleware.Wrap(
+	fullHandler := rt.middleware.Wrap(
 		HandlerFunc(func(ctx context.Context, r MetricsQueryRequest) (Response, error) {
 			if err := sem.Acquire(ctx, 1); err != nil {
 				return nil, fmt.Errorf("could not acquire work: %w", err)
 			}
 			defer sem.Release(1)

 			return rt.downstream.Do(ctx, r)
-		})).Do(ctx, request)
+		}))
+	return fullHandler.Do(ctx, r)
+}
+
+func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
+	ctx, cancel := context.WithCancelCause(r.Context())
+	defer cancel(errExecutingParallelQueriesFinished)
+	request, err := rt.codec.DecodeMetricsQueryRequest(ctx, r)
+	if err != nil {
+		return nil, err
+	}
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		request.AddSpanTags(span)
+	}
+	response, err := rt.Do(ctx, request)
 	if err != nil {
 		return nil, err
 	}
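
Splitting `Do` out of `RoundTrip` lets callers that already hold a decoded `MetricsQueryRequest` run it through the parallelism-limited middleware chain directly, presumably so spun-off subqueries can be re-submitted inside the frontend without being re-encoded as HTTP. A minimal sketch of such a caller (the `decodedRequest` variable is hypothetical):

    // Hypothetical caller: execute an already-decoded request through the
    // middleware chain, skipping the HTTP decode step in RoundTrip.
    rt := newLimitedParallelismRoundTripper(next, codec, limits, middlewares...)
    resp, err := rt.Do(ctx, decodedRequest)
    if err != nil {
        return nil, err
    }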