diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries.go b/pkg/frontend/querymiddleware/spin_off_subqueries.go index 5cf8adbdb6..e332689457 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries.go @@ -298,7 +298,7 @@ func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) ( defer resp.Body.Close() decoded, err := s.codec.DecodeMetricsQueryResponse(ctx, resp, req, s.logger) if err != nil { - return nil, fmt.Errorf("error decoding response: %v", err) + return nil, fmt.Errorf("error decoding response: %v. Status: %s", err, resp.Status) } promRes, ok := decoded.(*PrometheusResponse) if !ok { diff --git a/pkg/frontend/querymiddleware/spin_off_subqueries_test.go b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go index 78643ba814..9c921dfbdd 100644 --- a/pkg/frontend/querymiddleware/spin_off_subqueries_test.go +++ b/pkg/frontend/querymiddleware/spin_off_subqueries_test.go @@ -27,6 +27,7 @@ import ( ) func TestSubquerySpinOff_Correctness(t *testing.T) { + t.Parallel() var ( numSeries = 1000 numStaleSeries = 100 @@ -172,42 +173,9 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { queryable: queryable, } - codec := newTestPrometheusCodec() - // Create a local server that handles queries. - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/prometheus/api/v1/query_range" { - http.Error(w, "invalid path", http.StatusNotFound) - return - } - - metricsReq, err := codec.DecodeMetricsQueryRequest(r.Context(), r) - if err != nil { - http.Error(w, errors.Wrap(err, "failed to decode request").Error(), http.StatusBadRequest) - return - } - - resp, err := downstream.Do(r.Context(), metricsReq) - if err != nil { - http.Error(w, errors.Wrap(err, "failed to execute request").Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - httpResp, err := codec.EncodeMetricsQueryResponse(r.Context(), r, resp) - if err != nil { - http.Error(w, errors.Wrap(err, "failed to encode response").Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", httpResp.Header.Get("Content-Type")) - w.Header().Set("Content-Length", httpResp.Header.Get("Content-Length")) - io.Copy(w, httpResp.Body) - httpResp.Body.Close() - })) - t.Cleanup(httpServer.Close) - for testName, testData := range tests { t.Run(testName, func(t *testing.T) { + t.Parallel() for _, queryOffset := range []time.Duration{-10 * time.Minute, -33 * time.Second, 0, 33 * time.Second, 10 * time.Minute, 1 * time.Hour} { t.Run(fmt.Sprintf("queryOffset=%s", queryOffset), func(t *testing.T) { t.Parallel() @@ -238,9 +206,6 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings) } - spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL+"/prometheus/api/v1/query_range") - require.NoError(t, err) - reg := prometheus.NewPedanticRegistry() spinoffMiddleware := newSpinOffSubqueriesMiddleware( mockLimits{ @@ -248,7 +213,7 @@ func TestSubquerySpinOff_Correctness(t *testing.T) { }, log.NewNopLogger(), engine, - spinOffQueryHandler, + downstream, reg, defaultStepFunc, ) @@ -319,6 +284,100 @@ func TestSubquerySpinOff_ShouldReturnErrorOnDownstreamHandlerFailure(t *testing. assert.Equal(t, downstreamErr, err) } +func TestSpinOffQueryHandler(t *testing.T) { + now := time.Now() + + numSeries := 5 + series := make([]storage.Series, 0, numSeries) + // Add counter series. + for i := 0; i < numSeries; i++ { + gen := factor(float64(i) * 0.1) + series = append(series, newSeries(newTestCounterLabels(len(series)), now.Add(-5*time.Minute), now, 30*time.Second, gen)) + } + // Create a queryable on the fixtures. + queryable := storageSeriesQueryable(series) + + engine := newEngine() + downstream := &downstreamHandler{ + engine: engine, + queryable: queryable, + } + + gotRequestCt := 0 + codec := newTestPrometheusCodec() + // Create a local server that handles queries. + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/prometheus/api/v1/query_range" { + http.Error(w, "invalid path", http.StatusNotFound) + return + } + + if org := r.Header.Get(user.OrgIDHeaderName); org != "test" { + http.Error(w, "invalid org", http.StatusUnauthorized) + return + } + + metricsReq, err := codec.DecodeMetricsQueryRequest(r.Context(), r) + if err != nil { + http.Error(w, errors.Wrap(err, "failed to decode request").Error(), http.StatusBadRequest) + return + } + + resp, err := downstream.Do(r.Context(), metricsReq) + if err != nil { + http.Error(w, errors.Wrap(err, "failed to execute request").Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + httpResp, err := codec.EncodeMetricsQueryResponse(r.Context(), r, resp) + if err != nil { + http.Error(w, errors.Wrap(err, "failed to encode response").Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", httpResp.Header.Get("Content-Type")) + w.Header().Set("Content-Length", httpResp.Header.Get("Content-Length")) + io.Copy(w, httpResp.Body) + httpResp.Body.Close() + gotRequestCt++ + })) + t.Cleanup(httpServer.Close) + + spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL+"/prometheus/api/v1/query_range") + require.NoError(t, err) + + // Ensure we have had no requests yet. + require.Equal(t, 0, gotRequestCt) + + req := &PrometheusRangeQueryRequest{ + path: "/query_range", + start: util.TimeToMillis(now.Add(-5 * time.Minute)), + end: util.TimeToMillis(now), + step: 30 * time.Second.Milliseconds(), + queryExpr: parseQuery(t, "max_over_time(rate(metric_counter[1m])[5m:1m])"), + } + ctx := user.InjectOrgID(context.Background(), "test") + resp, err := spinOffQueryHandler.Do(ctx, req) + require.NoError(t, err) + + // Ensure we got the expected number of requests. + require.Equal(t, 1, gotRequestCt) + + // Ensure the query produces some results. + require.NotEmpty(t, resp.(*PrometheusResponse).Data.Result) + requireValidSamples(t, resp.(*PrometheusResponse).Data.Result) + + // Ensure the result is the same as the one produced by the downstream handler. + expectedRes, err := downstream.Do(context.Background(), req) + require.Nil(t, err) + + expectedPrometheusRes := expectedRes.(*PrometheusResponse) + sort.Sort(byLabels(expectedPrometheusRes.Data.Result)) + sort.Sort(byLabels(resp.(*PrometheusResponse).Data.Result)) + approximatelyEquals(t, expectedPrometheusRes, resp.(*PrometheusResponse)) +} + var defaultStepFunc = func(int64) int64 { return (1 * time.Minute).Milliseconds() }