Skip to content

Commit

Permalink
Subquery Spin-off POC
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Jan 21, 2025
1 parent aa0bf13 commit 16cbd65
Show file tree
Hide file tree
Showing 20 changed files with 1,592 additions and 51 deletions.
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -6576,6 +6576,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "spin_off_instant_subqueries",
"required": false,
"desc": "Set to true to enable spinning off subqueries as range queries in instant queries. This makes use of range query optimizations and may improve performance for expensive subqueries.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "query-frontend.spin-off-instant-subqueries",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "query_result_response_format",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,8 @@ Usage of ./cmd/mimir/mimir:
Number of concurrent workers forwarding queries to single query-scheduler. (default 5)
-query-frontend.shard-active-series-queries
[experimental] True to enable sharding of active series queries.
-query-frontend.spin-off-instant-subqueries
[experimental] Set to true to enable spinning off subqueries as range queries in instant queries. This makes use of range query optimizations and may improve performance for expensive subqueries.
-query-frontend.split-instant-queries-by-interval duration
[experimental] Split instant queries by an interval and execute in parallel. 0 to disable it.
-query-frontend.split-queries-by-interval duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,12 @@ results_cache:
# CLI flag: -query-frontend.use-active-series-decoder
[use_active_series_decoder: <boolean> | default = false]

# (experimental) Set to true to enable spinning off subqueries as range queries
# in instant queries. This makes use of range query optimizations and may
# improve performance for expensive subqueries.
# CLI flag: -query-frontend.spin-off-instant-subqueries
[spin_off_instant_subqueries: <boolean> | default = false]

# Format to use when retrieving query results from queriers. Supported values:
# json, protobuf
# CLI flag: -query-frontend.query-result-response-format
Expand Down
129 changes: 129 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package astmapper

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

const (
	// SubqueryMetricName is the synthetic metric name used to mark a spun-off
	// subquery; the subquery itself is encoded in the label matchers below.
	SubqueryMetricName = "__subquery_spinoff__"
	// SubqueryQueryLabelName carries the subquery's inner expression (its String() form).
	SubqueryQueryLabelName = "__query__"
	// SubqueryRangeLabelName carries the subquery range as a time.Duration string (e.g. "5m0s").
	SubqueryRangeLabelName = "__range__"
	// SubqueryStepLabelName carries the subquery step as a time.Duration string ("0s" when unset).
	SubqueryStepLabelName = "__step__"
	// SubqueryOffsetLabelName is reserved for subquery offsets.
	// NOTE(review): unused in the visible code — subqueries with offsets are
	// currently routed to a downstream query instead. Confirm before removing.
	SubqueryOffsetLabelName = "__offset__"

	// DownstreamQueryMetricName is the synthetic metric name used to wrap an
	// expression that must be executed as-is by the downstream engine.
	DownstreamQueryMetricName = "__downstream_query__"
	// DownstreamQueryLabelName carries the wrapped expression (its String() form).
	DownstreamQueryLabelName = "__query__"
)

// subquerySpinOffMapper is an ExprMapper that rewrites an instant query's AST
// so eligible subqueries can be executed separately as range queries, and all
// remaining metric-reading expressions are wrapped as downstream queries.
// NOTE(review): storing a context.Context in a struct goes against the usual
// Go guidance; presumably done because MapExpr's signature doesn't accept a
// context — confirm before refactoring.
type subquerySpinOffMapper struct {
	ctx context.Context // checked at the top of MapExpr to abort mapping on cancellation

	logger log.Logger                  // NOTE(review): unused in the visible code — kept for future use?
	stats  *SubquerySpinOffMapperStats // counts spun-off subqueries and downstream queries produced
}

