Skip to content

Commit

Permalink
Subquery Spin-off POC
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Jan 21, 2025
1 parent aa0bf13 commit 42d7dbc
Show file tree
Hide file tree
Showing 20 changed files with 1,628 additions and 51 deletions.
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -6576,6 +6576,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "spin_off_instant_subqueries",
"required": false,
"desc": "Set to true to enable spinning off subqueries as range queries in instant queries. This makes use of range query optimizations and may improve performance for expensive subqueries.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "query-frontend.spin-off-instant-subqueries",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "query_result_response_format",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,8 @@ Usage of ./cmd/mimir/mimir:
Number of concurrent workers forwarding queries to single query-scheduler. (default 5)
-query-frontend.shard-active-series-queries
[experimental] True to enable sharding of active series queries.
-query-frontend.spin-off-instant-subqueries
[experimental] Set to true to enable spinning off subqueries as range queries in instant queries. This makes use of range query optimizations and may improve performance for expensive subqueries.
-query-frontend.split-instant-queries-by-interval duration
[experimental] Split instant queries by an interval and execute in parallel. 0 to disable it.
-query-frontend.split-queries-by-interval duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,12 @@ results_cache:
# CLI flag: -query-frontend.use-active-series-decoder
[use_active_series_decoder: <boolean> | default = false]

# (experimental) Set to true to enable spinning off subqueries as range queries
# in instant queries. This makes use of range query optimizations and may
# improve performance for expensive subqueries.
# CLI flag: -query-frontend.spin-off-instant-subqueries
[spin_off_instant_subqueries: <boolean> | default = false]

# Format to use when retrieving query results from queriers. Supported values:
# json, protobuf
# CLI flag: -query-frontend.query-result-response-format
Expand Down
154 changes: 154 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

const (
	// SubqueryMetricName is the synthetic metric name a spun-off subquery is
	// rewritten into; the labels below carry the parameters needed to re-run
	// the subquery as a standalone range query.
	SubqueryMetricName = "__subquery_spinoff__"
	// SubqueryQueryLabelName holds the subquery's inner PromQL expression.
	SubqueryQueryLabelName = "__query__"
	// SubqueryRangeLabelName holds the subquery range as a duration string (e.g. "72h0m0s").
	SubqueryRangeLabelName = "__range__"
	// SubqueryStepLabelName holds the resolved subquery step as a duration string (e.g. "1m0s").
	SubqueryStepLabelName = "__step__"
	// SubqueryOffsetLabelName holds the subquery offset.
	// NOTE(review): this label is never set in this file (offsets are filtered
	// out in MapExpr) — presumably reserved for the offset follow-up; confirm.
	SubqueryOffsetLabelName = "__offset__"

	// DownstreamQueryMetricName is the synthetic metric name wrapping an
	// expression that should be executed unchanged by the downstream engine.
	DownstreamQueryMetricName = "__downstream_query__"
	// DownstreamQueryLabelName holds the full embedded downstream expression.
	DownstreamQueryLabelName = "__query__"
)

// subquerySpinOffMapper rewrites an instant query's AST so that eligible
// subqueries are replaced by synthetic selectors (see MapExpr). One instance
// maps a single query; stats accumulate the rewrites performed during mapping.
type subquerySpinOffMapper struct {
	// ctx is checked for cancellation on every MapExpr call.
	// NOTE(review): storing a context in a struct is normally discouraged in
	// Go; presumably the ASTMapper interface leaves no other way to thread it.
	ctx context.Context
	// defaultStepFunc resolves the step (in milliseconds) for a subquery that
	// does not declare one, given the subquery range in milliseconds. May be
	// nil; MapExpr returns an error if it is needed but unset.
	defaultStepFunc func(rangeMillis int64) int64

	logger log.Logger
	stats  *SubquerySpinOffMapperStats // rewrite counters, read back by the caller
}

