Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query-tee): improvements to skip samples outside comparison window #15794

Merged
merged 9 commits into from
Jan 22, 2025
2 changes: 2 additions & 0 deletions cmd/querytee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"os"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/log"
Expand Down Expand Up @@ -62,6 +63,7 @@ func lokiReadRoutes(cfg Config) []querytee.Route {
Tolerance: cfg.ProxyConfig.ValueComparisonTolerance,
UseRelativeError: cfg.ProxyConfig.UseRelativeError,
SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples,
SkipSamplesBefore: time.Time(cfg.ProxyConfig.SkipSamplesBefore),
})

return []querytee.Route{
Expand Down
7 changes: 5 additions & 2 deletions tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -34,6 +35,7 @@ type ProxyConfig struct {
UseRelativeError bool
PassThroughNonRegisteredRoutes bool
SkipRecentSamples time.Duration
SkipSamplesBefore flagext.Time
RequestURLFilter *regexp.Regexp
InstrumentCompares bool
}
Expand All @@ -48,10 +50,11 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 60*time.Second, "The window from now to skip comparing samples. 0 to disable.")
f.Var(&cfg.SkipSamplesBefore, "proxy.compare-skip-samples-before", "Skip the samples before the given time for comparison. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only.")
f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.")
f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error{
f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error {
var err error
cfg.RequestURLFilter, err = regexp.Compile(raw)
cfg.RequestURLFilter, err = regexp.Compile(raw)
return err
})
f.BoolVar(&cfg.InstrumentCompares, "proxy.compare-instrument", false, "Reports metrics on comparisons of responses between preferred and non-preferred endpoints for supported routes.")
Expand Down
21 changes: 16 additions & 5 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
)

type ResponsesComparator interface {
Compare(expected, actual []byte) (*ComparisonSummary, error)
Compare(expected, actual []byte, queryEvaluationTime time.Time) (*ComparisonSummary, error)
}

type ComparisonSummary struct {
skipped bool
missingMetrics int
}

Expand Down Expand Up @@ -175,13 +176,15 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back
actualResponse := responses[i]

result := comparisonSuccess
summary, err := p.compareResponses(expectedResponse, actualResponse)
summary, err := p.compareResponses(expectedResponse, actualResponse, time.Now().UTC())
if err != nil {
level.Error(p.logger).Log("msg", "response comparison failed",
"backend-name", p.backends[i].name,
"route-name", p.routeName,
"query", r.URL.RawQuery, "err", err)
result = comparisonFailed
} else if summary != nil && summary.skipped {
result = comparisonSkipped
}

if p.instrumentCompares && summary != nil {
Expand Down Expand Up @@ -227,10 +230,18 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp
return responses[0]
}

func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) (*ComparisonSummary, error) {
func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse, queryEvalTime time.Time) (*ComparisonSummary, error) {
if expectedResponse.err != nil {
return &ComparisonSummary{skipped: true}, nil
}

if actualResponse.err != nil {
return nil, fmt.Errorf("skipped comparison of response because the request to the secondary backend failed: %w", actualResponse.err)
}

// compare response body only if we get a 200
if expectedResponse.status != 200 {
return nil, fmt.Errorf("skipped comparison of response because we got status code %d from preferred backend's response", expectedResponse.status)
return &ComparisonSummary{skipped: true}, nil
}

if actualResponse.status != 200 {
Expand All @@ -241,7 +252,7 @@ func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backe
return nil, fmt.Errorf("expected status code %d but got %d", expectedResponse.status, actualResponse.status)
}

return p.comparator.Compare(expectedResponse.body, actualResponse.body)
return p.comparator.Compare(expectedResponse.body, actualResponse.body, queryEvalTime)
}

type backendResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,6 @@ func Test_backendResponse_statusCode(t *testing.T) {

type mockComparator struct{}

func (c *mockComparator) Compare(_, _ []byte) (*ComparisonSummary, error) {
func (c *mockComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) {
return &ComparisonSummary{missingMetrics: 12}, nil
}
1 change: 1 addition & 0 deletions tools/querytee/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
const (
comparisonSuccess = "success"
comparisonFailed = "fail"
comparisonSkipped = "skipped"

unknownIssuer = "unknown"
canaryIssuer = "loki-canary"
Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var testWriteRoutes = []Route{}

type testComparator struct{}

func (testComparator) Compare(_, _ []byte) (*ComparisonSummary, error) { return nil, nil }
func (testComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) { return nil, nil }

func Test_NewProxy(t *testing.T) {
cfg := ProxyConfig{}
Expand Down
Loading
Loading