// NewSubquerySpinOffMapper creates an ASTMapper that spins off eligible
// subqueries of an instant query as separate range queries (encoded as
// __subquery_spinoff__ selectors) and wraps the remaining metric-reading
// expressions as __downstream_query__ selectors. ctx is only used to abort
// mapping early on cancellation; stats is populated as expressions are mapped.
func NewSubquerySpinOffMapper(ctx context.Context, logger log.Logger, stats *SubquerySpinOffMapperStats) ASTMapper {
	queryMapper := NewASTExprMapper(
		&subquerySpinOffMapper{
			ctx:    ctx,
			logger: logger,
			stats:  stats,
		},
	)

	return NewMultiMapper(
		queryMapper,
	)
}

// MapExpr implements ExprMapper. For function calls whose last argument is a
// subquery (e.g. avg_over_time(<expr>[5m:1m])), it replaces the subquery with
// a synthetic __subquery_spinoff__ matrix selector whose labels encode the
// subquery's inner expression, range and step, so a later middleware can run
// it as a range query. Expressions that read metrics data but cannot be spun
// off are wrapped into a __downstream_query__ selector and executed remotely
// unchanged; expressions that read no metrics data are left for the frontend
// engine to evaluate.
func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
	if err := m.ctx.Err(); err != nil {
		return nil, false, err
	}

	// Immediately clone the expr to avoid mutating the original.
	expr, err = cloneExpr(expr)
	if err != nil {
		return nil, false, err
	}

	// downstreamQuery wraps expr so that the downstream engine executes it
	// as-is. Expressions that don't touch metrics data are returned unchanged
	// (the frontend engine can evaluate them locally).
	downstreamQuery := func(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
		if !usesMetricsData(expr) {
			return expr, false, nil
		}
		selector := &parser.VectorSelector{
			Name: DownstreamQueryMetricName,
			LabelMatchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, DownstreamQueryLabelName, expr.String()),
			},
		}
		m.stats.AddDownstreamQuery()
		return selector, false, nil
	}

	switch e := expr.(type) {
	case *parser.Call:
		if len(e.Args) == 0 {
			return expr, false, nil
		}
		// A subquery, when present, is always the last argument of the call
		// (e.g. quantile_over_time(0.9, <subquery>)).
		lastArgIdx := len(e.Args) - 1
		if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok {
			// Filter out subqueries that are just selectors, they aren't optimized enough to be worth spinning off.
			if _, ok := sq.Expr.(*parser.VectorSelector); ok {
				return downstreamQuery(expr)
			}

			// Filter out subqueries with offsets, not supported yet.
			// BUGFIX: the original check (> 0) let *negative* offsets through
			// (possible with Prometheus' negative-offset feature), silently
			// dropping the offset because it isn't encoded in the selector.
			if sq.OriginalOffset != 0 {
				return downstreamQuery(expr)
			}

			// Evaluate constants within the frontend engine.
			if !usesMetricsData(sq.Expr) {
				return expr, false, nil
			}

			// Encode the subquery in the labels of a synthetic matrix selector
			// so a later middleware can reconstruct and run it as a range query.
			selector := &parser.VectorSelector{
				Name: SubqueryMetricName,
				LabelMatchers: []*labels.Matcher{
					labels.MustNewMatcher(labels.MatchEqual, SubqueryQueryLabelName, sq.Expr.String()),
					labels.MustNewMatcher(labels.MatchEqual, SubqueryRangeLabelName, sq.Range.String()),
					labels.MustNewMatcher(labels.MatchEqual, SubqueryStepLabelName, sq.Step.String()),
				},
			}

			e.Args[lastArgIdx] = &parser.MatrixSelector{
				VectorSelector: selector,
				Range:          sq.Range,
			}
			m.stats.AddSpunOffSubquery()
			// finished=true: don't descend into the rewritten call's children.
			return e, true, nil
		}

		return downstreamQuery(expr)
	default:
		return expr, false, nil
	}

}

// usesMetricsData reports whether the given AST node — or any node beneath
// it — selects metrics data, i.e. contains a vector or matrix selector.
// Expressions for which this returns false (pure scalars, vector(0), etc.)
// can be evaluated entirely by the frontend engine.
func usesMetricsData(expr parser.Node) bool {
	switch expr.(type) {
	case *parser.VectorSelector, *parser.MatrixSelector:
		return true
	}
	// Recurse into children; any selector anywhere in the subtree counts.
	for _, child := range parser.Children(expr) {
		if usesMetricsData(child) {
			return true
		}
	}
	return false
}
28 changes: 28 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