// NewSubquerySpinOffMapper creates an ASTMapper that spins off eligible
// subqueries of an instant query into standalone range queries, referenced
// through selectors on the synthetic SubqueryMetricName / DownstreamQueryMetricName
// metrics. defaultStepFunc supplies the step (in ms) for subqueries that
// don't declare one; stats collects counts of the rewrites performed.
// NOTE(review): the previous comment named this "NewSubqueryExtractor" —
// updated to match the actual function name.
func NewSubquerySpinOffMapper(ctx context.Context, defaultStepFunc func(rangeMillis int64) int64, logger log.Logger, stats *SubquerySpinOffMapperStats) ASTMapper {
	queryMapper := NewASTExprMapper(
		&subquerySpinOffMapper{
			ctx:             ctx,
			defaultStepFunc: defaultStepFunc,
			logger:          logger,
			stats:           stats,
		},
	)

	// NOTE(review): only one mapper is composed here; NewMultiMapper is
	// presumably kept for symmetry with other mappers in this package.
	return NewMultiMapper(
		queryMapper,
	)
}

// MapExpr implements the expression-mapper interface. For function calls whose
// last argument is a subquery, it decides whether the subquery is worth
// spinning off into a standalone range query. If so, the subquery argument is
// replaced by a matrix selector on the synthetic __subquery_spinoff__ metric,
// encoding the inner query, range and step as labels. Expressions that are not
// spun off but still need series data are wrapped into a __downstream_query__
// selector so they can be evaluated downstream unchanged.
func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
	// Bail out early if the request was cancelled.
	if err := m.ctx.Err(); err != nil {
		return nil, false, err
	}

	// Immediately clone the expr to avoid mutating the original.
	expr, err = cloneExpr(expr)
	if err != nil {
		return nil, false, err
	}

	// downstreamQuery wraps expr into a selector on the synthetic
	// __downstream_query__ metric, embedding the whole expression as a label
	// value, so it is executed as-is by the downstream engine. Expressions
	// that touch no series data are returned unchanged (they can be evaluated
	// locally by the frontend engine).
	downstreamQuery := func(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
		if !usesMetricsData(expr) {
			return expr, false, nil
		}
		selector := &parser.VectorSelector{
			Name: DownstreamQueryMetricName,
			LabelMatchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, DownstreamQueryLabelName, expr.String()),
			},
		}
		m.stats.AddDownstreamQuery()
		return selector, false, nil
	}

	switch e := expr.(type) {
	case *parser.Call:
		// Calls without arguments (e.g. time()) have nothing to spin off.
		if len(e.Args) == 0 {
			return expr, false, nil
		}
		// The subquery, if present, is the last argument of the call
		// (e.g. avg_over_time(<subquery>), quantile_over_time(0.9, <subquery>)).
		lastArgIdx := len(e.Args) - 1
		if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok {
			// Filter out subqueries that are just selectors, they aren't optimized enough to be worth spinning off.
			if _, ok := sq.Expr.(*parser.VectorSelector); ok {
				return downstreamQuery(expr)
			}

			// Filter out subqueries with offsets, not supported yet.
			// NOTE(review): this only catches positive offsets; a negative
			// offset (when enabled) would slip through — confirm intended.
			if sq.OriginalOffset > 0 {
				return downstreamQuery(expr)
			}

			// Filter out subqueries with ranges less than 1 hour as they are not worth spinning off.
			if sq.Range < 1*time.Hour {
				return downstreamQuery(expr)
			}

			// Evaluate constants within the frontend engine (no series data needed).
			if !usesMetricsData(sq.Expr) {
				return expr, false, nil
			}

			// Resolve an unset step via the frontend's default-step callback.
			step := sq.Step
			if step == 0 {
				if m.defaultStepFunc == nil {
					return nil, false, errors.New("defaultStepFunc is not set")
				}
				step = time.Duration(m.defaultStepFunc(sq.Range.Milliseconds())) * time.Millisecond
			}

			// Filter out subqueries with less than 10 steps as they are not worth spinning off.
			numberOfSteps := int(sq.Range / step)
			if numberOfSteps < 10 {
				return downstreamQuery(expr)
			}

			// Encode the subquery as a selector on the synthetic metric; the
			// query, range and step labels carry everything needed to re-run
			// it as a range query elsewhere.
			selector := &parser.VectorSelector{
				Name: SubqueryMetricName,
				LabelMatchers: []*labels.Matcher{
					labels.MustNewMatcher(labels.MatchEqual, SubqueryQueryLabelName, sq.Expr.String()),
					labels.MustNewMatcher(labels.MatchEqual, SubqueryRangeLabelName, sq.Range.String()),
					labels.MustNewMatcher(labels.MatchEqual, SubqueryStepLabelName, step.String()),
				},
			}

			// Replace the subquery argument with a matrix selector spanning
			// the same range, then stop descending (finished=true).
			e.Args[lastArgIdx] = &parser.MatrixSelector{
				VectorSelector: selector,
				Range:          sq.Range,
			}
			m.stats.AddSpunOffSubquery()
			return e, true, nil
		}

		// Calls without a trailing subquery still need series data downstream.
		return downstreamQuery(expr)
	default:
		return expr, false, nil
	}

}

