Skip to content

Commit

Permalink
Make tests faster, put the SpinOffQueryHandler into its own test
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Jan 29, 2025
1 parent 8d0e0ba commit afa71ae
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/spin_off_subqueries.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *spinOffQueryHandler) Do(ctx context.Context, req MetricsQueryRequest) (
defer resp.Body.Close()
decoded, err := s.codec.DecodeMetricsQueryResponse(ctx, resp, req, s.logger)
if err != nil {
return nil, fmt.Errorf("error decoding response: %v", err)
return nil, fmt.Errorf("error decoding response: %v. Status: %s", err, resp.Status)
}
promRes, ok := decoded.(*PrometheusResponse)
if !ok {
Expand Down
135 changes: 97 additions & 38 deletions pkg/frontend/querymiddleware/spin_off_subqueries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

func TestSubquerySpinOff_Correctness(t *testing.T) {
t.Parallel()
var (
numSeries = 1000
numStaleSeries = 100
Expand Down Expand Up @@ -172,42 +173,9 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
queryable: queryable,
}

codec := newTestPrometheusCodec()
// Create a local server that handles queries.
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/prometheus/api/v1/query_range" {
http.Error(w, "invalid path", http.StatusNotFound)
return
}

metricsReq, err := codec.DecodeMetricsQueryRequest(r.Context(), r)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to decode request").Error(), http.StatusBadRequest)
return
}

resp, err := downstream.Do(r.Context(), metricsReq)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to execute request").Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
httpResp, err := codec.EncodeMetricsQueryResponse(r.Context(), r, resp)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to encode response").Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", httpResp.Header.Get("Content-Type"))
w.Header().Set("Content-Length", httpResp.Header.Get("Content-Length"))
io.Copy(w, httpResp.Body)
httpResp.Body.Close()
}))
t.Cleanup(httpServer.Close)

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
t.Parallel()
for _, queryOffset := range []time.Duration{-10 * time.Minute, -33 * time.Second, 0, 33 * time.Second, 10 * time.Minute, 1 * time.Hour} {
t.Run(fmt.Sprintf("queryOffset=%s", queryOffset), func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -238,17 +206,14 @@ func TestSubquerySpinOff_Correctness(t *testing.T) {
removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings)
}

spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL+"/prometheus/api/v1/query_range")
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
spinoffMiddleware := newSpinOffSubqueriesMiddleware(
mockLimits{
instantQueriesWithSubquerySpinOff: []string{".*"},
},
log.NewNopLogger(),
engine,
spinOffQueryHandler,
downstream,
reg,
defaultStepFunc,
)
Expand Down Expand Up @@ -319,6 +284,100 @@ func TestSubquerySpinOff_ShouldReturnErrorOnDownstreamHandlerFailure(t *testing.
assert.Equal(t, downstreamErr, err)
}

func TestSpinOffQueryHandler(t *testing.T) {
now := time.Now()

numSeries := 5
series := make([]storage.Series, 0, numSeries)
// Add counter series.
for i := 0; i < numSeries; i++ {
gen := factor(float64(i) * 0.1)
series = append(series, newSeries(newTestCounterLabels(len(series)), now.Add(-5*time.Minute), now, 30*time.Second, gen))
}
// Create a queryable on the fixtures.
queryable := storageSeriesQueryable(series)

engine := newEngine()
downstream := &downstreamHandler{
engine: engine,
queryable: queryable,
}

gotRequestCt := 0
codec := newTestPrometheusCodec()
// Create a local server that handles queries.
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/prometheus/api/v1/query_range" {
http.Error(w, "invalid path", http.StatusNotFound)
return
}

if org := r.Header.Get(user.OrgIDHeaderName); org != "test" {
http.Error(w, "invalid org", http.StatusUnauthorized)
return
}

metricsReq, err := codec.DecodeMetricsQueryRequest(r.Context(), r)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to decode request").Error(), http.StatusBadRequest)
return
}

resp, err := downstream.Do(r.Context(), metricsReq)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to execute request").Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
httpResp, err := codec.EncodeMetricsQueryResponse(r.Context(), r, resp)
if err != nil {
http.Error(w, errors.Wrap(err, "failed to encode response").Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", httpResp.Header.Get("Content-Type"))
w.Header().Set("Content-Length", httpResp.Header.Get("Content-Length"))
io.Copy(w, httpResp.Body)
httpResp.Body.Close()
gotRequestCt++
}))
t.Cleanup(httpServer.Close)

spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log.NewLogfmtLogger(os.Stdout), httpServer.URL+"/prometheus/api/v1/query_range")
require.NoError(t, err)

// Ensure we have had no requests yet.
require.Equal(t, 0, gotRequestCt)

req := &PrometheusRangeQueryRequest{
path: "/query_range",
start: util.TimeToMillis(now.Add(-5 * time.Minute)),
end: util.TimeToMillis(now),
step: 30 * time.Second.Milliseconds(),
queryExpr: parseQuery(t, "max_over_time(rate(metric_counter[1m])[5m:1m])"),
}
ctx := user.InjectOrgID(context.Background(), "test")
resp, err := spinOffQueryHandler.Do(ctx, req)
require.NoError(t, err)

// Ensure we got the expected number of requests.
require.Equal(t, 1, gotRequestCt)

// Ensure the query produces some results.
require.NotEmpty(t, resp.(*PrometheusResponse).Data.Result)
requireValidSamples(t, resp.(*PrometheusResponse).Data.Result)

// Ensure the result is the same as the one produced by the downstream handler.
expectedRes, err := downstream.Do(context.Background(), req)
require.Nil(t, err)

expectedPrometheusRes := expectedRes.(*PrometheusResponse)
sort.Sort(byLabels(expectedPrometheusRes.Data.Result))
sort.Sort(byLabels(resp.(*PrometheusResponse).Data.Result))
approximatelyEquals(t, expectedPrometheusRes, resp.(*PrometheusResponse))
}

var defaultStepFunc = func(int64) int64 {
return (1 * time.Minute).Milliseconds()
}

0 comments on commit afa71ae

Please sign in to comment.