Skip to content

Commit

Permalink
Test full spin-off!
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Jan 23, 2025
1 parent dee098e commit 1b3db5e
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 20 deletions.
23 changes: 15 additions & 8 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/tenant"
"github.com/grafana/regexp"
Expand Down Expand Up @@ -68,7 +69,7 @@ type Config struct {
TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`
ShardActiveSeriesQueries bool `yaml:"shard_active_series_queries" category:"experimental"`
UseActiveSeriesDecoder bool `yaml:"use_active_series_decoder" category:"experimental"`
SpinOffInstantSubqueries bool `yaml:"spin_off_instant_subqueries" category:"experimental"`
SpinOffInstantSubqueriesToURL string `yaml:"spin_off_instant_subqueries_to_url" category:"experimental"`

// CacheKeyGenerator allows to inject a CacheKeyGenerator to use for generating cache keys.
// If nil, the querymiddleware package uses a DefaultCacheKeyGenerator with SplitQueriesByInterval.
Expand Down Expand Up @@ -101,7 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.QueryResultResponseFormat, "query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from queriers. Supported values: %s", strings.Join(allFormats, ", ")))
f.BoolVar(&cfg.ShardActiveSeriesQueries, "query-frontend.shard-active-series-queries", false, "True to enable sharding of active series queries.")
f.BoolVar(&cfg.UseActiveSeriesDecoder, "query-frontend.use-active-series-decoder", false, "Set to true to use the zero-allocation response decoder for active series queries.")
f.BoolVar(&cfg.SpinOffInstantSubqueries, "query-frontend.spin-off-instant-subqueries", false, "Set to true to enable spinning off subqueries as range queries in instant queries. This makes use of range query optimizations and may improve performance for expensive subqueries.")
f.StringVar(&cfg.SpinOffInstantSubqueriesToURL, "query-frontend.spin-off-instant-subqueries-to-url", "", "If set, subqueries in instant queries will be spun off as range queries and sent to the given URL. This makes use of range query optimizations and may improve performance for expensive subqueries.")
cfg.ResultsCacheConfig.RegisterFlags(f)
}

Expand Down Expand Up @@ -380,12 +381,18 @@ func newQueryMiddlewares(
newStepAlignMiddleware(limits, log, registerer),
)

if cfg.SpinOffInstantSubqueries {
queryInstantMiddleware = append(
queryInstantMiddleware,
newInstrumentMiddleware("spin_off_subqueries", metrics),
newSpinOffSubqueriesMiddleware(limits, log, engine, queryRangeMiddleware, registerer, defaultStepFunc),
)
if spinOffURL := cfg.SpinOffInstantSubqueriesToURL; spinOffURL != "" {
// Spin off subqueries to a remote URL (or localhost)
spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log, spinOffURL)
if err != nil {
level.Error(log).Log("failed to create spin off query handler", "error", err)
} else {
queryInstantMiddleware = append(
queryInstantMiddleware,
newInstrumentMiddleware("spin_off_subqueries", metrics),
newSpinOffSubqueriesMiddleware(limits, log, engine, spinOffQueryHandler, registerer, defaultStepFunc),
)
}
}

if cfg.CacheResults && cfg.CacheErrors {
Expand Down
57 changes: 55 additions & 2 deletions pkg/frontend/querymiddleware/spin_off_subqueries.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package querymiddleware
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -13,6 +16,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/dskit/user"
apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper"
"github.com/grafana/mimir/pkg/querier/stats"
Expand Down Expand Up @@ -89,7 +93,7 @@ func newSpinOffSubqueriesMiddleware(
limits Limits,
logger log.Logger,
engine *promql.Engine,
rangeMiddlewares []MetricsQueryMiddleware,
rangeHandler MetricsQueryHandler,
registerer prometheus.Registerer,
defaultStepFunc func(int64) int64,
) MetricsQueryMiddleware {
Expand All @@ -101,7 +105,7 @@ func newSpinOffSubqueriesMiddleware(
limits: limits,
logger: logger,
engine: engine,
rangeHandler: MergeMetricsQueryMiddlewares(rangeMiddlewares...).Wrap(next),
rangeHandler: rangeHandler,
metrics: metrics,
defaultStepFunc: defaultStepFunc,
}
Expand Down Expand Up @@ -214,3 +218,52 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe
Infos: info,
}, nil
}

func newSpinOffQueryHandler(codec Codec, logger log.Logger, sendURL string) (MetricsQueryHandler, error) {
baseURL, err := url.Parse(sendURL)
if err != nil {
return nil, fmt.Errorf("invalid spin off URL: %w", err)
}

return &spinOffQueryHandler{
codec: codec,
logger: logger,
baseURL: baseURL,
}, nil
}

// spinOffQueryHandler is a query handler that takes a request and sends it to a remote endpoint.
type spinOffQueryHandler struct {
codec Codec
logger log.Logger
baseURL *url.URL
}

func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) {
httpReq, err := s.codec.EncodeMetricsQueryRequest(ctx, req)
if err != nil {
return nil, fmt.Errorf("error encoding request: %w", err)
}
httpReq.RequestURI = "" // Reset RequestURI to force URL to be used in the request.
httpReq.URL = s.baseURL.ResolveReference(httpReq.URL)

if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil {
return nil, fmt.Errorf("error injecting org ID into request: %v", err)
}

client := http.DefaultClient
resp, err := client.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()
decoded, err := s.codec.DecodeMetricsQueryResponse(ctx, resp, req, s.logger)
if err != nil {
return nil, fmt.Errorf("error decoding response: %v", err)
}
promRes, ok := decoded.(*PrometheusResponse)
if !ok {
return nil, fmt.Errorf("expected PrometheusResponse, got %T", decoded)
}
return promRes, nil
}
63 changes: 53 additions & 10 deletions pkg/frontend/querymiddleware/spin_off_subqueries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ package querymiddleware
import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"os"
"sort"
"strings"
"testing"
Expand All @@ -19,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/user"
"github.com/grafana/mimir/pkg/util"
)

Expand Down Expand Up @@ -578,6 +583,46 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
// Create a queryable on the fixtures.
queryable := storageSeriesQueryable(series)

engine := newEngine()
downstream := &downstreamHandler{
engine: engine,
queryable: queryable,
}

codec := newTestPrometheusCodec()
// Create a local server that handles queries.
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/prometheus/api/v1/query_range" {
http.Error(w, "invalid path", http.StatusNotFound)
return
}

metricsReq, err := codec.DecodeMetricsQueryRequest(r.Context(), r)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to decode request").Error(), http.StatusBadRequest)
return
}

resp, err := downstream.Do(r.Context(), metricsReq)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to execute request").Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
httpResp, err := codec.EncodeMetricsQueryResponse(r.Context(), r, resp)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to encode response").Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", httpResp.Header.Get("Content-Type"))
w.Header().Set("Content-Length", httpResp.Header.Get("Content-Length"))
io.Copy(w, httpResp.Body)
httpResp.Body.Close()
}))
t.Cleanup(httpServer.Close)

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
t.Parallel()
Expand All @@ -588,12 +633,6 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
queryExpr: parseQuery(t, testData.query),
}

engine := newEngine()
downstream := &downstreamHandler{
engine: engine,
queryable: queryable,
}

// Run the query without subquery spin-off.
expectedRes, err := downstream.Do(context.Background(), req)
require.Nil(t, err)
Expand All @@ -612,17 +651,21 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings)
}

spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL)
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
spinoffMiddleware := newSpinOffSubqueriesMiddleware(
mockLimits{},
log.NewNopLogger(),
engine,
[]MetricsQueryMiddleware{},
spinOffQueryHandler,
reg,
defaultStepFunc,
)

spinoffRes, err := spinoffMiddleware.Wrap(downstream).Do(context.Background(), req)
ctx := user.InjectOrgID(context.Background(), "test")
spinoffRes, err := spinoffMiddleware.Wrap(downstream).Do(ctx, req)
require.Nil(t, err)

// Ensure the two results matches (float precision can slightly differ, there's no guarantee in PromQL engine too
Expand Down Expand Up @@ -671,12 +714,12 @@ func TestSubquerySpinOff_ShouldReturnErrorOnDownstreamHandlerFailure(t *testing.
queryExpr: parseQuery(t, "vector(1)"),
}

spinoffMiddleware := newSpinOffSubqueriesMiddleware(mockLimits{}, log.NewNopLogger(), newEngine(), []MetricsQueryMiddleware{}, nil, defaultStepFunc)

// Mock the downstream handler to always return error.
downstreamErr := errors.Errorf("some err")
downstream := mockHandlerWith(nil, downstreamErr)

spinoffMiddleware := newSpinOffSubqueriesMiddleware(mockLimits{}, log.NewNopLogger(), newEngine(), downstream, nil, defaultStepFunc)

// Run the query with subquery spin-off middleware wrapping the downstream one.
// We expect to get the downstream error.
_, err := spinoffMiddleware.Wrap(downstream).Do(context.Background(), req)
Expand Down

0 comments on commit 1b3db5e

Please sign in to comment.