// usesMetricsData reports whether the expression tree contains any vector or
// matrix selector, i.e. whether evaluating it requires series data. A tree
// without selectors is a pure computation and can be evaluated locally.
func usesMetricsData(expr parser.Node) bool {
	switch expr.(type) {
	case *parser.VectorSelector, *parser.MatrixSelector:
		return true
	}
	for _, child := range parser.Children(expr) {
		if usesMetricsData(child) {
			return true
		}
	}
	return false
}
28 changes: 28 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

// SubquerySpinOffMapperStats counts the rewrites performed by the subquery
// spin-off mapper, split by rewrite kind.
type SubquerySpinOffMapperStats struct {
	spunOffSubqueries int // subqueries rewritten into __subquery_spinoff__ selectors
	downstreamQueries int // expressions rewritten into __downstream_query__ selectors
}

// NewSubquerySpinOffMapperStats returns an empty stats collector.
func NewSubquerySpinOffMapperStats() *SubquerySpinOffMapperStats {
	return new(SubquerySpinOffMapperStats)
}

// AddSpunOffSubquery records one additional spun-off subquery.
func (s *SubquerySpinOffMapperStats) AddSpunOffSubquery() { s.spunOffSubqueries++ }

// AddDownstreamQuery records one additional downstream query.
func (s *SubquerySpinOffMapperStats) AddDownstreamQuery() { s.downstreamQueries++ }

// SpunOffSubqueries returns how many subqueries were spun off so far.
func (s *SubquerySpinOffMapperStats) SpunOffSubqueries() int { return s.spunOffSubqueries }

