Skip to content

Commit

Permalink
Fixes following tests
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Jan 24, 2025
1 parent 1b3db5e commit 11a1801
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 76 deletions.
65 changes: 51 additions & 14 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f
}

downstreamQuery := func(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
if !usesMetricsData(expr) {
if countSelectors(expr) == 0 {
return expr, false, nil
}
selector := &parser.VectorSelector{
Expand All @@ -79,11 +79,6 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f
}
lastArgIdx := len(e.Args) - 1
if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok {
// Filter out subqueries that are just selectors, they aren't optimized enough to be worth spinning off.
if _, ok := sq.Expr.(*parser.VectorSelector); ok {
return downstreamQuery(expr)
}

// Filter out subqueries with offsets, not supported yet
if sq.OriginalOffset > 0 {
return downstreamQuery(expr)
Expand All @@ -94,11 +89,18 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f
return downstreamQuery(expr)
}

selectorsCt := countSelectors(sq.Expr)

// Evaluate constants within the frontend engine
if !usesMetricsData(sq.Expr) {
if selectorsCt == 0 {
return expr, false, nil
}

// Filter out subqueries that are just selectors, they are fast enough that they aren't worth spinning off.
if selectorsCt == 1 && !isComplexExpr(sq.Expr) {
return downstreamQuery(expr)
}

step := sq.Step
if step == 0 {
if m.defaultStepFunc == nil {
Expand Down Expand Up @@ -132,23 +134,58 @@ func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, f

return downstreamQuery(expr)
default:
// If there's no subquery in the children, we can just
if !hasSubqueryInChildren(expr) {
return downstreamQuery(expr)
}
return expr, false, nil
}

}

func usesMetricsData(expr parser.Node) bool {
switch expr.(type) {
case *parser.VectorSelector:
func isComplexExpr(expr parser.Node) bool {
switch e := expr.(type) {
case *parser.SubqueryExpr:
return true
case *parser.MatrixSelector:
case *parser.Call:
for _, arg := range e.Args {
if _, ok := arg.(*parser.MatrixSelector); ok || isComplexExpr(arg) {
return true
}
}
return false
default:
for _, child := range parser.Children(e) {
if isComplexExpr(child) {
return true
}
}
return false
}
}

func hasSubqueryInChildren(expr parser.Node) bool {
switch e := expr.(type) {
case *parser.SubqueryExpr:
return true
default:
for _, child := range parser.Children(expr) {
if usesMetricsData(child) {
for _, child := range parser.Children(e) {
if hasSubqueryInChildren(child) {
return true
}
}
return false
}
}

func countSelectors(expr parser.Node) int {
switch e := expr.(type) {
case *parser.VectorSelector, *parser.MatrixSelector:
return 1
default:
count := 0
for _, child := range parser.Children(e) {
count += countSelectors(child)
}
return count
}
}
169 changes: 110 additions & 59 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestSubquerySpinOffMapper(t *testing.T) {
name: "spin off multiple subqueries",
in: `avg_over_time((foo * bar)[3d:1m]) * max_over_time((foo * bar)[2d:2m])`,
out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="72h0m0s",__step__="1m0s"}[3d])
* max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="48h0m0s",__step__="2m0s"}[2d])`,
* max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="48h0m0s",__step__="2m0s"}[2d])`,
expectedSubqueries: 2,
expectedDownstreamQueries: 0,
},
Expand Down Expand Up @@ -86,66 +86,117 @@ func TestSubquerySpinOffMapper(t *testing.T) {
expectedDownstreamQueries: 1,
},
{
name: "complex query",
name: "ignore single selector subquery",
in: `sum(avg_over_time((foo > 1)[3d:1m]) * avg_over_time(foo[3d]))`,
out: `sum(__downstream_query__{__query__="avg_over_time((foo > 1)[3d:1m])"} * __downstream_query__{__query__="avg_over_time(foo[3d])"})`,
expectedSubqueries: 0,
expectedDownstreamQueries: 2,
},
{
name: "complex query 1",
in: `
(
(
(
sum(
count_over_time(
(
(
(
(
sum(
increase(
kafka_event_processed_failure{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
)
)
)
or
vector(0)
)
/
(
(
sum(
increase(
kafka_event_handled{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
)
)
)
>
0
)
)
>
0.01
)[3d:]
)
)
)
or
vector(0)
)
)
/
(count_over_time(vector(1)[3d:]))`,
(
(
(
sum(
count_over_time(
(
(
(
(
sum(
increase(
kafka_event_processed_failure{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
)
)
)
or
vector(0)
)
/
(
(
sum(
increase(
kafka_event_handled{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:]
)
)
)
>
0
)
)
>
0.01
)[3d:]
)
)
)
or
vector(0)
)
)
/
(count_over_time(vector(1)[3d:]))`,
out: ` (
(
(
sum(
count_over_time(
__subquery_spinoff__{__query__="((((sum(increase(kafka_event_processed_failure{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) or vector(0)) / ((sum(increase(kafka_event_handled{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) > 0)) > 0.01)",__range__="72h0m0s",__step__="1m0s"}[3d]
)
)
)
or
vector(0)
)
)
/
(count_over_time(vector(1)[3d:]))`,
(
(
sum(
count_over_time(
__subquery_spinoff__{__query__="((((sum(increase(kafka_event_processed_failure{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) or vector(0)) / ((sum(increase(kafka_event_handled{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) > 0)) > 0.01)",__range__="72h0m0s",__step__="1m0s"}[3d]
)
)
)
or
vector(0)
)
)
/
(count_over_time(vector(1)[3d:]))`,
expectedSubqueries: 1,
expectedDownstreamQueries: 0,
},
{
name: "complex query 2",
in: `
1 - grafana_slo_sli_6h{grafana_slo_uuid="ktr6jo1nptzickyko7k98"} > 1 * 0.0050000000000000044
and
1 - grafana_slo_sli_3d{grafana_slo_uuid="ktr6jo1nptzickyko7k98"} > 1 * 0.0050000000000000044
and
300
*
(
sum_over_time((increase(grafana_slo_total_rate_5m{grafana_slo_uuid="ktr6jo1nptzickyko7k98"}[5m]) < 1e+308)[3d:5m])
-
sum_over_time(
(increase(grafana_slo_success_rate_5m{grafana_slo_uuid="ktr6jo1nptzickyko7k98"}[5m]) < 1e+308)[3d:5m]
)
)
>
25`,
out: `
__downstream_query__{__query__="1 - grafana_slo_sli_6h{grafana_slo_uuid=\"ktr6jo1nptzickyko7k98\"} > 1 * 0.0050000000000000044 and 1 - grafana_slo_sli_3d{grafana_slo_uuid=\"ktr6jo1nptzickyko7k98\"} > 1 * 0.0050000000000000044"}
and
300
*
(
sum_over_time(
__subquery_spinoff__{__query__="(increase(grafana_slo_total_rate_5m{grafana_slo_uuid=\"ktr6jo1nptzickyko7k98\"}[5m]) < 1e+308)",__range__="72h0m0s",__step__="5m0s"}[3d]
)
-
sum_over_time(
__subquery_spinoff__{__query__="(increase(grafana_slo_success_rate_5m{grafana_slo_uuid=\"ktr6jo1nptzickyko7k98\"}[5m]) < 1e+308)",__range__="72h0m0s",__step__="5m0s"}[3d]
)
)
>
25`,
expectedSubqueries: 2,
expectedDownstreamQueries: 1,
},
{
name: "complex query 3",
in: `max_over_time( deriv( rate(metric_counter[10m])[5m:1m] )[3d:] )`,
out: `max_over_time(__subquery_spinoff__{__query__="deriv(rate(metric_counter[10m])[5m:1m])",__range__="72h0m0s",__step__="1m0s"}[3d])`,
expectedSubqueries: 1,
expectedDownstreamQueries: 0,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func newQueryMiddlewares(
// Spin off subqueries to a remote URL (or localhost)
spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log, spinOffURL)
if err != nil {
level.Error(log).Log("failed to create spin off query handler", "error", err)
level.Error(log).Log("msg", "failed to create spin off query handler", "error", err)
} else {
queryInstantMiddleware = append(
queryInstantMiddleware,
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/spin_off_subqueries.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/dskit/user"
apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper"
"github.com/grafana/mimir/pkg/querier/stats"
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/spin_off_subqueries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/user"
"github.com/grafana/mimir/pkg/util"
)

Expand Down

0 comments on commit 11a1801

Please sign in to comment.