Skip to content

Commit

Permalink
feat(query-tee): improvements to skip samples outside comparison wind…
Browse files Browse the repository at this point in the history
…ow (#15794)
  • Loading branch information
ashwanthgoli authored Jan 22, 2025
1 parent 2a2b528 commit 1e8d8a9
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 48 deletions.
2 changes: 2 additions & 0 deletions cmd/querytee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"os"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/log"
Expand Down Expand Up @@ -62,6 +63,7 @@ func lokiReadRoutes(cfg Config) []querytee.Route {
Tolerance: cfg.ProxyConfig.ValueComparisonTolerance,
UseRelativeError: cfg.ProxyConfig.UseRelativeError,
SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples,
SkipSamplesBefore: time.Time(cfg.ProxyConfig.SkipSamplesBefore),
})

return []querytee.Route{
Expand Down
7 changes: 5 additions & 2 deletions tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -34,6 +35,7 @@ type ProxyConfig struct {
UseRelativeError bool
PassThroughNonRegisteredRoutes bool
SkipRecentSamples time.Duration
SkipSamplesBefore flagext.Time
RequestURLFilter *regexp.Regexp
InstrumentCompares bool
}
Expand All @@ -48,10 +50,11 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 60*time.Second, "The window from now to skip comparing samples. 0 to disable.")
f.Var(&cfg.SkipSamplesBefore, "proxy.compare-skip-samples-before", "Skip the samples before the given time for comparison. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only.")
f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.")
f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error{
f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error {
var err error
cfg.RequestURLFilter, err = regexp.Compile(raw)
cfg.RequestURLFilter, err = regexp.Compile(raw)
return err
})
f.BoolVar(&cfg.InstrumentCompares, "proxy.compare-instrument", false, "Reports metrics on comparisons of responses between preferred and non-preferred endpoints for supported routes.")
Expand Down
21 changes: 16 additions & 5 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
)

type ResponsesComparator interface {
Compare(expected, actual []byte) (*ComparisonSummary, error)
Compare(expected, actual []byte, queryEvaluationTime time.Time) (*ComparisonSummary, error)
}

type ComparisonSummary struct {
skipped bool
missingMetrics int
}

Expand Down Expand Up @@ -175,13 +176,15 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back
actualResponse := responses[i]

result := comparisonSuccess
summary, err := p.compareResponses(expectedResponse, actualResponse)
summary, err := p.compareResponses(expectedResponse, actualResponse, time.Now().UTC())
if err != nil {
level.Error(p.logger).Log("msg", "response comparison failed",
"backend-name", p.backends[i].name,
"route-name", p.routeName,
"query", r.URL.RawQuery, "err", err)
result = comparisonFailed
} else if summary != nil && summary.skipped {
result = comparisonSkipped
}

if p.instrumentCompares && summary != nil {
Expand Down Expand Up @@ -227,10 +230,18 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp
return responses[0]
}

func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) (*ComparisonSummary, error) {
func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse, queryEvalTime time.Time) (*ComparisonSummary, error) {
if expectedResponse.err != nil {
return &ComparisonSummary{skipped: true}, nil
}

if actualResponse.err != nil {
return nil, fmt.Errorf("skipped comparison of response because the request to the secondary backend failed: %w", actualResponse.err)
}

// compare response body only if we get a 200
if expectedResponse.status != 200 {
return nil, fmt.Errorf("skipped comparison of response because we got status code %d from preferred backend's response", expectedResponse.status)
return &ComparisonSummary{skipped: true}, nil
}

if actualResponse.status != 200 {
Expand All @@ -241,7 +252,7 @@ func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backe
return nil, fmt.Errorf("expected status code %d but got %d", expectedResponse.status, actualResponse.status)
}

return p.comparator.Compare(expectedResponse.body, actualResponse.body)
return p.comparator.Compare(expectedResponse.body, actualResponse.body, queryEvalTime)
}

type backendResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,6 @@ func Test_backendResponse_statusCode(t *testing.T) {

type mockComparator struct{}

func (c *mockComparator) Compare(_, _ []byte) (*ComparisonSummary, error) {
func (c *mockComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) {
return &ComparisonSummary{missingMetrics: 12}, nil
}
1 change: 1 addition & 0 deletions tools/querytee/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
const (
comparisonSuccess = "success"
comparisonFailed = "fail"
comparisonSkipped = "skipped"

unknownIssuer = "unknown"
canaryIssuer = "loki-canary"
Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var testWriteRoutes = []Route{}

type testComparator struct{}

func (testComparator) Compare(_, _ []byte) (*ComparisonSummary, error) { return nil, nil }
func (testComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) { return nil, nil }

func Test_NewProxy(t *testing.T) {
cfg := ProxyConfig{}
Expand Down
Loading

0 comments on commit 1e8d8a9

Please sign in to comment.