// SubquerySpinOffMapperStats counts, for a single mapping pass, how many
// subqueries were spun off as range queries and how many expressions were
// wrapped as plain downstream queries. It is not safe for concurrent use.
type SubquerySpinOffMapperStats struct {
	spunOffSubqueries int // counter of subqueries extracted
	downstreamQueries int // counter of downstream queries extracted
}

// NewSubquerySpinOffMapperStats returns a zeroed stats collector.
func NewSubquerySpinOffMapperStats() *SubquerySpinOffMapperStats {
	return &SubquerySpinOffMapperStats{}
}

// AddSpunOffSubquery increments the spun-off subqueries counter by one.
func (s *SubquerySpinOffMapperStats) AddSpunOffSubquery() { s.spunOffSubqueries++ }

// AddDownstreamQuery increments the downstream queries counter by one.
func (s *SubquerySpinOffMapperStats) AddDownstreamQuery() { s.downstreamQueries++ }

// SpunOffSubqueries returns the number of subqueries spun off so far.
func (s *SubquerySpinOffMapperStats) SpunOffSubqueries() int { return s.spunOffSubqueries }

// DownstreamQueries returns the number of downstream queries recorded so far.
func (s *SubquerySpinOffMapperStats) DownstreamQueries() int { return s.downstreamQueries }
159 changes: 159 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package astmapper

