diff --git a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
new file mode 100644
index 00000000000..b2d674bd3e0
--- /dev/null
+++ b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
@@ -0,0 +1,129 @@
+package astmapper
+
+import (
+	"context"
+
+	"github.com/go-kit/log"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql/parser"
+)
+
+const (
+	SubqueryMetricName      = "__subquery_spinoff__"
+	SubqueryQueryLabelName  = "__query__"
+	SubqueryRangeLabelName  = "__range__"
+	SubqueryStepLabelName   = "__step__"
+	SubqueryOffsetLabelName = "__offset__"
+
+	DownstreamQueryMetricName = "__downstream_query__"
+	DownstreamQueryLabelName  = "__query__"
+)
+
+type subquerySpinOffMapper struct {
+	ctx context.Context
+
+	logger log.Logger
+	stats  *SubquerySpinOffMapperStats
+}
+
+// NewSubquerySpinOffMapper creates an ASTMapper that rewrites eligible subqueries of an instant query so they can be spun off as range queries.
+func NewSubquerySpinOffMapper(ctx context.Context, logger log.Logger, stats *SubquerySpinOffMapperStats) ASTMapper {
+	queryMapper := NewASTExprMapper(
+		&subquerySpinOffMapper{
+			ctx:    ctx,
+			logger: logger,
+			stats:  stats,
+		},
+	)
+
+	return NewMultiMapper(
+		queryMapper,
+	)
+}
+
+func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
+	if err := m.ctx.Err(); err != nil {
+		return nil, false, err
+	}
+
+	// Immediately clone the expr to avoid mutating the original
+	expr, err = cloneExpr(expr)
+	if err != nil {
+		return nil, false, err
+	}
+
+	downstreamQuery := func(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
+		if !usesMetricsData(expr) {
+			return expr, false, nil
+		}
+		selector := &parser.VectorSelector{
+			Name: DownstreamQueryMetricName,
+			LabelMatchers: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchEqual, DownstreamQueryLabelName, expr.String()),
+			},
+		}
+		m.stats.AddDownstreamQuery()
+		return selector, false, nil
+	}
+
+	switch e := expr.(type) {
+	case *parser.Call:
+		if len(e.Args) == 0 {
+			return expr, false, nil
+		}
+		lastArgIdx := len(e.Args) - 1
+		if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok {
+			// Filter out subqueries that are just selectors; they aren't expensive enough to be worth spinning off.
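+			// For example, `avg_over_time(foo[5m:1m])` is left as a plain downstream query,
+			// while `avg_over_time((foo * bar)[5m:1m])` is rewritten and spun off (see the tests below).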
+			if _, ok := sq.Expr.(*parser.VectorSelector); ok {
+				return downstreamQuery(expr)
+			}
+
+			// Filter out subqueries with offsets; they aren't supported yet.
+			if sq.OriginalOffset != 0 {
+				return downstreamQuery(expr)
+			}
+
+			// Evaluate constants within the frontend engine
+			if !usesMetricsData(sq.Expr) {
+				return expr, false, nil
+			}
+
+			selector := &parser.VectorSelector{
+				Name: SubqueryMetricName,
+				LabelMatchers: []*labels.Matcher{
+					labels.MustNewMatcher(labels.MatchEqual, SubqueryQueryLabelName, sq.Expr.String()),
+					labels.MustNewMatcher(labels.MatchEqual, SubqueryRangeLabelName, sq.Range.String()),
+					labels.MustNewMatcher(labels.MatchEqual, SubqueryStepLabelName, sq.Step.String()),
+				},
+			}
+
+			e.Args[lastArgIdx] = &parser.MatrixSelector{
+				VectorSelector: selector,
+				Range:          sq.Range,
+			}
+			m.stats.AddSpunOffSubquery()
+			return e, true, nil
+		}
+
+		return downstreamQuery(expr)
+	default:
+		return expr, false, nil
+	}
+
+}
+
+func usesMetricsData(expr parser.Node) bool {
+	switch expr.(type) {
+	case *parser.VectorSelector:
+		return true
+	case *parser.MatrixSelector:
+		return true
+	default:
+		for _, child := range parser.Children(expr) {
+			if usesMetricsData(child) {
+				return true
+			}
+		}
+		return false
+	}
+}
diff --git a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_stats.go b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_stats.go
new file mode 100644
index 00000000000..fdb3f0401e0
--- /dev/null
+++ b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_stats.go
@@ -0,0 +1,28 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package astmapper
+
+type SubquerySpinOffMapperStats struct {
+	spunOffSubqueries int // counter of subqueries spun off into separate range queries
+	downstreamQueries int // counter of queries extracted to be executed downstream unchanged
+}
+
+func NewSubquerySpinOffMapperStats() *SubquerySpinOffMapperStats {
+	return &SubquerySpinOffMapperStats{}
+}
+
+func (s *SubquerySpinOffMapperStats) AddSpunOffSubquery() {
+	s.spunOffSubqueries++
+}
+
+func (s *SubquerySpinOffMapperStats) AddDownstreamQuery() {
+	s.downstreamQueries++
+}
+
+func (s *SubquerySpinOffMapperStats) SpunOffSubqueries() int {
+	return s.spunOffSubqueries
+}
+
+func (s *SubquerySpinOffMapperStats) DownstreamQueries() int {
+	return s.downstreamQueries
+}
diff --git a/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go
new file mode 100644
index 00000000000..f59e99a7d6f
--- /dev/null
+++ b/pkg/frontend/querymiddleware/astmapper/subquery_spin_off_test.go
@@ -0,0 +1,159 @@
+package astmapper
+
+import (
+	"context"
+	"testing"
+
+	"github.com/go-kit/log"
+	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSubquerySpinOffMapper(t *testing.T) {
+	for _, tt := range []struct {
+		name                      string
+		in                        string
+		out                       string
+		expectedSubqueries        int
+		expectedDownstreamQueries int
+	}{
+		{
+			name:                      "subquery too simple",
+			in:                        `avg_over_time(foo[5m:1m])`,
+			out:                       `__downstream_query__{__query__="avg_over_time(foo[5m:1m])"}`,
+			expectedSubqueries:        0,
+			expectedDownstreamQueries: 1,
+		},
+		{
+			name:                      "spin off subquery",
+			in:                        `avg_over_time((foo * bar)[5m:1m])`,
+			out:                       `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m])`,
+			expectedSubqueries:        1,
+			expectedDownstreamQueries: 0,
+		},
+		{
+			name: "spin off multiple subqueries",
+			in:   `avg_over_time((foo * bar)[5m:1m]) * max_over_time((foo * bar)[10m:2m])`,
+			out: 
`avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) + * max_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="10m0s",__step__="2m0s"}[10m])`, + expectedSubqueries: 2, + expectedDownstreamQueries: 0, + }, + { + name: "downstream query", + in: `avg_over_time((foo * bar)[5m:1m]) * avg_over_time(foo[5m])`, + out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * __downstream_query__{__query__="avg_over_time(foo[5m])"}`, + expectedSubqueries: 1, + expectedDownstreamQueries: 1, + }, + { + name: "scalars", + in: `avg_over_time((foo * bar)[5m:1m]) * 2`, + out: `avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * 2`, + expectedSubqueries: 1, + expectedDownstreamQueries: 0, + }, + { + name: "offsets aren't supported", + in: `avg_over_time((foo * bar)[5m:1m] offset 5m) * 2`, + out: `__downstream_query__{__query__="avg_over_time((foo * bar)[5m:1m] offset 5m)"} * 2`, + expectedSubqueries: 0, + expectedDownstreamQueries: 1, + }, + { + name: "aggregated query", + in: `sum(avg_over_time((foo * bar)[5m:1m]) * avg_over_time(foo[5m]))`, + out: `sum(avg_over_time(__subquery_spinoff__{__query__="(foo * bar)",__range__="5m0s",__step__="1m0s"}[5m]) * __downstream_query__{__query__="avg_over_time(foo[5m])"})`, + expectedSubqueries: 1, + expectedDownstreamQueries: 1, + }, + { + name: "test", + in: `(((sum(count_over_time(((((sum(count_over_time((kafka_consumergroup_lag > 12738)[1m:]))) or vector(0)) / ((sum(count_over_time(kafka_consumergroup_lag[1m:]))) > 0)) > 0.05)[3d:]))) or vector(0))) / (count_over_time(vector(1)[3d:]))`, + out: `(((sum(count_over_time(__subquery_spinoff__{__query__="((((sum(count_over_time((kafka_consumergroup_lag > 12738)[1m:]))) or vector(0)) / ((sum(count_over_time(kafka_consumergroup_lag[1m:]))) > 0)) > 0.05)",__range__="72h0m0s",__step__="0s"}[3d]))) or vector(0))) / (count_over_time(vector(1)[3d:]))`, + expectedSubqueries: 1, + }, + { + name: "complex query", + in: ` + ( + ( + ( + sum( + count_over_time( + ( + ( + ( + ( + sum( + increase( + kafka_event_processed_failure{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:] + ) + ) + ) + or + vector(0) + ) + / + ( + ( + sum( + increase( +kafka_event_handled{aws_region="eu-central-1",pods=~".*prd.*",service="my-service"}[1m:] + ) + ) + ) + > + 0 + ) + ) + > + 0.01 + )[3d:] + ) + ) + ) + or + vector(0) + ) + ) +/ + (count_over_time(vector(1)[3d:]))`, + out: ` ( + ( + ( + sum( + count_over_time( + __subquery_spinoff__{__query__="((((sum(increase(kafka_event_processed_failure{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) or vector(0)) / ((sum(increase(kafka_event_handled{aws_region=\"eu-central-1\",pods=~\".*prd.*\",service=\"my-service\"}[1m:]))) > 0)) > 0.01)",__range__="72h0m0s",__step__="0s"}[3d] + ) + ) + ) + or + vector(0) + ) + ) +/ + (count_over_time(vector(1)[3d:]))`, + expectedSubqueries: 1, + expectedDownstreamQueries: 0, + }, + } { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + stats := NewSubquerySpinOffMapperStats() + mapper := NewSubquerySpinOffMapper(context.Background(), log.NewNopLogger(), stats) + expr, err := parser.ParseExpr(tt.in) + require.NoError(t, err) + out, err := parser.ParseExpr(tt.out) + require.NoError(t, err) + + mapped, err := mapper.Map(expr) + require.NoError(t, err) + require.Equal(t, out.String(), mapped.String()) + assert.Equal(t, tt.expectedSubqueries, stats.SpunOffSubqueries()) + 
assert.Equal(t, tt.expectedDownstreamQueries, stats.DownstreamQueries())
+		})
+	}
+}
diff --git a/pkg/frontend/querymiddleware/codec.go b/pkg/frontend/querymiddleware/codec.go
index 5e9dcb78496..05f629abfa8 100644
--- a/pkg/frontend/querymiddleware/codec.go
+++ b/pkg/frontend/querymiddleware/codec.go
@@ -152,6 +152,7 @@ type MetricsQueryRequest interface {
 	WithEstimatedSeriesCountHint(uint64) (MetricsQueryRequest, error)
 	// AddSpanTags writes information about this request to an OpenTracing span
 	AddSpanTags(opentracing.Span)
+	// GetLookbackDelta returns the lookback delta configured for this request.
+	GetLookbackDelta() time.Duration
 }
 
 // LabelsSeriesQueryRequest represents a label names, label values, or series query request that can be processed by middlewares.
diff --git a/pkg/frontend/querymiddleware/limits.go b/pkg/frontend/querymiddleware/limits.go
index 7e76fdc32e9..ce131cca35e 100644
--- a/pkg/frontend/querymiddleware/limits.go
+++ b/pkg/frontend/querymiddleware/limits.go
@@ -204,8 +204,8 @@ type limitedParallelismRoundTripper struct {
 	middleware MetricsQueryMiddleware
 }
 
-// NewLimitedParallelismRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`.
-func NewLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limits Limits, middlewares ...MetricsQueryMiddleware) http.RoundTripper {
+// newLimitedParallelismRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`.
+func newLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limits Limits, middlewares ...MetricsQueryMiddleware) limitedParallelismRoundTripper {
 	return limitedParallelismRoundTripper{
 		downstream: roundTripperHandler{
 			next:  next,
@@ -217,18 +217,7 @@ func NewLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limi
 	}
 }
 
-func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
-	ctx, cancel := context.WithCancelCause(r.Context())
-	defer cancel(errExecutingParallelQueriesFinished)
-
-	request, err := rt.codec.DecodeMetricsQueryRequest(ctx, r)
-	if err != nil {
-		return nil, err
-	}
-
-	if span := opentracing.SpanFromContext(ctx); span != nil {
-		request.AddSpanTags(span)
-	}
+func (rt limitedParallelismRoundTripper) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) {
 	tenantIDs, err := tenant.TenantIDs(ctx)
 	if err != nil {
 		return nil, apierror.New(apierror.TypeBadData, err.Error())
@@ -241,7 +230,7 @@ func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Respo
 	// Wraps middlewares with a final handler, which will receive sub-requests in
 	// parallel from upstream handlers and ensure that no more than MaxQueryParallelism
 	// sub-requests run in parallel.
- response, err := rt.middleware.Wrap( + fullHandler := rt.middleware.Wrap( HandlerFunc(func(ctx context.Context, r MetricsQueryRequest) (Response, error) { if err := sem.Acquire(ctx, 1); err != nil { return nil, fmt.Errorf("could not acquire work: %w", err) @@ -249,7 +238,21 @@ func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Respo defer sem.Release(1) return rt.downstream.Do(ctx, r) - })).Do(ctx, request) + })) + return fullHandler.Do(ctx, r) +} + +func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + ctx, cancel := context.WithCancelCause(r.Context()) + defer cancel(errExecutingParallelQueriesFinished) + request, err := rt.codec.DecodeMetricsQueryRequest(ctx, r) + if err != nil { + return nil, err + } + if span := opentracing.SpanFromContext(ctx); span != nil { + request.AddSpanTags(span) + } + response, err := rt.Do(ctx, request) if err != nil { return nil, err } diff --git a/pkg/frontend/querymiddleware/limits_test.go b/pkg/frontend/querymiddleware/limits_test.go index ac2b3615765..c11b50c06ae 100644 --- a/pkg/frontend/querymiddleware/limits_test.go +++ b/pkg/frontend/querymiddleware/limits_test.go @@ -780,7 +780,7 @@ func TestLimitedRoundTripper_MaxQueryParallelism(t *testing.T) { }) require.Nil(t, err) - _, err = NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, + _, err = newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { var wg sync.WaitGroup @@ -824,7 +824,7 @@ func TestLimitedRoundTripper_MaxQueryParallelismLateScheduling(t *testing.T) { }) require.Nil(t, err) - _, err = NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, + _, err = newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { // fire up work and we don't wait. 
@@ -865,7 +865,7 @@ func TestLimitedRoundTripper_OriginalRequestContextCancellation(t *testing.T) { }) require.Nil(t, err) - _, err = NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, + _, err = newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { var wg sync.WaitGroup @@ -924,7 +924,7 @@ func BenchmarkLimitedParallelismRoundTripper(b *testing.B) { for _, concurrentRequestCount := range []int{1, 10, 100} { for _, subRequestCount := range []int{1, 2, 5, 10, 20, 50, 100} { - tripper := NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxParallelism}, + tripper := newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { wg := sync.WaitGroup{} diff --git a/pkg/frontend/querymiddleware/model_extra.go b/pkg/frontend/querymiddleware/model_extra.go index 9cf07648338..0f96d759104 100644 --- a/pkg/frontend/querymiddleware/model_extra.go +++ b/pkg/frontend/querymiddleware/model_extra.go @@ -133,6 +133,10 @@ func (r *PrometheusRangeQueryRequest) GetQuery() string { return "" } +func (r *PrometheusRangeQueryRequest) GetLookbackDelta() time.Duration { + return r.lookbackDelta +} + // GetMinT returns the minimum timestamp in milliseconds of data to be queried, // as determined from the start timestamp and any range vector or offset in the query. func (r *PrometheusRangeQueryRequest) GetMinT() int64 { @@ -345,6 +349,10 @@ func (r *PrometheusInstantQueryRequest) GetHints() *Hints { return r.hints } +func (r *PrometheusInstantQueryRequest) GetLookbackDelta() time.Duration { + return r.lookbackDelta +} + func (r *PrometheusInstantQueryRequest) WithID(id int64) (MetricsQueryRequest, error) { newRequest := *r newRequest.headers = cloneHeaders(r.headers) diff --git a/pkg/frontend/querymiddleware/remote_read.go b/pkg/frontend/querymiddleware/remote_read.go index 11070ce18d5..2cda1d40c7f 100644 --- a/pkg/frontend/querymiddleware/remote_read.go +++ b/pkg/frontend/querymiddleware/remote_read.go @@ -11,6 +11,7 @@ import ( "net/http" "net/url" "strconv" + "time" "github.com/golang/snappy" "github.com/opentracing/opentracing-go" @@ -285,6 +286,10 @@ func (r *remoteReadQueryRequest) GetHeaders() []*PrometheusHeader { return nil } +func (r *remoteReadQueryRequest) GetLookbackDelta() time.Duration { + return 0 +} + func (r *remoteReadQueryRequest) WithID(_ int64) (MetricsQueryRequest, error) { return nil, apierror.New(apierror.TypeInternal, "remoteReadQueryRequest.WithID not implemented") } diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index 3ac38c4e573..d62cc95a5ce 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -48,6 +48,8 @@ const ( queryTypeActiveSeries = "active_series" queryTypeActiveNativeHistogramMetrics = "active_native_histogram_metrics" queryTypeOther = "other" + + fullRangeHandlerContextKey contextKey = 10 ) var ( @@ -56,6 +58,7 @@ var ( // Config for query_range middleware chain. 
type Config struct { + SpinOffInstantSubqueries bool `yaml:"spin_off_instant_subqueries" category:"advanced"` SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval" category:"advanced"` ResultsCacheConfig `yaml:"results_cache"` CacheResults bool `yaml:"cache_results"` @@ -100,6 +103,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.QueryResultResponseFormat, "query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from queriers. Supported values: %s", strings.Join(allFormats, ", "))) f.BoolVar(&cfg.ShardActiveSeriesQueries, "query-frontend.shard-active-series-queries", false, "True to enable sharding of active series queries.") f.BoolVar(&cfg.UseActiveSeriesDecoder, "query-frontend.use-active-series-decoder", false, "Set to true to use the zero-allocation response decoder for active series queries.") + f.BoolVar(&cfg.SpinOffInstantSubqueries, "query-frontend.spin-off-instant-subqueries", false, "True to enable spinning off subqueries as range queries in instant queries.") cfg.ResultsCacheConfig.RegisterFlags(f) } @@ -256,15 +260,16 @@ func newQueryTripperware( cacheKeyGenerator = NewDefaultCacheKeyGenerator(codec, cfg.SplitQueriesByInterval) } - queryRangeMiddleware, queryInstantMiddleware, remoteReadMiddleware := newQueryMiddlewares(cfg, log, limits, codec, c, cacheKeyGenerator, cacheExtractor, engine, registerer) + queryRangeMiddleware, queryInstantMiddleware, remoteReadMiddleware := newQueryMiddlewares(cfg, log, limits, codec, c, cacheKeyGenerator, cacheExtractor, engine, engineOpts.NoStepSubqueryIntervalFn, registerer) return func(next http.RoundTripper) http.RoundTripper { // IMPORTANT: roundtrippers are executed in *reverse* order because they are wrappers. // It means that the first roundtrippers defined in this function will be the last to be // executed. - queryrange := NewLimitedParallelismRoundTripper(next, codec, limits, queryRangeMiddleware...) - instant := NewLimitedParallelismRoundTripper(next, codec, limits, queryInstantMiddleware...) + queryrangeInitial := newLimitedParallelismRoundTripper(next, codec, limits, queryRangeMiddleware...) + var queryrange http.RoundTripper = queryrangeInitial + var instant http.RoundTripper = newLimitedParallelismRoundTripper(next, codec, limits, queryInstantMiddleware...) remoteRead := NewRemoteReadRoundTripper(next, remoteReadMiddleware...) // Wrap next for cardinality, labels queries and all other queries. @@ -315,6 +320,12 @@ func newQueryTripperware( case IsRangeQuery(r.URL.Path): return queryrange.RoundTrip(r) case IsInstantQuery(r.URL.Path): + // If spinning off subqueries is enabled, we need to pass the full range handler to the instant query handler. + // Range queries that are spun off will be executed by the full range handler. + if cfg.SpinOffInstantSubqueries { + var fullRangeHandler MetricsQueryHandler = queryrangeInitial + r = r.WithContext(context.WithValue(r.Context(), fullRangeHandlerContextKey, fullRangeHandler)) + } return instant.RoundTrip(r) case IsCardinalityQuery(r.URL.Path): return cardinality.RoundTrip(r) @@ -346,6 +357,7 @@ func newQueryMiddlewares( cacheKeyGenerator CacheKeyGenerator, cacheExtractor Extractor, engine *promql.Engine, + defaultStepFunc func(rangeMillis int64) int64, registerer prometheus.Registerer, ) (queryRangeMiddleware, queryInstantMiddleware, remoteReadMiddleware []MetricsQueryMiddleware) { // Metric used to keep track of each middleware execution duration. 
@@ -372,6 +384,14 @@ func newQueryMiddlewares( newStepAlignMiddleware(limits, log, registerer), ) + if cfg.SpinOffInstantSubqueries { + queryInstantMiddleware = append( + queryInstantMiddleware, + newInstrumentMiddleware("spin_off_subqueries", metrics), + newSpinOffSubqueriesMiddleware(limits, log, engine, registerer, defaultStepFunc), + ) + } + if cfg.CacheResults && cfg.CacheErrors { queryRangeMiddleware = append( queryRangeMiddleware, diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go index 32f9260ab89..062beb25e04 100644 --- a/pkg/frontend/querymiddleware/roundtrip_test.go +++ b/pkg/frontend/querymiddleware/roundtrip_test.go @@ -525,6 +525,7 @@ func TestMiddlewaresConsistency(t *testing.T) { nil, nil, promql.NewEngine(promql.EngineOpts{}), + defaultStepFunc, nil, ) diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries.go b/pkg/frontend/querymiddleware/spin_off_subqueries.go new file mode 100644 index 00000000000..153edc8d1fd --- /dev/null +++ b/pkg/frontend/querymiddleware/spin_off_subqueries.go @@ -0,0 +1,219 @@ +package querymiddleware + +import ( + "context" + "errors" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + + apierror "github.com/grafana/mimir/pkg/api/error" + "github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper" + "github.com/grafana/mimir/pkg/querier/stats" + "github.com/grafana/mimir/pkg/storage/lazyquery" + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +const ( + subquerySpinoffSkippedReasonParsingFailed = "parsing-failed" + subquerySpinoffSkippedReasonMappingFailed = "mapping-failed" + subquerySpinoffSkippedReasonNoSubqueries = "no-subqueries" + subquerySpinoffSkippedReasonDownstreamQueries = "too-many-downstream-queries" +) + +type spinOffSubqueriesMiddleware struct { + next MetricsQueryHandler + limits Limits + logger log.Logger + + engine *promql.Engine + defaultStepFunc func(int64) int64 + + metrics spinOffSubqueriesMetrics +} + +type spinOffSubqueriesMetrics struct { + spinOffAttempts prometheus.Counter + spinOffSuccesses prometheus.Counter + spinOffSkipped *prometheus.CounterVec + spunOffSubqueries prometheus.Counter + spunOffSubqueriesPerQuery prometheus.Histogram +} + +func newSpinOffSubqueriesMetrics(registerer prometheus.Registerer) spinOffSubqueriesMetrics { + m := spinOffSubqueriesMetrics{ + spinOffAttempts: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_frontend_subquery_spinoff_attempts_total", + Help: "Total number of queries the query-frontend attempted to spin off subqueries from.", + }), + spinOffSuccesses: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_frontend_subquery_spinoff_successes_total", + Help: "Total number of queries the query-frontend successfully spun off subqueries from.", + }), + spinOffSkipped: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_frontend_subquery_spinoff_skipped_total", + Help: "Total number of queries the query-frontend skipped or failed to spin off subqueries from.", + }, []string{"reason"}), + spunOffSubqueries: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_frontend_spun_off_subqueries_total", + Help: "Total number of subqueries that were spun off.", + }), + spunOffSubqueriesPerQuery: 
promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
+			Name:    "cortex_frontend_spun_off_subqueries_per_query",
+			Help:    "Number of subqueries spun off from a single query.",
+			Buckets: prometheus.ExponentialBuckets(2, 2, 10),
+		}),
+	}
+
+	// Initialize known label values.
+	for _, reason := range []string{
+		subquerySpinoffSkippedReasonParsingFailed,
+		subquerySpinoffSkippedReasonMappingFailed,
+		subquerySpinoffSkippedReasonNoSubqueries,
+		subquerySpinoffSkippedReasonDownstreamQueries,
+	} {
+		m.spinOffSkipped.WithLabelValues(reason)
+	}
+
+	return m
+}
+
+func newSpinOffSubqueriesMiddleware(
+	limits Limits,
+	logger log.Logger,
+	engine *promql.Engine,
+	registerer prometheus.Registerer,
+	defaultStepFunc func(int64) int64,
+) MetricsQueryMiddleware {
+	metrics := newSpinOffSubqueriesMetrics(registerer)
+
+	return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
+		return &spinOffSubqueriesMiddleware{
+			next:            next,
+			limits:          limits,
+			logger:          logger,
+			engine:          engine,
+			metrics:         metrics,
+			defaultStepFunc: defaultStepFunc,
+		}
+	})
+}
+
+func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) {
+	// Log the instant query and its timestamp in every error log, so that we have more information for debugging failures.
+	logger := log.With(s.logger, "query", req.GetQuery(), "query_timestamp", req.GetStart())
+
+	spanLog, ctx := spanlogger.NewWithLogger(ctx, logger, "spinOffSubqueriesMiddleware.Do")
+	defer spanLog.Span.Finish()
+
+	var fullRangeHandler MetricsQueryHandler
+	if v, ok := ctx.Value(fullRangeHandlerContextKey).(MetricsQueryHandler); ok {
+		fullRangeHandler = v
+	} else {
+		level.Warn(spanLog).Log("msg", "full range handler not found in request context, skipping subquery spin-off")
+		return s.next.Do(ctx, req)
+	}
+
+	// Increment the metric tracking how many queries we attempted to spin off subqueries from.
+	s.metrics.spinOffAttempts.Inc()
+
+	mapperStats := astmapper.NewSubquerySpinOffMapperStats()
+	mapperCtx, cancel := context.WithTimeout(ctx, shardingTimeout)
+	defer cancel()
+	mapper := astmapper.NewSubquerySpinOffMapper(mapperCtx, spanLog, mapperStats)
+
+	expr, err := parser.ParseExpr(req.GetQuery())
+	if err != nil {
+		level.Warn(spanLog).Log("msg", "failed to parse query", "err", err)
+		s.metrics.spinOffSkipped.WithLabelValues(subquerySpinoffSkippedReasonParsingFailed).Inc()
+		return nil, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error())
+	}
+
+	spinOffQuery, err := mapper.Map(expr)
+	if err != nil {
+		if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil {
+			level.Error(spanLog).Log("msg", "timeout while mapping the query to spin off subqueries, please file a bug report with this query, falling back to executing it without spinning off subqueries", "err", err)
+		} else {
+			level.Error(spanLog).Log("msg", "failed to map the input query, falling back to executing it without spinning off subqueries", "err", err)
+		}
+		s.metrics.spinOffSkipped.WithLabelValues(subquerySpinoffSkippedReasonMappingFailed).Inc()
+		return s.next.Do(ctx, req)
+	}
+
+	if mapperStats.SpunOffSubqueries() == 0 {
+		// The query has no subqueries to spin off, so continue without rewriting it.
+		spanLog.DebugLog("msg", "input query contains no subqueries that can be spun off, falling back to executing it without spinning off subqueries")
+		s.metrics.spinOffSkipped.WithLabelValues(subquerySpinoffSkippedReasonNoSubqueries).Inc()
+		return s.next.Do(ctx, req)
+	}
+
+	if mapperStats.DownstreamQueries() > mapperStats.SpunOffSubqueries() {
+		// Spinning off would produce more regular downstream queries than spun-off subqueries, so it isn't worth it; continue without rewriting.
+		spanLog.DebugLog("msg", "input query 
would produce more regular downstream queries than spun-off subqueries, falling back to executing it without spinning off subqueries")
+		s.metrics.spinOffSkipped.WithLabelValues(subquerySpinoffSkippedReasonDownstreamQueries).Inc()
+		return s.next.Do(ctx, req)
+	}
+
+	spanLog.DebugLog("msg", "instant query has been rewritten to spin off subqueries", "rewritten", spinOffQuery, "regular_downstream_queries", mapperStats.DownstreamQueries(), "subqueries_spun_off", mapperStats.SpunOffSubqueries())
+
+	// Update query stats.
+	queryStats := stats.FromContext(ctx)
+	queryStats.AddSpunOffSubqueries(uint32(mapperStats.SpunOffSubqueries()))
+
+	// Update metrics.
+	s.metrics.spinOffSuccesses.Inc()
+	s.metrics.spunOffSubqueries.Add(float64(mapperStats.SpunOffSubqueries()))
+	s.metrics.spunOffSubqueriesPerQuery.Observe(float64(mapperStats.SpunOffSubqueries()))
+
+	// Replace the original request's expression with the rewritten one.
+	req, err = req.WithExpr(spinOffQuery)
+	if err != nil {
+		return nil, err
+	}
+
+	annotationAccumulator := NewAnnotationAccumulator()
+
+	queryable := newSpinOffSubqueriesQueryable(req, annotationAccumulator, s.next, fullRangeHandler, s.defaultStepFunc)
+
+	qry, err := newQuery(ctx, req, s.engine, lazyquery.NewLazyQueryable(queryable))
+	if err != nil {
+		level.Warn(spanLog).Log("msg", "failed to create a new query from the rewritten request", "err", err)
+		return nil, apierror.New(apierror.TypeBadData, err.Error())
+	}
+
+	res := qry.Exec(ctx)
+	extracted, err := promqlResultToSamples(res)
+	if err != nil {
+		level.Warn(spanLog).Log("msg", "failed to execute the rewritten instant query", "err", err)
+		return nil, mapEngineError(err)
+	}
+
+	// Note that the positions based on the original query may be wrong as the rewritten
+	// query which is actually used is different, but the user does not see the rewritten
+	// query, so we pass in an empty string as the query so the positions will be hidden.
+	warn, info := res.Warnings.AsStrings("", 0, 0)
+
+	// Add any annotations returned by the spun-off and downstream queries, and remove any duplicates.
+	// We remove any position information for the same reason as above: the position information
+	// relates to the rewritten expression sent to queriers, not the original expression provided by the user.
+	accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll()
+	warn = append(warn, removeAllAnnotationPositionInformation(accumulatedWarnings)...)
+	info = append(info, removeAllAnnotationPositionInformation(accumulatedInfos)...)
+	warn = removeDuplicates(warn)
+	info = removeDuplicates(info)
+
+	return &PrometheusResponse{
+		Status: statusSuccess,
+		Data: &PrometheusData{
+			ResultType: string(res.Value.Type()),
+			Result:     extracted,
+		},
+		Headers:  queryable.getResponseHeaders(),
+		Warnings: warn,
+		Infos:    info,
+	}, nil
+}
diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go b/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go
new file mode 100644
index 00000000000..e53017fae89
--- /dev/null
+++ b/pkg/frontend/querymiddleware/spin_off_subqueries_queryable.go
@@ -0,0 +1,203 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/value.go
+// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/queryable.go
+// Provenance-includes-license: Apache-2.0
+// Provenance-includes-copyright: The Cortex Authors.
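+//
+// This file implements a storage.Queryable that intercepts Select() calls for the
+// synthetic __subquery_spinoff__ and __downstream_query__ metrics produced by the
+// subquery spin-off mapper, executing the former as range queries and the latter
+// as regular downstream queries.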
+
+package querymiddleware
+
+import (
+	"context"
+	"time"
+
+	"github.com/grafana/dskit/concurrency"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/util/annotations"
+
+	"github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper"
+)
+
+// spinOffSubqueriesQueryable is an implementor of the Queryable interface.
+type spinOffSubqueriesQueryable struct {
+	req                   MetricsQueryRequest
+	annotationAccumulator *AnnotationAccumulator
+	responseHeaders       *responseHeadersTracker
+	handler               MetricsQueryHandler
+	upstreamRangeHandler  MetricsQueryHandler
+	defaultStepFunc       func(rangeMillis int64) int64
+}
+
+func newSpinOffSubqueriesQueryable(req MetricsQueryRequest, annotationAccumulator *AnnotationAccumulator, next MetricsQueryHandler, upstreamRangeHandler MetricsQueryHandler, defaultStepFunc func(rangeMillis int64) int64) *spinOffSubqueriesQueryable {
+	return &spinOffSubqueriesQueryable{
+		req:                   req,
+		annotationAccumulator: annotationAccumulator,
+		handler:               next,
+		responseHeaders:       newResponseHeadersTracker(),
+		upstreamRangeHandler:  upstreamRangeHandler,
+		defaultStepFunc:       defaultStepFunc,
+	}
+}
+
+func (q *spinOffSubqueriesQueryable) Querier(_, _ int64) (storage.Querier, error) {
+	return &spinOffSubqueriesQuerier{req: q.req, annotationAccumulator: q.annotationAccumulator, handler: q.handler, responseHeaders: q.responseHeaders, upstreamRangeHandler: q.upstreamRangeHandler, defaultStepFunc: q.defaultStepFunc}, nil
+}
+
+// getResponseHeaders returns the merged response headers received by the downstream
+// when running the embedded queries.
+func (q *spinOffSubqueriesQueryable) getResponseHeaders() []*PrometheusHeader {
+	return q.responseHeaders.getHeaders()
+}
+
+type spinOffSubqueriesQuerier struct {
+	req                   MetricsQueryRequest
+	annotationAccumulator *AnnotationAccumulator
+	handler               MetricsQueryHandler
+	upstreamRangeHandler  MetricsQueryHandler
+	defaultStepFunc       func(rangeMillis int64) int64
+
+	// Keep track of response headers received when running embedded queries.
+	responseHeaders *responseHeadersTracker
+}
+
+func (q *spinOffSubqueriesQuerier) Select(ctx context.Context, _ bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
+	var name string
+	values := map[string]string{}
+	for _, matcher := range matchers {
+		if matcher.Name == labels.MetricName {
+			name = matcher.Value
+		} else {
+			values[matcher.Name] = matcher.Value
+		}
+	}
+
+	switch name {
+	case astmapper.DownstreamQueryMetricName:
+		downstreamReq, err := q.req.WithQuery(values[astmapper.DownstreamQueryLabelName])
+		if err != nil {
+			return storage.ErrSeriesSet(err)
+		}
+		resp, err := q.handler.Do(ctx, downstreamReq)
+		if err != nil {
+			return storage.ErrSeriesSet(err)
+		}
+		promRes, ok := resp.(*PrometheusResponse)
+		if !ok {
+			return storage.ErrSeriesSet(errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{}))
+		}
+		resStreams, err := ResponseToSamples(promRes)
+		if err != nil {
+			return storage.ErrSeriesSet(err)
+		}
+
+		q.responseHeaders.mergeHeaders(promRes.Headers)
+		q.annotationAccumulator.addInfos(promRes.Infos)
+		q.annotationAccumulator.addWarnings(promRes.Warnings)
+		return newSeriesSetFromEmbeddedQueriesResults([][]SampleStream{resStreams}, hints)
+	case astmapper.SubqueryMetricName:
+		// Handle subqueries.
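+		// The AST mapper encoded the subquery's inner expression, range and step as labels
+		// on the synthetic __subquery_spinoff__ selector. Decode them here so the subquery
+		// can be re-executed as one or more range queries against the range-query handler.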
+		expr := values[astmapper.SubqueryQueryLabelName]
+		rangeStr := values[astmapper.SubqueryRangeLabelName]
+		stepStr := values[astmapper.SubqueryStepLabelName]
+		if expr == "" || rangeStr == "" || stepStr == "" {
+			return storage.ErrSeriesSet(errors.New("missing required labels for subquery"))
+		}
+
+		queryExpr, err := parser.ParseExpr(expr)
+		if err != nil {
+			return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery"))
+		}
+
+		queryRange, err := time.ParseDuration(rangeStr)
+		if err != nil {
+			return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery range"))
+		}
+		queryStep, err := time.ParseDuration(stepStr)
+		if err != nil {
+			return storage.ErrSeriesSet(errors.Wrap(err, "failed to parse subquery step"))
+		}
+
+		end := q.req.GetEnd()
+		// Align the query to absolute time by querying slightly into the future.
+		step := queryStep.Milliseconds()
+		if step == 0 {
+			if q.defaultStepFunc == nil {
+				return storage.ErrSeriesSet(errors.New("defaultStepFunc is not set"))
+			}
+			step = q.defaultStepFunc(queryRange.Milliseconds())
+		}
+		if end%step != 0 {
+			end += step - (end % step)
+		}
+		reqRange := q.req.GetEnd() - q.req.GetStart()
+		// Calculate the earliest data point we need to query.
+		start := end - reqRange - queryRange.Milliseconds()
+		// Split the range into multiple smaller queries if it covers more than 10000 datapoints.
+		rangeStart := start
+		var rangeQueries []MetricsQueryRequest
+		for {
+			var rangeEnd int64
+			if remainingPoints := (end - rangeStart) / step; remainingPoints > 10000 {
+				rangeEnd = rangeStart + 10000*step
+			} else {
+				rangeEnd = end
+			}
+			headers := q.req.GetHeaders()
+			headers = append(headers,
+				&PrometheusHeader{Name: "Content-Type", Values: []string{"application/json"}},
+				&PrometheusHeader{Name: "X-Mimir-Spun-Off-Subquery", Values: []string{"true"}},
+			) // These headers are forwarded to the querier, which is reached over HTTP.
+			newRangeRequest := NewPrometheusRangeQueryRequest("/prometheus"+queryRangePathSuffix, headers, rangeStart, rangeEnd, step, q.req.GetLookbackDelta(), queryExpr, q.req.GetOptions(), q.req.GetHints())
+			rangeQueries = append(rangeQueries, newRangeRequest)
+			if rangeEnd == end {
+				break
+			}
+			// Advance the window, otherwise we'd keep appending the same sub-range forever.
+			rangeStart = rangeEnd + step
+		}
+
+		// Concurrently run each query. It breaks and cancels each worker context on first error.
+		streams := make([][]SampleStream, len(rangeQueries))
+		err = concurrency.ForEachJob(ctx, len(rangeQueries), len(rangeQueries), func(ctx context.Context, idx int) error {
+			req := rangeQueries[idx]
+
+			resp, err := q.upstreamRangeHandler.Do(ctx, req)
+			if err != nil {
+				return err
+			}
+			promRes, ok := resp.(*PrometheusResponse)
+			if !ok {
+				return errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})
+			}
+			resStreams, err := ResponseToSamples(promRes)
+			if err != nil {
+				return err
+			}
+			streams[idx] = resStreams // No mutex needed: each job writes only to its own index.
+			q.annotationAccumulator.addInfos(promRes.Infos)
+			q.annotationAccumulator.addWarnings(promRes.Warnings)
+			return nil
+		})
+		if err != nil {
+			return storage.ErrSeriesSet(err)
+		}
+		return newSeriesSetFromEmbeddedQueriesResults(streams, hints)
+	default:
+		return storage.ErrSeriesSet(errors.Errorf("invalid metric name for the spin off middleware: %s", name))
+	}
+}
+
+// LabelValues implements storage.LabelQuerier.
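+// Only Select() is needed to evaluate the rewritten query, so label queries are
+// intentionally left unimplemented.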
+func (q *spinOffSubqueriesQuerier) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, errNotImplemented +} + +// LabelNames implements storage.LabelQuerier. +func (q *spinOffSubqueriesQuerier) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, errNotImplemented +} + +// Close implements storage.LabelQuerier. +func (q *spinOffSubqueriesQuerier) Close() error { + return nil +} diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries_test.go b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go new file mode 100644 index 00000000000..b714ffd0c2e --- /dev/null +++ b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go @@ -0,0 +1,686 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/promql_test.go +// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/querysharding_test.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Cortex Authors. + +package querymiddleware + +import ( + "context" + "fmt" + "math" + "sort" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/user" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/util" +) + +func TestSubquerySpinOff_Correctness(t *testing.T) { + var ( + numSeries = 1000 + numStaleSeries = 100 + numConvHistograms = 1000 + numStaleConvHistograms = 100 + histogramBuckets = []float64{1.0, 2.0, 4.0, 10.0, 100.0, math.Inf(1)} + numNativeHistograms = 1000 + numStaleNativeHistograms = 100 + ) + + tests := map[string]struct { + query string + expectedSkippedReason string + expectedSpunOffSubqueries int + expectedDownstreamQueries int + expectSpecificOrder bool + }{ + "sum() no grouping": { + query: `sum(metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + "sum() offset": { + query: `sum(metric_counter offset 5s)`, + expectedSkippedReason: "no-subquery", + }, + "sum() negative offset": { + query: `sum(metric_counter offset -5s)`, + expectedSkippedReason: "no-subquery", + }, + "sum() grouping 'by'": { + query: `sum by(group_1) (metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + "sum() grouping 'without'": { + query: `sum without(unique) (metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + "sum(rate()) no grouping": { + query: `sum(rate(metric_counter[1m]))`, + expectedSkippedReason: "no-subquery", + }, + "sum(rate()) grouping 'by'": { + query: `sum by(group_1) (rate(metric_counter[1m]))`, + expectedSkippedReason: "no-subquery", + }, + "sum(rate()) grouping 'without'": { + query: `sum without(unique) (rate(metric_counter[1m]))`, + expectedSkippedReason: "no-subquery", + }, + "sum(rate()) with no effective grouping because all groups have 1 series": { + query: `sum by(unique) (rate(metric_counter{group_1="0"}[1m]))`, + expectedSkippedReason: "no-subquery", + }, + `group by (group_1) (metric_counter)`: { + query: `group by (group_1) (metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + `group by (group_1) (group by (group_1, group_2) (metric_counter))`: { + query: 
`group by (group_1) (group by (group_1, group_2) (metric_counter))`, + expectedSkippedReason: "no-subquery", + }, + `count by (group_1) (group by (group_1, group_2) (metric_counter))`: { + query: `count by (group_1) (group by (group_1, group_2) (metric_counter))`, + expectedSkippedReason: "no-subquery", + }, + "histogram_quantile() grouping only 'by' le": { + query: `histogram_quantile(0.5, sum by(le) (rate(metric_histogram_bucket[1m])))`, + expectedSkippedReason: "no-subquery", + }, + "histogram_quantile() grouping 'by'": { + query: `histogram_quantile(0.5, sum by(group_1, le) (rate(metric_histogram_bucket[1m])))`, + expectedSkippedReason: "no-subquery", + }, + "histogram_quantile() grouping 'without'": { + query: `histogram_quantile(0.5, sum without(group_1, group_2, unique) (rate(metric_histogram_bucket[1m])))`, + expectedSkippedReason: "no-subquery", + }, + "histogram_quantile() with no effective grouping because all groups have 1 series": { + query: `histogram_quantile(0.5, sum by(unique, le) (rate(metric_histogram_bucket{group_1="0"}[1m])))`, + expectedSkippedReason: "no-subquery", + }, + "min() no grouping": { + query: `min(metric_counter{group_1="0"})`, + expectedSkippedReason: "no-subquery", + }, + "min() grouping 'by'": { + query: `min by(group_2) (metric_counter{group_1="0"})`, + expectedSkippedReason: "no-subquery", + }, + "min() grouping 'without'": { + query: `min without(unique) (metric_counter{group_1="0"})`, + expectedSkippedReason: "no-subquery", + }, + "max() no grouping": { + query: `max(metric_counter{group_1="0"})`, + expectedSkippedReason: "no-subquery", + }, + "max() grouping 'by'": { + query: `max by(group_2) (metric_counter{group_1="0"})`, + expectedSkippedReason: "no-subquery", + }, + "max() grouping 'without'": { + query: `max without(unique) (metric_counter{group_1="0"})`, + expectedSkippedReason: "no-subquery", + }, + "count() no grouping": { + query: `count(metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + "count() grouping 'by'": { + query: `count by(group_2) (metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + "count() grouping 'without'": { + query: `count without(unique) (metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + "sum(count())": { + query: `sum(count by(group_1) (metric_counter))`, + expectedSkippedReason: "no-subquery", + }, + "avg() no grouping": { + query: `avg(metric_counter)`, + expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). + }, + "avg() grouping 'by'": { + query: `avg by(group_2) (metric_counter)`, + expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). + }, + "avg() grouping 'without'": { + query: `avg without(unique) (metric_counter)`, + expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). 
+ }, + "sum(min_over_time())": { + query: `sum by (group_1, group_2) (min_over_time(metric_counter{const="fixed"}[2m]))`, + expectedSkippedReason: "no-subquery", + }, + "sum(max_over_time())": { + query: `sum by (group_1, group_2) (max_over_time(metric_counter{const="fixed"}[2m]))`, + expectedSkippedReason: "no-subquery", + }, + "sum(avg_over_time())": { + query: `sum by (group_1, group_2) (avg_over_time(metric_counter{const="fixed"}[2m]))`, + expectedSkippedReason: "no-subquery", + }, + "or": { + query: `sum(rate(metric_counter{group_1="0"}[1m])) or sum(rate(metric_counter{group_1="1"}[1m]))`, + expectedSkippedReason: "no-subquery", + }, + "and": { + query: ` + sum without(unique) (rate(metric_counter{group_1="0"}[1m])) + and + max without(unique) (metric_counter) > 0`, + expectedSkippedReason: "no-subquery", + }, + "sum(rate()) > avg(rate())": { + query: ` + sum(rate(metric_counter[1m])) + > + avg(rate(metric_counter[1m]))`, + expectedSkippedReason: "no-subquery", // avg() is parallelized as sum()/count(). + }, + "sum by(unique) * on (unique) group_left (group_1) avg by (unique, group_1)": { + // ensure that avg transformation into sum/count does not break label matching in previous binop. + query: ` + sum by(unique) (metric_counter) + * + on (unique) group_left (group_1) + avg by (unique, group_1) (metric_counter)`, + expectedSkippedReason: "no-subquery", + }, + "sum by (rate()) / 2 ^ 2": { + query: ` + sum by (group_1) (rate(metric_counter[1m])) / 2 ^ 2`, + expectedSkippedReason: "no-subquery", + }, + "sum by (rate()) / time() *2": { + query: ` + sum by (group_1) (rate(metric_counter[1m])) / time() *2`, + expectedSkippedReason: "no-subquery", + }, + "sum(rate()) / vector(3) ^ month()": { + query: `sum(rate(metric_counter[1m])) / vector(3) ^ month()`, + expectedSkippedReason: "no-subquery", + }, + "sum(rate(metric_counter[1m])) / vector(3) ^ vector(2) + sum(ln(metric_counter))": { + query: `sum(rate(metric_counter[1m])) / vector(3) ^ vector(2) + sum(ln(metric_counter))`, + expectedSkippedReason: "no-subquery", + }, + "nested count()": { + query: `sum( + count( + count(metric_counter) by (group_1, group_2) + ) by (group_1) + )`, + expectedSkippedReason: "no-subquery", + }, + "subquery max": { + query: `max_over_time( + rate(metric_counter[1m]) + [5m:1m] + )`, + expectedSpunOffSubqueries: 1, + }, + "subquery min": { + query: `min_over_time( + rate(metric_counter[1m]) + [5m:1m] + )`, + expectedSpunOffSubqueries: 1, + }, + "sum of subquery min": { + query: `sum by(group_1) (min_over_time((changes(metric_counter[5m]))[10m:2m]))`, + expectedSpunOffSubqueries: 1, + }, + "triple subquery": { + query: `max_over_time( + stddev_over_time( + deriv( + rate(metric_counter[10m]) + [5m:1m]) + [2m:]) + [10m:])`, + expectedSpunOffSubqueries: 1, + }, + "double subquery deriv": { + query: `max_over_time( deriv( rate(metric_counter[10m])[5m:1m] )[10m:] )`, + expectedSpunOffSubqueries: 1, + }, + "@ modifier": { + query: `sum by (group_1)(rate(metric_counter[1h] @ end())) + sum by (group_1)(rate(metric_counter[1h] @ start()))`, + expectedSkippedReason: "no-subquery", + }, + "@ modifier and offset": { + query: `sum by (group_1)(rate(metric_counter[1h] @ end() offset 1m))`, + expectedSkippedReason: "no-subquery", + }, + "@ modifier and negative offset": { + query: `sum by (group_1)(rate(metric_counter[1h] @ start() offset -1m))`, + expectedSkippedReason: "no-subquery", + }, + "label_replace": { + query: `sum by (foo)( + label_replace( + rate(metric_counter{group_1="0"}[1m]), + "foo", "bar$1", "group_2", "(.*)" 
+				)
+			)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"label_join": {
+			query: `sum by (foo)(
+				label_join(
+					rate(metric_counter{group_1="0"}[1m]),
+					"foo", ",", "group_1", "group_2", "const"
+				)
+			)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`query with sort() expects specific order`: {
+			query:                 `sort(sum(metric_histogram_bucket) by (le))`,
+			expectedSkippedReason: "no-subquery",
+			expectSpecificOrder:   true,
+		},
+		"scalar(aggregation)": {
+			query:                 `scalar(sum(metric_counter))`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`filtering binary operation with constant scalar`: {
+			query:                 `count(metric_counter > 0)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`filtering binary operation of a function result with scalar`: {
+			query:                 `max_over_time(metric_counter[5m]) > 0`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`binary operation with an aggregation on one hand`: {
+			query:                 `sum(metric_counter) > 1`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`binary operation with an aggregation on the other hand`: {
+			query:                 `0 < sum(metric_counter)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`binary operation with an aggregation by some label on one hand`: {
+			query:                 `count by (unique) (metric_counter) > 0`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`filtering binary operation with non constant`: {
+			query:                 `max by(unique) (max_over_time(metric_counter[5m])) > scalar(min(metric_counter))`,
+			expectedSkippedReason: "no-subquery",
+		},
+		//
+		// The following queries are inherited from the query-sharding tests and cover a mix
+		// of expressions with and without subqueries that can be spun off.
+		//
+		"subquery min_over_time with aggr": {
+			query: `min_over_time(
+				sum by(group_1) (
+					rate(metric_counter[5m])
+				)[10m:]
+			)`,
+			expectedSpunOffSubqueries: 1,
+		},
+		"outer subquery on top of sum": {
+			query:                 `sum(metric_counter) by (group_1)[5m:1m]`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"outer subquery on top of avg": {
+			query:                 `avg(metric_counter) by (group_1)[5m:1m]`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"stddev()": {
+			query:                 `stddev(metric_counter{const="fixed"})`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"stdvar()": {
+			query:                 `stdvar(metric_counter{const="fixed"})`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"topk()": {
+			query:                 `topk(2, metric_counter{const="fixed"})`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"bottomk()": {
+			query:                 `bottomk(2, metric_counter{const="fixed"})`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"vector()": {
+			query:                 `vector(1)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"scalar(single metric)": {
+			query:                 `scalar(metric_counter{unique="1"})`, // Select a single metric.
+			expectedSkippedReason: "no-subquery",
+		},
+		"histogram_quantile no grouping": {
+			query: fmt.Sprintf(`histogram_quantile(0.99, metric_histogram_bucket{unique="%d"})`, numSeries+10), // Select a single histogram metric.
+			expectedSkippedReason: "no-subquery",
+		},
+		"histogram_quantile with inner aggregation": {
+			query:                 `sum by (group_1) (histogram_quantile(0.9, rate(metric_histogram_bucket[1m])))`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"histogram_quantile without aggregation": {
+			query:                 `histogram_quantile(0.5, rate(metric_histogram_bucket{group_1="0"}[1m]))`,
+			expectedSkippedReason: "no-subquery",
+		},
+		`subqueries with non parallelizable function in children`: {
+			query: `max_over_time(
+				absent_over_time(
+					deriv(
+						rate(metric_counter[1m])
+					[5m:1m])
+				[2m:1m])
+			[10m:1m] offset 25m)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"string literal": {
+			query:                 `"test"`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"day_of_month() >= 1 and day_of_month()": {
+			query:                 `day_of_month() >= 1 and day_of_month()`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"month() >= 1 and month()": {
+			query:                 `month() >= 1 and month()`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"vector(1) > 0 and vector(1)": {
+			query:                 `vector(1) > 0 and vector(1)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"sum(metric_counter) > 0 and vector(1)": {
+			query:                 `sum(metric_counter) > 0 and vector(1)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"vector(1)": {
+			query:                 `vector(1)`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"time()": {
+			query:                 `time()`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"month(sum(metric_counter))": {
+			query:                 `month(sum(metric_counter))`,
+			expectedSkippedReason: "no-subquery",
+		},
+		"month(sum(metric_counter)) > 0 and vector(1)": {
+			query:                 `month(sum(metric_counter)) > 0 and vector(1)`,
+			expectedSkippedReason: "no-subquery",
+ }, + "0 < bool 1": { + query: `0 < bool 1`, + expectedSkippedReason: "no-subquery", + }, + "scalar(metric_counter{const=\"fixed\"}) < bool 1": { + query: `scalar(metric_counter{const="fixed"}) < bool 1`, + expectedSkippedReason: "no-subquery", + }, + "scalar(sum(metric_counter)) < bool 1": { + query: `scalar(sum(metric_counter)) < bool 1`, + expectedSkippedReason: "no-subquery", + }, + // Summing floats and native histograms together makes no sense, see + // https://prometheus.io/docs/prometheus/latest/querying/operators/#operators-for-native-histograms + // so we exclude native histograms here and in some subsequent tests + `sum({__name__!=""}) excluding native histograms`: { + query: `sum({__name__!="",__name__!="metric_native_histogram"})`, + expectedSkippedReason: "no-subquery", + }, + `sum by (group_1) ({__name__!=""}) excluding native histograms`: { + query: `sum by (group_1) ({__name__!="",__name__!="metric_native_histogram"})`, + expectedSkippedReason: "no-subquery", + }, + `sum by (group_1) (count_over_time({__name__!=""}[1m])) excluding native histograms`: { + query: `sum by (group_1) (count_over_time({__name__!="",__name__!="metric_native_histogram"}[1m]))`, + expectedSkippedReason: "no-subquery", + }, + `sum(metric_native_histogram)`: { + query: `sum(metric_native_histogram)`, + expectedSkippedReason: "no-subquery", + }, + `sum by (group_1) (metric_native_histogram)`: { + query: `sum by (group_1) (metric_native_histogram)`, + expectedSkippedReason: "no-subquery", + }, + `sum by (group_1) (count_over_time(metric_native_histogram[1m]))`: { + query: `sum by (group_1) (count_over_time(metric_native_histogram[1m]))`, + expectedSkippedReason: "no-subquery", + }, + `count(metric_native_histogram)`: { + query: `count(metric_native_histogram)`, + expectedSkippedReason: "no-subquery", + }, + `count by (group_1) (metric_native_histogram)`: { + query: `count by (group_1) (metric_native_histogram)`, + expectedSkippedReason: "no-subquery", + }, + `count by (group_1) (count_over_time(metric_native_histogram[1m]))`: { + query: `count by (group_1) (count_over_time(metric_native_histogram[1m]))`, + expectedSkippedReason: "no-subquery", + }, + `histogram_sum(sum(metric_native_histogram))`: { + query: `histogram_sum(sum(metric_native_histogram))`, + expectedSkippedReason: "no-subquery", + }, + `histogram_count(sum(metric_native_histogram))`: { + query: `histogram_count(sum(metric_native_histogram))`, + expectedSkippedReason: "no-subquery", + }, + `histogram_quantile(0.5, sum(metric_native_histogram))`: { + query: `histogram_quantile(0.5, sum(metric_native_histogram))`, + expectedSkippedReason: "no-subquery", + }, + `histogram_fraction(0, 0.5, sum(metric_native_histogram))`: { + query: `histogram_fraction(0, 0.5, sum(metric_native_histogram))`, + expectedSkippedReason: "no-subquery", + }, + `histogram_stdvar`: { + query: `histogram_stdvar(metric_native_histogram)`, + expectedSkippedReason: "no-subquery", + }, + `histogram_stdvar on sum of metrics`: { + query: `histogram_stdvar(sum(metric_native_histogram))`, + expectedSkippedReason: "no-subquery", + }, + `histogram_stddev`: { + query: `histogram_stddev(metric_native_histogram)`, + expectedSkippedReason: "no-subquery", + }, + `histogram_stddev on sum of metrics`: { + query: `histogram_stddev(sum(metric_native_histogram))`, + expectedSkippedReason: "no-subquery", + }, + } + + series := make([]storage.Series, 0, numSeries+(numConvHistograms*len(histogramBuckets))+numNativeHistograms) + seriesID := 0 + + // Add counter series. 
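+	// The generators below are deterministic, so running the same query with and without
+	// subquery spin-off is expected to produce approximately equal results.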
+	for i := 0; i < numSeries; i++ {
+		gen := factor(float64(i) * 0.1)
+		if i >= numSeries-numStaleSeries {
+			// Wrap the generator to inject the staleness marker between minute 10 and 20.
+			gen = stale(start.Add(10*time.Minute), start.Add(20*time.Minute), gen)
+		}
+
+		series = append(series, newSeries(newTestCounterLabels(seriesID), start.Add(-lookbackDelta), end, step, gen))
+		seriesID++
+	}
+
+	// Add a special series whose data points end earlier than the end of the queried time range
+	// and has NO stale marker.
+	series = append(series, newSeries(newTestCounterLabels(seriesID),
+		start.Add(-lookbackDelta), end.Add(-5*time.Minute), step, factor(2)))
+	seriesID++
+
+	// Add a special series whose data points end earlier than the end of the queried time range
+	// and HAS a stale marker at the end.
+	series = append(series, newSeries(newTestCounterLabels(seriesID),
+		start.Add(-lookbackDelta), end.Add(-5*time.Minute), step, stale(end.Add(-6*time.Minute), end.Add(-4*time.Minute), factor(2))))
+	seriesID++
+
+	// Add a special series whose data points start later than the start of the queried time range.
+	series = append(series, newSeries(newTestCounterLabels(seriesID),
+		start.Add(5*time.Minute), end, step, factor(2)))
+	seriesID++
+
+	// Add conventional histogram series.
+	for i := 0; i < numConvHistograms; i++ {
+		for bucketIdx, bucketLe := range histogramBuckets {
+			// We expect each bucket to have a value higher than the previous one.
+			gen := factor(float64(i) * float64(bucketIdx) * 0.1)
+			if i >= numConvHistograms-numStaleConvHistograms {
+				// Wrap the generator to inject the staleness marker between minute 10 and 20.
+				gen = stale(start.Add(10*time.Minute), start.Add(20*time.Minute), gen)
+			}
+
+			series = append(series, newSeries(newTestConventionalHistogramLabels(seriesID, bucketLe),
+				start.Add(-lookbackDelta), end, step, gen))
+		}
+
+		// Increase the series ID after all per-bucket series have been created.
+		seriesID++
+	}
+
+	// Add native histogram series.
+	for i := 0; i < numNativeHistograms; i++ {
+		gen := factor(float64(i) * 0.1)
+		if i >= numNativeHistograms-numStaleNativeHistograms {
+			// Wrap the generator to inject the staleness marker between minute 10 and 20.
+			gen = stale(start.Add(10*time.Minute), start.Add(20*time.Minute), gen)
+		}
+
+		series = append(series, newNativeHistogramSeries(newTestNativeHistogramLabels(seriesID), start.Add(-lookbackDelta), end, step, gen))
+		seriesID++
+	}
+
+	// Create a queryable on the fixtures.
+	queryable := storageSeriesQueryable(series)
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
+
+			req := &PrometheusInstantQueryRequest{
+				path:      "/query",
+				time:      util.TimeToMillis(end),
+				queryExpr: parseQuery(t, testData.query),
+			}
+
+			engine := newEngine()
+			downstream := &downstreamHandler{
+				engine:    engine,
+				queryable: queryable,
+			}
+
+			// Run the query without subquery spin-off.
+			expectedRes, err := downstream.Do(context.Background(), req)
+			require.Nil(t, err)
+			expectedPrometheusRes := expectedRes.(*PrometheusResponse)
+			if !testData.expectSpecificOrder {
+				sort.Sort(byLabels(expectedPrometheusRes.Data.Result))
+			}
+
+			// Ensure the query produces some results.
+			require.NotEmpty(t, expectedPrometheusRes.Data.Result)
+			requireValidSamples(t, expectedPrometheusRes.Data.Result)
+
+			if testData.expectedSpunOffSubqueries > 0 {
+				// Remove position information from annotations, to mirror what we expect from the spun-off queries below.
+ removeAllAnnotationPositionInformation(expectedPrometheusRes.Infos)
+ removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings)
+ }
+
+ reg := prometheus.NewPedanticRegistry()
+ spinoffMiddleware := newSpinOffSubqueriesMiddleware(
+ mockLimits{},
+ log.NewNopLogger(),
+ engine,
+ reg,
+ defaultStepFunc,
+ )
+
+ // Run the query with subquery spin-off.
+ ctx := user.InjectOrgID(context.Background(), "test")
+
+ ctx = context.WithValue(ctx, fullRangeHandlerContextKey, downstream)
+ spinoffRes, err := spinoffMiddleware.Wrap(downstream).Do(ctx, req)
+ require.Nil(t, err)
+
+ // Ensure the two results match. Float precision can differ slightly: even the
+ // PromQL engine doesn't guarantee identical results when the same query is re-run.
+ spinoffPrometheusRes := spinoffRes.(*PrometheusResponse)
+ if !testData.expectSpecificOrder {
+ sort.Sort(byLabels(spinoffPrometheusRes.Data.Result))
+ }
+ approximatelyEquals(t, expectedPrometheusRes, spinoffPrometheusRes)
+
+ var noSubqueries int
+ if testData.expectedSkippedReason == "no-subquery" {
+ noSubqueries = 1
+ }
+
+ assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
+# HELP cortex_frontend_spun_off_subqueries_total Total number of subqueries that were spun off.
+# TYPE cortex_frontend_spun_off_subqueries_total counter
+cortex_frontend_spun_off_subqueries_total %d
+# HELP cortex_frontend_subquery_spinoff_attempts_total Total number of queries the query-frontend attempted to spin off subqueries from.
+# TYPE cortex_frontend_subquery_spinoff_attempts_total counter
+cortex_frontend_subquery_spinoff_attempts_total 1
+# HELP cortex_frontend_subquery_spinoff_skipped_total Total number of queries the query-frontend skipped or failed to spin off subqueries from.
+# TYPE cortex_frontend_subquery_spinoff_skipped_total counter
+cortex_frontend_subquery_spinoff_skipped_total{reason="mapping-failed"} 0
+cortex_frontend_subquery_spinoff_skipped_total{reason="no-subqueries"} %d
+cortex_frontend_subquery_spinoff_skipped_total{reason="parsing-failed"} 0
+cortex_frontend_subquery_spinoff_skipped_total{reason="too-many-downstream-queries"} 0
+# HELP cortex_frontend_subquery_spinoff_successes_total Total number of queries the query-frontend successfully spun off subqueries from.
+# TYPE cortex_frontend_subquery_spinoff_successes_total counter
+cortex_frontend_subquery_spinoff_successes_total %d
+ `, testData.expectedSpunOffSubqueries, noSubqueries, testData.expectedSpunOffSubqueries)),
+ "cortex_frontend_subquery_spinoff_attempts_total",
+ "cortex_frontend_subquery_spinoff_successes_total",
+ "cortex_frontend_subquery_spinoff_skipped_total",
+ "cortex_frontend_spun_off_subqueries_total"))
+ })
+
+ }
+}
+
+func TestSubquerySpinOff_ShouldReturnErrorOnDownstreamHandlerFailure(t *testing.T) {
+ req := &PrometheusInstantQueryRequest{
+ path: "/query",
+ time: util.TimeToMillis(end),
+ queryExpr: parseQuery(t, "vector(1)"),
+ }
+
+ spinoffMiddleware := newSpinOffSubqueriesMiddleware(mockLimits{}, log.NewNopLogger(), newEngine(), nil, defaultStepFunc)
+
+ // Mock the downstream handler to always return an error.
+ downstreamErr := errors.Errorf("some err")
+ downstream := mockHandlerWith(nil, downstreamErr)
+
+ // Run the query with subquery spin-off middleware wrapping the downstream one.
+ // We expect to get the downstream error.
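+ // The error must be propagated unchanged (not wrapped), which assert.Equal below verifies.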
+ _, err := spinoffMiddleware.Wrap(downstream).Do(user.InjectOrgID(context.Background(), "test"), req) + require.Error(t, err) + assert.Equal(t, downstreamErr, err) +} + +var defaultStepFunc = func(int64) int64 { + return (1 * time.Minute).Milliseconds() +} diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index ca210588510..00fa098a030 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -329,6 +329,7 @@ func (f *Handler) reportQueryStats( "fetched_index_bytes", numIndexBytes, "sharded_queries", stats.LoadShardedQueries(), "split_queries", stats.LoadSplitQueries(), + "spun_off_subqueries", stats.LoadSpunOffSubqueries(), "estimated_series_count", stats.GetEstimatedSeriesCount(), "queue_time_seconds", stats.LoadQueueTime().Seconds(), "encode_time_seconds", stats.LoadEncodeTime().Seconds(), diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 24587c71a8a..e46087b34cf 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -219,6 +219,20 @@ func (s *Stats) LoadSamplesProcessed() uint64 { return atomic.LoadUint64(&s.SamplesProcessed) } +func (s *Stats) AddSpunOffSubqueries(num uint32) { + if s == nil { + return + } + atomic.AddUint32(&s.SpunOffSubqueries, num) +} + +func (s *Stats) LoadSpunOffSubqueries() uint32 { + if s == nil { + return 0 + } + return atomic.LoadUint32(&s.SpunOffSubqueries) +} + // Merge the provided Stats into this one. func (s *Stats) Merge(other *Stats) { if s == nil || other == nil { @@ -236,6 +250,7 @@ func (s *Stats) Merge(other *Stats) { s.AddQueueTime(other.LoadQueueTime()) s.AddEncodeTime(other.LoadEncodeTime()) s.AddSamplesProcessed(other.LoadSamplesProcessed()) + s.AddSpunOffSubqueries(other.LoadSpunOffSubqueries()) } // Copy returns a copy of the stats. Use this rather than regular struct assignment diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index 60c208a2a4a..1941b4479c5 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -52,6 +52,8 @@ type Stats struct { EncodeTime time.Duration `protobuf:"bytes,10,opt,name=encode_time,json=encodeTime,proto3,stdduration" json:"encode_time"` // TotalSamples represents the total number of samples scanned while evaluating a query. 
SamplesProcessed uint64 `protobuf:"varint,11,opt,name=samples_processed,json=samplesProcessed,proto3" json:"samples_processed,omitempty"` + // The number of subqueries that were spun off as actual range queries in order to execute the full query + SpunOffSubqueries uint32 `protobuf:"varint,12,opt,name=spun_off_subqueries,json=spunOffSubqueries,proto3" json:"spun_off_subqueries,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -163,6 +165,13 @@ func (m *Stats) GetSamplesProcessed() uint64 { return 0 } +func (m *Stats) GetSpunOffSubqueries() uint32 { + if m != nil { + return m.SpunOffSubqueries + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") } @@ -170,34 +179,35 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 422 bytes of a gzipped FileDescriptorProto + // 447 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xb1, 0x8e, 0xd3, 0x40, - 0x10, 0x86, 0xbd, 0x90, 0x1c, 0xc9, 0x9a, 0x03, 0xce, 0x44, 0xc8, 0x5c, 0xb1, 0x17, 0x41, 0x41, - 0x24, 0x24, 0x07, 0x01, 0x1d, 0x0d, 0xf2, 0xa5, 0xa1, 0x83, 0x84, 0x8a, 0xc6, 0x72, 0xec, 0x89, - 0x63, 0x61, 0x7b, 0x1d, 0xef, 0x5a, 0x40, 0xc7, 0x23, 0x50, 0xf2, 0x08, 0x3c, 0x4a, 0xca, 0x94, - 0xa9, 0x80, 0x38, 0x0d, 0x65, 0x1e, 0x01, 0x79, 0x76, 0x1d, 0x25, 0x57, 0xa5, 0xf3, 0xce, 0x37, - 0xdf, 0xce, 0xaf, 0x1d, 0x53, 0x53, 0x48, 0x5f, 0x0a, 0x27, 0x2f, 0xb8, 0xe4, 0x56, 0x1b, 0x0f, - 0x97, 0xbd, 0x88, 0x47, 0x1c, 0x2b, 0xc3, 0xfa, 0x4b, 0xc1, 0x4b, 0x16, 0x71, 0x1e, 0x25, 0x30, - 0xc4, 0xd3, 0xb4, 0x9c, 0x0d, 0xc3, 0xb2, 0xf0, 0x65, 0xcc, 0x33, 0xc5, 0x9f, 0x2c, 0x5b, 0xb4, - 0x3d, 0xa9, 0x7d, 0xeb, 0x2d, 0xed, 0x7e, 0xf1, 0x93, 0xc4, 0x93, 0x71, 0x0a, 0x36, 0xe9, 0x93, - 0x81, 0xf9, 0xf2, 0xb1, 0xa3, 0x6c, 0xa7, 0xb1, 0x9d, 0x91, 0xb6, 0xdd, 0xce, 0xf2, 0xf7, 0x95, - 0xf1, 0xf3, 0xcf, 0x15, 0x19, 0x77, 0x6a, 0xeb, 0x63, 0x9c, 0x82, 0xf5, 0x82, 0xf6, 0x66, 0x20, - 0x83, 0x39, 0x84, 0x9e, 0x80, 0x22, 0x06, 0xe1, 0x05, 0xbc, 0xcc, 0xa4, 0x7d, 0xab, 0x4f, 0x06, - 0xad, 0xb1, 0xa5, 0xd9, 0x04, 0xd1, 0x75, 0x4d, 0x2c, 0x87, 0x3e, 0x6c, 0x8c, 0x60, 0x5e, 0x66, - 0x9f, 0xbd, 0xe9, 0x37, 0x09, 0xc2, 0xbe, 0x8d, 0xc2, 0x85, 0x46, 0xd7, 0x35, 0x71, 0x6b, 0x70, - 0x38, 0x01, 0xfb, 0x9b, 0x09, 0xad, 0xa3, 0x09, 0x28, 0xe8, 0x09, 0xcf, 0xe8, 0x7d, 0x31, 0xf7, - 0x8b, 0x10, 0x42, 0x6f, 0x51, 0xe2, 0x64, 0xbb, 0xdd, 0x27, 0x83, 0xf3, 0xf1, 0x3d, 0x5d, 0xfe, - 0xa0, 0xaa, 0xd6, 0x53, 0x7a, 0x2e, 0xf2, 0x24, 0x96, 0xfb, 0xb6, 0x33, 0x6c, 0xbb, 0x8b, 0xc5, - 0xa6, 0xe9, 0x20, 0x6f, 0x9c, 0x85, 0xf0, 0x55, 0xe7, 0xbd, 0x73, 0x94, 0xf7, 0x5d, 0x4d, 0x54, - 0xde, 0xd7, 0xf4, 0x11, 0x08, 0x19, 0xa7, 0xbe, 0xbc, 0xf9, 0x26, 0x1d, 0x54, 0x7a, 0x7b, 0x7a, - 0xf8, 0x2a, 0x2e, 0xa5, 0x8b, 0x12, 0x4a, 0x50, 0xab, 0xe8, 0x9e, 0xbe, 0x8a, 0x2e, 0x6a, 0xb8, - 0x8b, 0x11, 0x35, 0x21, 0x0b, 0x78, 0xa8, 0x2f, 0xa1, 0xa7, 0x5f, 0x42, 0x95, 0x87, 0xb7, 0x3c, - 0xa7, 0x17, 0xc2, 0x4f, 0xf3, 0x04, 0x84, 0x97, 0x17, 0x3c, 0x00, 0x21, 0x20, 0xb4, 0x4d, 0x8c, - 0xfe, 0x40, 0x83, 0xf7, 0x4d, 0xdd, 0x7d, 0xb3, 0xda, 0x30, 0x63, 0xbd, 0x61, 0xc6, 0x6e, 0xc3, - 0xc8, 0xf7, 0x8a, 0x91, 0x5f, 0x15, 0x23, 0xcb, 0x8a, 0x91, 0x55, 0xc5, 0xc8, 0xdf, 0x8a, 0x91, - 0x7f, 0x15, 0x33, 0x76, 0x15, 0x23, 0x3f, 0xb6, 0xcc, 0x58, 0x6d, 0x99, 0xb1, 0xde, 0x32, 0xe3, - 0x93, 0xfa, 0x7b, 0xa7, 0x67, 0x18, 0xe9, 0xd5, 0xff, 0x00, 0x00, 0x00, 0xff, 0xff, 0x44, 0x02, - 0xa0, 0xdb, 0xda, 0x02, 0x00, 0x00, + 0x10, 0x40, 
0xbd, 0x90, 0x1c, 0xc9, 0xe6, 0x0e, 0x88, 0x89, 0x90, 0xb9, 0x62, 0x2f, 0x82, 0x82, + 0x48, 0x48, 0x0e, 0x02, 0x3a, 0x1a, 0x94, 0xbb, 0x86, 0x0a, 0x48, 0xa8, 0x68, 0x2c, 0xc7, 0x1e, + 0x27, 0x16, 0xb6, 0xd7, 0xe7, 0xdd, 0x15, 0xd0, 0xf1, 0x09, 0x94, 0x7c, 0x02, 0x9f, 0x72, 0x65, + 0xca, 0xab, 0x80, 0x38, 0x0d, 0xe5, 0x95, 0x94, 0x68, 0x67, 0xd7, 0x51, 0x8e, 0x2a, 0x5d, 0x76, + 0xde, 0xbc, 0x99, 0xc9, 0x8c, 0x69, 0x4f, 0xc8, 0x50, 0x0a, 0xbf, 0xac, 0xb8, 0xe4, 0x6e, 0x1b, + 0x1f, 0xc7, 0x83, 0x05, 0x5f, 0x70, 0x8c, 0x8c, 0xf5, 0x2f, 0x03, 0x8f, 0xd9, 0x82, 0xf3, 0x45, + 0x06, 0x63, 0x7c, 0xcd, 0x55, 0x32, 0x8e, 0x55, 0x15, 0xca, 0x94, 0x17, 0x86, 0x3f, 0xfc, 0xdb, + 0xa2, 0xed, 0x99, 0xf6, 0xdd, 0x57, 0xb4, 0xfb, 0x29, 0xcc, 0xb2, 0x40, 0xa6, 0x39, 0x78, 0x64, + 0x48, 0x46, 0xbd, 0x67, 0x0f, 0x7c, 0x63, 0xfb, 0x8d, 0xed, 0x9f, 0x59, 0x7b, 0xd2, 0xb9, 0xf8, + 0x79, 0xe2, 0x7c, 0xff, 0x75, 0x42, 0xa6, 0x1d, 0x6d, 0xbd, 0x4f, 0x73, 0x70, 0x9f, 0xd2, 0x41, + 0x02, 0x32, 0x5a, 0x42, 0x1c, 0x08, 0xa8, 0x52, 0x10, 0x41, 0xc4, 0x55, 0x21, 0xbd, 0x1b, 0x43, + 0x32, 0x6a, 0x4d, 0x5d, 0xcb, 0x66, 0x88, 0x4e, 0x35, 0x71, 0x7d, 0x7a, 0xaf, 0x31, 0xa2, 0xa5, + 0x2a, 0x3e, 0x06, 0xf3, 0x2f, 0x12, 0x84, 0x77, 0x13, 0x85, 0xbe, 0x45, 0xa7, 0x9a, 0x4c, 0x34, + 0xd8, 0xed, 0x80, 0xf9, 0x4d, 0x87, 0xd6, 0xb5, 0x0e, 0x28, 0xd8, 0x0e, 0x8f, 0xe9, 0x1d, 0xb1, + 0x0c, 0xab, 0x18, 0xe2, 0xe0, 0x5c, 0x61, 0x67, 0xaf, 0x3d, 0x24, 0xa3, 0xa3, 0xe9, 0x6d, 0x1b, + 0x7e, 0x67, 0xa2, 0xee, 0x23, 0x7a, 0x24, 0xca, 0x2c, 0x95, 0xdb, 0xb4, 0x03, 0x4c, 0x3b, 0xc4, + 0x60, 0x93, 0xb4, 0x33, 0x6f, 0x5a, 0xc4, 0xf0, 0xd9, 0xce, 0x7b, 0xeb, 0xda, 0xbc, 0xaf, 0x35, + 0x31, 0xf3, 0xbe, 0xa0, 0xf7, 0x41, 0xc8, 0x34, 0x0f, 0xe5, 0xff, 0x3b, 0xe9, 0xa0, 0x32, 0xd8, + 0xd2, 0xdd, 0xad, 0x4c, 0x28, 0x3d, 0x57, 0xa0, 0xc0, 0x9c, 0xa2, 0xbb, 0xff, 0x29, 0xba, 0xa8, + 0xe1, 0x2d, 0xce, 0x68, 0x0f, 0x8a, 0x88, 0xc7, 0xb6, 0x08, 0xdd, 0xbf, 0x08, 0x35, 0x1e, 0x56, + 0x79, 0x42, 0xfb, 0x22, 0xcc, 0xcb, 0x0c, 0x44, 0x50, 0x56, 0x3c, 0x02, 0x21, 0x20, 0xf6, 0x7a, + 0x38, 0xfa, 0x5d, 0x0b, 0xde, 0x36, 0x71, 0xbd, 0x1c, 0x51, 0xaa, 0x22, 0xe0, 0x49, 0x12, 0x08, + 0x35, 0x6f, 0xf6, 0x78, 0x88, 0x7b, 0xec, 0x6b, 0xf4, 0x26, 0x49, 0x66, 0x5b, 0x30, 0x79, 0xb9, + 0x5a, 0x33, 0xe7, 0x72, 0xcd, 0x9c, 0xab, 0x35, 0x23, 0x5f, 0x6b, 0x46, 0x7e, 0xd4, 0x8c, 0x5c, + 0xd4, 0x8c, 0xac, 0x6a, 0x46, 0x7e, 0xd7, 0x8c, 0xfc, 0xa9, 0x99, 0x73, 0x55, 0x33, 0xf2, 0x6d, + 0xc3, 0x9c, 0xd5, 0x86, 0x39, 0x97, 0x1b, 0xe6, 0x7c, 0x30, 0x5f, 0xfb, 0xfc, 0x00, 0xff, 0xc2, + 0xf3, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x1f, 0x46, 0x5f, 0x3b, 0x0a, 0x03, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -252,13 +262,16 @@ func (this *Stats) Equal(that interface{}) bool { if this.SamplesProcessed != that1.SamplesProcessed { return false } + if this.SpunOffSubqueries != that1.SpunOffSubqueries { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 15) + s := make([]string, 0, 16) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") @@ -271,6 +284,7 @@ func (this *Stats) GoString() string { s = append(s, "QueueTime: "+fmt.Sprintf("%#v", this.QueueTime)+",\n") s = append(s, "EncodeTime: "+fmt.Sprintf("%#v", this.EncodeTime)+",\n") s = append(s, "SamplesProcessed: "+fmt.Sprintf("%#v", this.SamplesProcessed)+",\n") + s = append(s, "SpunOffSubqueries: 
"+fmt.Sprintf("%#v", this.SpunOffSubqueries)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -302,6 +316,11 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.SpunOffSubqueries != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.SpunOffSubqueries)) + i-- + dAtA[i] = 0x60 + } if m.SamplesProcessed != 0 { i = encodeVarintStats(dAtA, i, uint64(m.SamplesProcessed)) i-- @@ -416,6 +435,9 @@ func (m *Stats) Size() (n int) { if m.SamplesProcessed != 0 { n += 1 + sovStats(uint64(m.SamplesProcessed)) } + if m.SpunOffSubqueries != 0 { + n += 1 + sovStats(uint64(m.SpunOffSubqueries)) + } return n } @@ -441,6 +463,7 @@ func (this *Stats) String() string { `QueueTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.QueueTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, `EncodeTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.EncodeTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, `SamplesProcessed:` + fmt.Sprintf("%v", this.SamplesProcessed) + `,`, + `SpunOffSubqueries:` + fmt.Sprintf("%v", this.SpunOffSubqueries) + `,`, `}`, }, "") return s @@ -733,6 +756,25 @@ func (m *Stats) Unmarshal(dAtA []byte) error { break } } + case 12: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SpunOffSubqueries", wireType) + } + m.SpunOffSubqueries = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SpunOffSubqueries |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 35cb2c9303a..bda3956f04d 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -38,4 +38,6 @@ message Stats { google.protobuf.Duration encode_time = 10 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; // TotalSamples represents the total number of samples scanned while evaluating a query. uint64 samples_processed = 11; + // The number of subqueries that were spun off as actual range queries in order to execute the full query + uint32 spun_off_subqueries = 12; }