Skip to content

Commit

Permalink
Add retries to the spun-off range queries
Browse files Browse the repository at this point in the history
Haven't seen any failures so far from my testing in Kubernetes, but it _will_ happen without some retries
This makes use of the regular retry middleware, so the configured retry settings will apply to these spun-off queries as well

Also, add some more tests
  • Loading branch information
julienduchesne committed Jan 29, 2025
1 parent c12a1b3 commit 48f27c8
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 17 deletions.
6 changes: 4 additions & 2 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func newQueryMiddlewares(
queryBlockerMiddleware := newQueryBlockerMiddleware(limits, log, registerer)
queryStatsMiddleware := newQueryStatsMiddleware(registerer, engine)
prom2CompatMiddleware := newProm2RangeCompatMiddleware(limits, log, registerer)
retryMiddlewareMetrics := newRetryMiddlewareMetrics(registerer)

remoteReadMiddleware = append(remoteReadMiddleware,
// Track query range statistics. Added first before any subsequent middleware modifies the request.
Expand All @@ -383,7 +384,9 @@ func newQueryMiddlewares(

if spinOffURL := cfg.SpinOffInstantSubqueriesToURL; spinOffURL != "" {
// Spin-off subqueries to a remote URL (or localhost)
spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log, spinOffURL)
// Add the retry middleware to the spin-off query handler.
// Spun-off queries are terminated in that handler (they don't call "next" so the retry middleware has to be added here).
spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log, spinOffURL, cfg.MaxRetries, retryMiddlewareMetrics)
if err != nil {
level.Error(log).Log("msg", "failed to create spin-off query handler", "error", err)
} else {
Expand Down Expand Up @@ -494,7 +497,6 @@ func newQueryMiddlewares(
}

if cfg.MaxRetries > 0 {
retryMiddlewareMetrics := newRetryMiddlewareMetrics(registerer)
queryRangeMiddleware = append(queryRangeMiddleware, newInstrumentMiddleware("retry", metrics), newRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
queryInstantMiddleware = append(queryInstantMiddleware, newInstrumentMiddleware("retry", metrics), newRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/frontend/querymiddleware/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ func TestMiddlewaresConsistency(t *testing.T) {
cfg.ShardedQueries = true
cfg.PrunedQueries = true
cfg.BlockPromQLExperimentalFunctions = true
cfg.SpinOffInstantSubqueriesToURL = "http://localhost"

// Ensure all features are enabled, so that we assert on all middlewares.
require.NotZero(t, cfg.CacheResults)
Expand Down Expand Up @@ -627,8 +628,11 @@ func TestMiddlewaresConsistency(t *testing.T) {
exceptions: []string{"splitAndCacheMiddleware", "stepAlignMiddleware"},
},
"range query": {
instances: queryRangeMiddlewares,
exceptions: []string{"splitInstantQueryByIntervalMiddleware"},
instances: queryRangeMiddlewares,
exceptions: []string{
"splitInstantQueryByIntervalMiddleware",
"spinOffSubqueriesMiddleware", // This middleware is only for instant queries.
},
},
"remote read": {
instances: remoteReadMiddlewares,
Expand All @@ -642,6 +646,7 @@ func TestMiddlewaresConsistency(t *testing.T) {
"pruneMiddleware", // No query pruning support.
"experimentalFunctionsMiddleware", // No blocking for PromQL experimental functions as it is executed remotely.
"prom2RangeCompatHandler", // No rewriting Prometheus 2 subqueries to Prometheus 3
"spinOffSubqueriesMiddleware", // This middleware is only for instant queries.
},
},
}
Expand Down
25 changes: 15 additions & 10 deletions pkg/frontend/querymiddleware/spin_off_subqueries.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,24 +255,29 @@ func (s *spinOffSubqueriesMiddleware) Do(ctx context.Context, req MetricsQueryRe
}, nil
}

func newSpinOffQueryHandler(codec Codec, logger log.Logger, sendURL string) (MetricsQueryHandler, error) {
// spinOffQueryHandler is a query handler that takes a request and sends it to a remote endpoint.
type spinOffQueryHandler struct {
codec Codec
logger log.Logger
rangeQueryURL *url.URL
}

func newSpinOffQueryHandler(codec Codec, logger log.Logger, sendURL string, maxRetries int, retryMiddlewareMetrics prometheus.Observer) (MetricsQueryHandler, error) {
rangeQueryURL, err := url.Parse(sendURL)
if err != nil {
return nil, fmt.Errorf("invalid spin-off URL: %w", err)
}

return &spinOffQueryHandler{
var handler MetricsQueryHandler = &spinOffQueryHandler{
codec: codec,
logger: logger,
rangeQueryURL: rangeQueryURL,
}, nil
}
}

// spinOffQueryHandler is a query handler that takes a request and sends it to a remote endpoint.
type spinOffQueryHandler struct {
codec Codec
logger log.Logger
rangeQueryURL *url.URL
if maxRetries > 0 {
handler = newRetryMiddleware(logger, maxRetries, retryMiddlewareMetrics).Wrap(handler)
}

return handler, nil
}

func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) {
Expand Down
13 changes: 10 additions & 3 deletions pkg/frontend/querymiddleware/spin_off_subqueries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,13 @@ func TestSpinOffQueryHandler(t *testing.T) {
codec := newTestPrometheusCodec()
// Create a local server that handles queries.
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotRequestCt++
if gotRequestCt == 1 {
// Test a failure case where the first request fails.
http.Error(w, "unexpected request", http.StatusInternalServerError)
return
}

if r.URL.Path != "/prometheus/api/v1/query_range" {
http.Error(w, "invalid path", http.StatusNotFound)
return
Expand Down Expand Up @@ -360,11 +367,11 @@ func TestSpinOffQueryHandler(t *testing.T) {
w.Header().Set("Content-Length", httpResp.Header.Get("Content-Length"))
io.Copy(w, httpResp.Body)
httpResp.Body.Close()
gotRequestCt++
}))
t.Cleanup(httpServer.Close)

spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL+"/prometheus/api/v1/query_range")
spinOffQueryHandler, err := newSpinOffQueryHandler(
codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL+"/prometheus/api/v1/query_range", 3, &mockRetryMetrics{})
require.NoError(t, err)

// Ensure we have had no requests yet.
Expand All @@ -382,7 +389,7 @@ func TestSpinOffQueryHandler(t *testing.T) {
require.NoError(t, err)

// Ensure we got the expected number of requests.
require.Equal(t, 1, gotRequestCt)
require.Equal(t, 2, gotRequestCt)

// Ensure the query produces some results.
require.NotEmpty(t, resp.(*PrometheusResponse).Data.Result)
Expand Down

0 comments on commit 48f27c8

Please sign in to comment.