import (
"context"
"testing"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestSubquerySpinOffMapper exercises the subquery spin-off AST mapping:
// eligible subqueries become __subquery_spinoff__ matrix selectors whose
// labels encode the inner query/range/step, and remaining metric-reading
// expressions become __downstream_query__ selectors. Both `in` and `out` are
// parsed and compared via String(), so whitespace layout in `out` is not
// significant.
func TestSubquerySpinOffMapper(t *testing.T) {
	for _, tt := range []struct {
		name                      string
		in                        string // input instant query
		out                       string // expected query after mapping
		expectedSubqueries        int    // expected SpunOffSubqueries() count
		expectedDownstreamQueries int    // expected DownstreamQueries() count
	}{
		{
			// A subquery over a bare selector isn't worth spinning off; the
			// whole call is wrapped as a downstream query instead.
			name:                      "subquery too simple",
			in:                        `avg_over_time(foo[5m:1m])`,
			out:                       `__downstream_query__{__query__="avg_over_time(foo[5m:1m])"}`,
			expectedSubqueries:        0,
			expectedDownstreamQueries: 1,
		},
		{
			name:                      "spin off subquery",
			in:                        `avg_over_time((foo * bar)[5m:1m])`,
			out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m])`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 0,
		},
		{
			name: "spin off multiple subqueries",
			in:   `avg_over_time((foo * bar)[5m:1m]) * max_over_time((foo * bar)[10m:2m])`,
			out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m])
				* max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="10m0s",__step__="2m0s"}[10m])`,
			expectedSubqueries:        2,
			expectedDownstreamQueries: 0,
		},
		{
			// Mixed case: one spun-off subquery, one downstream query.
			name:                      "downstream query",
			in:                        `avg_over_time((foo * bar)[5m:1m]) * avg_over_time(foo[5m])`,
			out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * __downstream_query__{__query__="avg_over_time(foo[5m])"}`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 1,
		},
		{
			// Scalar operands stay in the frontend engine untouched.
			name:                      "scalars",
			in:                        `avg_over_time((foo * bar)[5m:1m]) * 2`,
			out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * 2`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 0,
		},
		{
			// Offsets aren't encoded in the spin-off labels yet, so the whole
			// call falls back to a downstream query.
			name:                      "offsets aren't supported",
			in:                        `avg_over_time((foo * bar)[5m:1m] offset 5m) * 2`,
			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[5m:1m] offset 5m)"} * 2`,
			expectedSubqueries:        0,
			expectedDownstreamQueries: 1,
		},
		{
			name:                      "aggregated query",
			in:                        `sum(avg_over_time((foo * bar)[5m:1m]) * avg_over_time(foo[5m]))`,
			out:                       `sum(avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * __downstream_query__{__query__="avg_over_time(foo[5m])"})`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 1,
		},
		{
			// Nested subqueries: only the outer one is spun off; note the
			// unset step serializes as "0s". expectedDownstreamQueries is
			// deliberately left at its zero value here.
			name:               "test",
			in:                 `(((sum(count_over_time(((((sum(count_over_time((kafka_consumergroup_lag > 12738)[1m:]))) or vector(0)) / ((sum(count_over_time(kafka_consumergroup_lag[1m:]))) > 0)) > 0.05)[3d:]))) or vector(0))) / (count_over_time(vector(1)[3d:]))`,
			out:                `(((sum(count_over_time(__subquery_spinoff__{__query__="((((sum(count_over_time((kafka_consumergroup_lag > 12738)[1m:]))) or vector(0)) / ((sum(count_over_time(kafka_consumergroup_lag[1m:]))) > 0)) > 0.05)",__range__="72h0m0s",__step__="0s"}[3d]))) or vector(0))) / (count_over_time(vector(1)[3d:]))`,
			expectedSubqueries: 1,
		},
		{
			// Realistic multi-line query; the escaped quotes (\") inside the
			// raw string are literal characters of the expected __query__
			// label value, as produced by Expr.String().
			name: "complex query",
			in: `
				(
				  (
				    (
				      sum(
				        count_over_time(
				          (
				            (
				              (
				                (
				                  sum(
				                    increase(
				                      kafka_event_processed_failure{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
				                    )
				                  )
				                )
				                or
				                vector(0)
				              )
				              /
				              (
				                (
				                  sum(
				                    increase(
				                      kafka_event_handled{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
				                    )
				                  )
				                )
				                >
				                0
				              )
				            )
				            >
				            0.01
				          )[3d:]
				        )
				      )
				    )
				    or
				    vector(0)
				  )
				)
				/
				(count_over_time(vector(1)[3d:]))`,
			out: ` (
				  (
				    (
				      sum(
				        count_over_time(
				          __subquery_spinoff__{__query__="((((sum(increase(kafka_event_processed_failure{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) or vector(0)) / ((sum(increase(kafka_event_handled{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) > 0)) > 0.01)",__range__="72h0m0s",__step__="0s"}[3d]
				        )
				      )
				    )
				    or
				    vector(0)
				  )
				)
				/
				(count_over_time(vector(1)[3d:]))`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 0,
		},
	} {
		tt := tt // shadow the loop variable for the subtest closure (required before Go 1.22)

		t.Run(tt.name, func(t *testing.T) {
			stats := NewSubquerySpinOffMapperStats()
			mapper := NewSubquerySpinOffMapper(context.Background(), log.NewNopLogger(), stats)
			expr, err := parser.ParseExpr(tt.in)
			require.NoError(t, err)
			out, err := parser.ParseExpr(tt.out)
			require.NoError(t, err)

			// Compare normalized String() forms so formatting differences
			// between `out` and the mapped expression don't matter.
			mapped, err := mapper.Map(expr)
			require.NoError(t, err)
			require.Equal(t, out.String(), mapped.String())
			assert.Equal(t, tt.expectedSubqueries, stats.SpunOffSubqueries())
			assert.Equal(t, tt.expectedDownstreamQueries, stats.DownstreamQueries())
		})
	}
}
1 change: 1 addition & 0 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type MetricsQueryRequest interface {
WithEstimatedSeriesCountHint(uint64) (MetricsQueryRequest, error)
// AddSpanTags writes information about this request to an OpenTracing span
AddSpanTags(opentracing.Span)
GetLookbackDelta() time.Duration
}

// LabelsSeriesQueryRequest represents a label names, label values, or series query request that can be process by middlewares.
Expand Down
Loading

0 comments on commit 16cbd65

Please sign in to comment.