// DownstreamQueries returns how many downstream queries were extracted so far.
func (s *SubquerySpinOffMapperStats) DownstreamQueries() int { return s.downstreamQueries }
174 changes: 174 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestSubquerySpinOffMapper exercises the mapper's rewrite decisions: which
// subqueries are spun off into __subquery_spinoff__ selectors, which
// expressions fall back to a single __downstream_query__ selector, and the
// stats counted for each. Expected outputs are compared after a
// parse/String() round-trip, so formatting differences don't matter.
func TestSubquerySpinOffMapper(t *testing.T) {
	for _, tt := range []struct {
		name                      string
		in                        string // input instant query
		out                       string // expected mapped query
		expectedSubqueries        int    // expected stats.SpunOffSubqueries()
		expectedDownstreamQueries int    // expected stats.DownstreamQueries()
	}{
		{
			// A subquery over a plain selector is not worth spinning off; the
			// whole call becomes a downstream query.
			name:                      "subquery too simple",
			in:                        `avg_over_time(foo[3d:1m])`,
			out:                       `__downstream_query__{__query__="avg_over_time(foo[3d:1m])"}`,
			expectedSubqueries:        0,
			expectedDownstreamQueries: 1,
		},
		{
			name:                      "spin off subquery",
			in:                        `avg_over_time((foo * bar)[3d:1m])`,
			out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d])`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 0,
		},
		{
			// Range below the 1h threshold: fall back to downstream query.
			name:                      "range too short",
			in:                        `avg_over_time((foo * bar)[30m:1m])`,
			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[30m:1m])"}`,
			expectedSubqueries:        0,
			expectedDownstreamQueries: 1,
		},
		{
			// Fewer than 10 steps (3d / 1d = 3): fall back to downstream query.
			name:                      "too few steps",
			in:                        `avg_over_time((foo * bar)[3d:1d])`,
			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1d])"}`,
			expectedSubqueries:        0,
			expectedDownstreamQueries: 1,
		},
		{
			name: "spin off multiple subqueries",
			in:   `avg_over_time((foo * bar)[3d:1m]) * max_over_time((foo * bar)[2d:2m])`,
			out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d])
				* max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="48h0m0s",__step__="2m0s"}[2d])`,
			expectedSubqueries:        2,
			expectedDownstreamQueries: 0,
		},
		{
			// Mixed case: one side spun off, the other sent downstream.
			name:                      "downstream query",
			in:                        `avg_over_time((foo * bar)[3d:1m]) * avg_over_time(foo[3d])`,
			out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d]) * __downstream_query__{__query__="avg_over_time(foo[3d])"}`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 1,
		},
		{
			// Scalar operands need no series data and stay in the frontend.
			name:                      "scalars",
			in:                        `avg_over_time((foo * bar)[3d:1m]) * 2`,
			out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d]) * 2`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 0,
		},
		{
			name:                      "offsets aren't supported",
			in:                        `avg_over_time((foo * bar)[3d:1m] offset 3d) * 2`,
			out:                       `__downstream_query__{__query__="avg_over_time((foo * bar)[3d:1m] offset 3d)"} * 2`,
			expectedSubqueries:        0,
			expectedDownstreamQueries: 1,
		},
		{
			name:                      "aggregated query",
			in:                        `sum(avg_over_time((foo * bar)[3d:1m]) * avg_over_time(foo[3d]))`,
			out:                       `sum(avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d]) * __downstream_query__{__query__="avg_over_time(foo[3d])"})`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 1,
		},
		{
			// Real-world-shaped query: the inner [3d:] subquery is spun off
			// (step resolved to 1m via defaultStepFunc); vector() parts stay.
			name: "complex query",
			in: `
				(
				  (
				    (
				      sum(
				        count_over_time(
				          (
				            (
				              (
				                (
				                  sum(
				                    increase(
				                      kafka_event_processed_failure{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
				                    )
				                  )
				                )
				                or
				                vector(0)
				              )
				              /
				              (
				                (
				                  sum(
				                    increase(
				                      kafka_event_handled{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
				                    )
				                  )
				                )
				                >
				                0
				              )
				            )
				            >
				            0.01
				          )[3d:]
				        )
				      )
				    )
				    or
				    vector(0)
				  )
				)
				/
				(count_over_time(vector(1)[3d:]))`,
			out: ` (
				(
				(
				sum(
				count_over_time(
				__subquery_spinoff__{__query__="((((sum(increase(kafka_event_processed_failure{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) or vector(0)) / ((sum(increase(kafka_event_handled{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) > 0)) > 0.01)",__range__="72h0m0s",__step__="1m0s"}[3d]
				)
				)
				)
				or
				vector(0)
				)
				)
				/
				(count_over_time(vector(1)[3d:]))`,
			expectedSubqueries:        1,
			expectedDownstreamQueries: 0,
		},
	} {
		tt := tt // capture range variable for the parallel-safe closure (pre-Go 1.22 semantics)

		t.Run(tt.name, func(t *testing.T) {
			stats := NewSubquerySpinOffMapperStats()
			mapper := NewSubquerySpinOffMapper(context.Background(), defaultStepFunc, log.NewNopLogger(), stats)
			expr, err := parser.ParseExpr(tt.in)
			require.NoError(t, err)
			out, err := parser.ParseExpr(tt.out)
			require.NoError(t, err)

			mapped, err := mapper.Map(expr)
			require.NoError(t, err)
			require.Equal(t, out.String(), mapped.String())
			assert.Equal(t, tt.expectedSubqueries, stats.SpunOffSubqueries())
			assert.Equal(t, tt.expectedDownstreamQueries, stats.DownstreamQueries())
		})
	}
}

var defaultStepFunc = func(int64) int64 {
return (1 * time.Minute).Milliseconds()
}
Loading

0 comments on commit 42d7dbc

Please sign in to comment.