From b4b0e1861920b06f18b67dad10f68967f2ad08c9 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Thu, 16 Jan 2025 19:09:55 +0530 Subject: [PATCH 1/8] query-tee: improvements to skip recent samples --- tools/querytee/proxy_endpoint.go | 13 ++++- tools/querytee/proxy_metrics.go | 1 + tools/querytee/response_comparator.go | 72 ++++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 2 deletions(-) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 7a6779b6ad880..54ebbb932cd39 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -19,6 +19,7 @@ type ResponsesComparator interface { } type ComparisonSummary struct { + skipped bool missingMetrics int } @@ -182,6 +183,8 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back "route-name", p.routeName, "query", r.URL.RawQuery, "err", err) result = comparisonFailed + } else if summary != nil && summary.skipped { + result = comparisonSkipped } if p.instrumentCompares && summary != nil { @@ -228,9 +231,17 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp } func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) (*ComparisonSummary, error) { + if expectedResponse.err != nil { + return &ComparisonSummary{skipped: true}, nil + } + + if actualResponse.err != nil { + return nil, fmt.Errorf("skipped comparison of response because the request to the secondary backend failed: %w", actualResponse.err) + } + // compare response body only if we get a 200 if expectedResponse.status != 200 { - return nil, fmt.Errorf("skipped comparison of response because we got status code %d from preferred backend's response", expectedResponse.status) + return &ComparisonSummary{skipped: true}, nil } if actualResponse.status != 200 { diff --git a/tools/querytee/proxy_metrics.go b/tools/querytee/proxy_metrics.go index eb2284517d47c..9637438be486d 100644 --- a/tools/querytee/proxy_metrics.go +++ b/tools/querytee/proxy_metrics.go @@ -8,6 +8,7 @@ import ( const ( comparisonSuccess = "success" comparisonFailed = "fail" + comparisonSkipped = "skipped" unknownIssuer = "unknown" canaryIssuer = "loki-canary" diff --git a/tools/querytee/response_comparator.go b/tools/querytee/response_comparator.go index 04a28fff85c1d..499091882e704 100644 --- a/tools/querytee/response_comparator.go +++ b/tools/querytee/response_comparator.go @@ -96,6 +96,10 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to unmarshal actual matrix") } + if allMatrixSamplesWithinRecentSampleWindow(expected, opts) && allMatrixSamplesWithinRecentSampleWindow(actual, opts) { + return &ComparisonSummary{skipped: true}, nil + } + if len(expected) != len(actual) { return nil, fmt.Errorf("expected %d metrics but got %d", len(expected), len(actual)) @@ -139,6 +143,28 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, nil } +func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, opts SampleComparisonOptions) bool { + if opts.SkipRecentSamples == 0 { + return false + } + + for _, series := range m { + for _, sample := range series.Values { + if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples { + return false + } + } + + for _, sample := range series.Histograms { + if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples { + return false + } + } + } + + return true +} + func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Vector @@ -152,6 +178,10 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to unmarshal actual vector") } + if allVectorSamplesWithinRecentSampleWindow(expected, opts) && allVectorSamplesWithinRecentSampleWindow(actual, opts) { + return &ComparisonSummary{skipped: true}, nil + } + if len(expected) != len(actual) { return nil, fmt.Errorf("expected %d metrics but got %d", len(expected), len(actual)) @@ -198,6 +228,20 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return &ComparisonSummary{missingMetrics: len(missingMetrics)}, err } +func allVectorSamplesWithinRecentSampleWindow(v model.Vector, opts SampleComparisonOptions) bool { + if opts.SkipRecentSamples == 0 { + return false + } + + for _, sample := range v { + if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples { + return false + } + } + + return true +} + func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Scalar err := json.Unmarshal(expectedRaw, &expected) @@ -250,7 +294,7 @@ func compareSampleValue(first, second model.SampleValue, opts SampleComparisonOp return math.Abs(f-s) <= opts.Tolerance } -func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOptions) (*ComparisonSummary, error) { +func compareStreams(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual loghttp.Streams err := jsoniter.Unmarshal(expectedRaw, &expected) @@ -262,6 +306,10 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOp return nil, errors.Wrap(err, "unable to unmarshal actual streams") } + if allStreamEntriesWithinRecentSampleWindow(actual, opts) && allStreamEntriesWithinRecentSampleWindow(expected, opts) { + return &ComparisonSummary{skipped: true}, nil + } + if len(expected) != len(actual) { return nil, fmt.Errorf("expected %d streams but got %d", len(expected), len(actual)) } @@ -294,6 +342,12 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOp for i, expectedSamplePair := range expectedStream.Entries { actualSamplePair := actualStream.Entries[i] + + // skip recent samples + if opts.SkipRecentSamples > 0 && time.Since(expectedSamplePair.Timestamp) < opts.SkipRecentSamples { + continue + } + if !expectedSamplePair.Timestamp.Equal(actualSamplePair.Timestamp) { return nil, fmt.Errorf("expected timestamp %v but got %v for stream %s", expectedSamplePair.Timestamp.UnixNano(), actualSamplePair.Timestamp.UnixNano(), expectedStream.Labels) @@ -307,3 +361,19 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOp return nil, nil } + +func allStreamEntriesWithinRecentSampleWindow(s loghttp.Streams, opts SampleComparisonOptions) bool { + if opts.SkipRecentSamples == 0 { + return false + } + + for _, stream := range s { + for _, sample := range stream.Entries { + if time.Since(sample.Timestamp) > opts.SkipRecentSamples { + return false + } + } + } + + return true +} From d1f0749e9f401c16dabe8106f2b49b5b0ca4c7ee Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Fri, 17 Jan 2025 19:24:05 +0530 Subject: [PATCH 2/8] handle more scenarios --- tools/querytee/response_comparator.go | 247 +++++++++++++++++++------- 1 file changed, 179 insertions(+), 68 deletions(-) diff --git a/tools/querytee/response_comparator.go b/tools/querytee/response_comparator.go index 499091882e704..987b0092fb0ce 100644 --- a/tools/querytee/response_comparator.go +++ b/tools/querytee/response_comparator.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/go-kit/log" "github.com/go-kit/log/level" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" @@ -96,7 +97,14 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to unmarshal actual matrix") } - if allMatrixSamplesWithinRecentSampleWindow(expected, opts) && allMatrixSamplesWithinRecentSampleWindow(actual, opts) { + // Filter out series that only contain recent samples + if opts.SkipRecentSamples > 0 { + expected = filterRecentOnlySeries(expected, opts.SkipRecentSamples) + actual = filterRecentOnlySeries(actual, opts.SkipRecentSamples) + } + + // If both matrices are empty after filtering, we can skip comparison + if len(expected) == 0 && len(actual) == 0 { return &ComparisonSummary{skipped: true}, nil } @@ -117,51 +125,113 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison } actualMetric := actual[actualMetricIndex] - expectedMetricLen := len(expectedMetric.Values) - actualMetricLen := len(actualMetric.Values) - - if expectedMetricLen != actualMetricLen { - err := fmt.Errorf("expected %d samples for metric %s but got %d", expectedMetricLen, - expectedMetric.Metric, actualMetricLen) - if expectedMetricLen > 0 && actualMetricLen > 0 { - level.Error(util_log.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedMetric.Values[0].Timestamp, - "newest-expected-ts", expectedMetric.Values[expectedMetricLen-1].Timestamp, - "oldest-actual-ts", actualMetric.Values[0].Timestamp, "newest-actual-ts", actualMetric.Values[actualMetricLen-1].Timestamp) - } - return nil, err - } - for i, expectedSamplePair := range expectedMetric.Values { - actualSamplePair := actualMetric.Values[i] - err := compareSamplePair(expectedSamplePair, actualSamplePair, opts) - if err != nil { - return nil, errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric) - } + err := compareMatrixSamples(expectedMetric, actualMetric, opts) + if err != nil { + return nil, fmt.Errorf("%w\nExpected result for series:\n%v\n\nActual result for series:\n%v", err, expectedMetric, actualMetric) } } return nil, nil } -func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, opts SampleComparisonOptions) bool { - if opts.SkipRecentSamples == 0 { - return false +func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleComparisonOptions) error { + expectedSamplesTail, actualSamplesTail, err := comparePairs(expected.Values, actual.Values, func(p1 model.SamplePair, p2 model.SamplePair) error { + err := compareSamplePair(p1, p2, opts) + if err != nil { + return fmt.Errorf("float sample pair does not match for metric %s: %w", expected.Metric, err) + } + return nil + }) + if err != nil { + return err } - for _, series := range m { - for _, sample := range series.Values { - if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples { - return false - } + expectedFloatSamplesCount := len(expected.Values) + actualFloatSamplesCount := len(actual.Values) + + if expectedFloatSamplesCount == actualFloatSamplesCount { + return nil + } + + skipAllFloatSamples := canSkipAllSamples(func(p model.SamplePair) bool { + return time.Since(p.Timestamp.Time())-opts.SkipRecentSamples < 0 + }, expectedSamplesTail, actualSamplesTail) + + if skipAllFloatSamples { + return nil + } + + err = fmt.Errorf( + "expected %d float sample(s) for metric %s but got %d float sample(s) ", + len(expected.Values), + expected.Metric, + len(actual.Values), + ) + + shouldLog := false + logger := util_log.Logger + + if expectedFloatSamplesCount > 0 && actualFloatSamplesCount > 0 { + logger = log.With(logger, + "oldest-expected-float-ts", expected.Values[0].Timestamp, + "newest-expected-float-ts", expected.Values[expectedFloatSamplesCount-1].Timestamp, + "oldest-actual-float-ts", actual.Values[0].Timestamp, + "newest-actual-float-ts", actual.Values[actualFloatSamplesCount-1].Timestamp, + ) + shouldLog = true + } + + if shouldLog { + level.Error(logger).Log("msg", err.Error()) + } + return err +} + +// comparePairs runs compare for pairs in s1 and s2. It stops on the first error the compare returns. +// If len(s1) != len(s2) it compares only elements inside the longest prefix of both. If all elements within the prefix match, +// it returns the tail of s1 and s2, and a nil error. +func comparePairs[S ~[]M, M any](s1, s2 S, compare func(M, M) error) (S, S, error) { + var i int + for ; i < len(s1) && i < len(s2); i++ { + err := compare(s1[i], s2[i]) + if err != nil { + return s1, s2, err + } + } + return s1[i:], s2[i:], nil +} + +// trimBeginning returns s without the prefix that satisfies skip(). +func trimBeginning[S ~[]M, M any](s S, skip func(M) bool) S { + var i int + for ; i < len(s); i++ { + if !skip(s[i]) { + break + } + } + return s[i:] +} + +// filterSlice returns a new slice with elements from s removed that satisfy skip(). +func filterSlice[S ~[]M, M any](s S, skip func(M) bool) S { + res := make(S, 0, len(s)) + for i := 0; i < len(s); i++ { + if !skip(s[i]) { + res = append(res, s[i]) } + } + return res +} - for _, sample := range series.Histograms { - if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples { +func canSkipAllSamples[S ~[]M, M any](skip func(M) bool, ss ...S) bool { + for _, s := range ss { + for _, p := range s { + if !skip(p) { return false } } } - return true } @@ -264,12 +334,12 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparison } func compareSamplePair(expected, actual model.SamplePair, opts SampleComparisonOptions) error { - if expected.Timestamp != actual.Timestamp { - return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp) - } if opts.SkipRecentSamples > 0 && time.Since(expected.Timestamp.Time()) < opts.SkipRecentSamples { return nil } + if expected.Timestamp != actual.Timestamp { + return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp) + } if !compareSampleValue(expected.Value, actual.Value, opts) { return fmt.Errorf("expected value %s for timestamp %v but got %s", expected.Value, expected.Timestamp, actual.Value) } @@ -306,7 +376,14 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, opts SampleCompariso return nil, errors.Wrap(err, "unable to unmarshal actual streams") } - if allStreamEntriesWithinRecentSampleWindow(actual, opts) && allStreamEntriesWithinRecentSampleWindow(expected, opts) { + // Filter out streams that only contain recent entries + if opts.SkipRecentSamples > 0 { + expected = filterRecentOnlyStreams(expected, opts.SkipRecentSamples) + actual = filterRecentOnlyStreams(actual, opts.SkipRecentSamples) + } + + // If both streams are empty after filtering, we can skip comparison + if len(expected) == 0 && len(actual) == 0 { return &ComparisonSummary{skipped: true}, nil } @@ -326,54 +403,88 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, opts SampleCompariso } actualStream := actual[actualStreamIndex] - expectedValuesLen := len(expectedStream.Entries) - actualValuesLen := len(actualStream.Entries) - - if expectedValuesLen != actualValuesLen { - err := fmt.Errorf("expected %d values for stream %s but got %d", expectedValuesLen, - expectedStream.Labels, actualValuesLen) - if expectedValuesLen > 0 && actualValuesLen > 0 { - level.Error(util_log.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), - "newest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), - "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "newest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) + + expectedEntriesTail, actualEntriesTail, err := comparePairs(expectedStream.Entries, actualStream.Entries, func(e1, e2 loghttp.Entry) error { + if opts.SkipRecentSamples > 0 && time.Since(e1.Timestamp) < opts.SkipRecentSamples { + return nil + } + if !e1.Timestamp.Equal(e2.Timestamp) { + return fmt.Errorf("expected timestamp %v but got %v for stream %s", e1.Timestamp.UnixNano(), + e2.Timestamp.UnixNano(), expectedStream.Labels) + } + if e1.Line != e2.Line { + return fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", e1.Line, + e1.Timestamp.UnixNano(), e2.Line, expectedStream.Labels) } + return nil + }) + if err != nil { return nil, err } - for i, expectedSamplePair := range expectedStream.Entries { - actualSamplePair := actualStream.Entries[i] + expectedEntriesCount := len(expectedStream.Entries) + actualEntriesCount := len(actualStream.Entries) - // skip recent samples - if opts.SkipRecentSamples > 0 && time.Since(expectedSamplePair.Timestamp) < opts.SkipRecentSamples { - continue - } + if expectedEntriesCount == actualEntriesCount { + continue + } - if !expectedSamplePair.Timestamp.Equal(actualSamplePair.Timestamp) { - return nil, fmt.Errorf("expected timestamp %v but got %v for stream %s", expectedSamplePair.Timestamp.UnixNano(), - actualSamplePair.Timestamp.UnixNano(), expectedStream.Labels) - } - if expectedSamplePair.Line != actualSamplePair.Line { - return nil, fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", expectedSamplePair.Line, - expectedSamplePair.Timestamp.UnixNano(), actualSamplePair.Line, expectedStream.Labels) - } + skipAllEntries := canSkipAllSamples(func(e loghttp.Entry) bool { + return time.Since(e.Timestamp)-opts.SkipRecentSamples < 0 + }, expectedEntriesTail, actualEntriesTail) + + if skipAllEntries { + continue + } + + err = fmt.Errorf("expected %d entries for stream %s but got %d", expectedEntriesCount, + expectedStream.Labels, actualEntriesCount) + + if expectedEntriesCount > 0 && actualEntriesCount > 0 { + level.Error(util_log.Logger).Log("msg", err.Error(), + "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), + "newest-expected-ts", expectedStream.Entries[expectedEntriesCount-1].Timestamp.UnixNano(), + "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), + "newest-actual-ts", actualStream.Entries[actualEntriesCount-1].Timestamp.UnixNano()) } + return nil, err } return nil, nil } -func allStreamEntriesWithinRecentSampleWindow(s loghttp.Streams, opts SampleComparisonOptions) bool { - if opts.SkipRecentSamples == 0 { - return false +// filterRecentOnlySeries removes series that only contain samples within the recent window +func filterRecentOnlySeries(matrix model.Matrix, recentWindow time.Duration) model.Matrix { + result := make(model.Matrix, 0, len(matrix)) + + for _, series := range matrix { + skipAllSamples := canSkipAllSamples(func(p model.SamplePair) bool { + return time.Since(p.Timestamp.Time())-recentWindow < 0 + }, series.Values) + + // Only keep series that have at least one old sample + if !skipAllSamples { + result = append(result, series) + } } - for _, stream := range s { - for _, sample := range stream.Entries { - if time.Since(sample.Timestamp) > opts.SkipRecentSamples { - return false - } + return result +} + +// filterRecentOnlyStreams removes streams that only contain entries within the recent window +func filterRecentOnlyStreams(streams loghttp.Streams, recentWindow time.Duration) loghttp.Streams { + result := make(loghttp.Streams, 0, len(streams)) + + for _, stream := range streams { + skipAllEntries := canSkipAllSamples(func(e loghttp.Entry) bool { + return time.Since(e.Timestamp)-recentWindow < 0 + }, stream.Entries) + + // Only keep streams that have at least one old entry + if !skipAllEntries { + result = append(result, stream) } } - return true + return result } From 6d92784f782f7d7040617e6220a26eec6fe49e05 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Mon, 20 Jan 2025 15:14:52 +0530 Subject: [PATCH 3/8] filter before comparing --- tools/querytee/response_comparator.go | 305 +++++++++------------ tools/querytee/response_comparator_test.go | 140 +++++++++- 2 files changed, 256 insertions(+), 189 deletions(-) diff --git a/tools/querytee/response_comparator.go b/tools/querytee/response_comparator.go index 987b0092fb0ce..664bfd535c49f 100644 --- a/tools/querytee/response_comparator.go +++ b/tools/querytee/response_comparator.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/go-kit/log" "github.com/go-kit/log/level" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" @@ -18,7 +17,7 @@ import ( ) // SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes. -type SamplesComparatorFunc func(expected, actual json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) +type SamplesComparatorFunc func(expected, actual json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) type SamplesResponse struct { Status string @@ -32,6 +31,19 @@ type SampleComparisonOptions struct { Tolerance float64 UseRelativeError bool SkipRecentSamples time.Duration + SkipSamplesBefore time.Duration +} + +func (opts *SampleComparisonOptions) SkipSample(sampleTime, evaluationTime time.Time) bool { + // Skip if sample is too old + if opts.SkipSamplesBefore > 0 && sampleTime.Before(evaluationTime.Add(-opts.SkipSamplesBefore)) { + return true + } + // Skip if sample is too recent + if opts.SkipRecentSamples > 0 && evaluationTime.Sub(sampleTime) < opts.SkipRecentSamples { + return true + } + return false } func NewSamplesComparator(opts SampleComparisonOptions) *SamplesComparator { @@ -56,7 +68,7 @@ func (s *SamplesComparator) RegisterSamplesType(samplesType string, comparator S s.sampleTypesComparator[samplesType] = comparator } -func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (*ComparisonSummary, error) { +func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte, evaluationTime time.Time) (*ComparisonSummary, error) { var expected, actual SamplesResponse err := json.Unmarshal(expectedResponse, &expected) @@ -82,10 +94,10 @@ func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (*C return nil, fmt.Errorf("resultType %s not registered for comparison", expected.Data.ResultType) } - return comparator(expected.Data.Result, actual.Data.Result, s.opts) + return comparator(expected.Data.Result, actual.Data.Result, evaluationTime, s.opts) } -func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { +func compareMatrix(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Matrix err := json.Unmarshal(expectedRaw, &expected) @@ -97,10 +109,14 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to unmarshal actual matrix") } - // Filter out series that only contain recent samples - if opts.SkipRecentSamples > 0 { - expected = filterRecentOnlySeries(expected, opts.SkipRecentSamples) - actual = filterRecentOnlySeries(actual, opts.SkipRecentSamples) + // Filter out samples outside the comparable window + if opts.SkipSamplesBefore > 0 || opts.SkipRecentSamples > 0 { + expected = filterSamplesOutsideWindow(expected, func(sampleTime time.Time) bool { + return opts.SkipSample(sampleTime, evaluationTime) + }) + actual = filterSamplesOutsideWindow(actual, func(sampleTime time.Time) bool { + return opts.SkipSample(sampleTime, evaluationTime) + }) } // If both matrices are empty after filtering, we can skip comparison @@ -109,6 +125,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison } if len(expected) != len(actual) { + // TODO: log the missing metrics return nil, fmt.Errorf("expected %d metrics but got %d", len(expected), len(actual)) } @@ -136,106 +153,53 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison } func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleComparisonOptions) error { - expectedSamplesTail, actualSamplesTail, err := comparePairs(expected.Values, actual.Values, func(p1 model.SamplePair, p2 model.SamplePair) error { - err := compareSamplePair(p1, p2, opts) - if err != nil { - return fmt.Errorf("float sample pair does not match for metric %s: %w", expected.Metric, err) + expectedEntriesCount := len(expected.Values) + actualEntriesCount := len(actual.Values) + + if expectedEntriesCount != actualEntriesCount { + err := fmt.Errorf("expected %d samples for metric %s but got %d", expectedEntriesCount, expected.Metric, actualEntriesCount) + if actualEntriesCount > 0 && expectedEntriesCount > 0 { + level.Error(util_log.Logger).Log("msg", err.Error(), + "oldest-expected-ts", expected.Values[0].Timestamp, + "newest-expected-ts", expected.Values[expectedEntriesCount-1].Timestamp, + "oldest-actual-ts", actual.Values[0].Timestamp, + "newest-actual-ts", actual.Values[actualEntriesCount-1].Timestamp) } - return nil - }) - if err != nil { return err } - expectedFloatSamplesCount := len(expected.Values) - actualFloatSamplesCount := len(actual.Values) - - if expectedFloatSamplesCount == actualFloatSamplesCount { - return nil - } - - skipAllFloatSamples := canSkipAllSamples(func(p model.SamplePair) bool { - return time.Since(p.Timestamp.Time())-opts.SkipRecentSamples < 0 - }, expectedSamplesTail, actualSamplesTail) - - if skipAllFloatSamples { - return nil - } - - err = fmt.Errorf( - "expected %d float sample(s) for metric %s but got %d float sample(s) ", - len(expected.Values), - expected.Metric, - len(actual.Values), - ) - - shouldLog := false - logger := util_log.Logger - - if expectedFloatSamplesCount > 0 && actualFloatSamplesCount > 0 { - logger = log.With(logger, - "oldest-expected-float-ts", expected.Values[0].Timestamp, - "newest-expected-float-ts", expected.Values[expectedFloatSamplesCount-1].Timestamp, - "oldest-actual-float-ts", actual.Values[0].Timestamp, - "newest-actual-float-ts", actual.Values[actualFloatSamplesCount-1].Timestamp, - ) - shouldLog = true - } - - if shouldLog { - level.Error(logger).Log("msg", err.Error()) - } - return err -} - -// comparePairs runs compare for pairs in s1 and s2. It stops on the first error the compare returns. -// If len(s1) != len(s2) it compares only elements inside the longest prefix of both. If all elements within the prefix match, -// it returns the tail of s1 and s2, and a nil error. -func comparePairs[S ~[]M, M any](s1, s2 S, compare func(M, M) error) (S, S, error) { - var i int - for ; i < len(s1) && i < len(s2); i++ { - err := compare(s1[i], s2[i]) + for i := range expected.Values { + err := compareSamplePair(expected.Values[i], actual.Values[i], opts) if err != nil { - return s1, s2, err + return fmt.Errorf("float sample pair does not match for metric %s: %w", expected.Metric, err) } } - return s1[i:], s2[i:], nil -} -// trimBeginning returns s without the prefix that satisfies skip(). -func trimBeginning[S ~[]M, M any](s S, skip func(M) bool) S { - var i int - for ; i < len(s); i++ { - if !skip(s[i]) { - break - } - } - return s[i:] + return nil } -// filterSlice returns a new slice with elements from s removed that satisfy skip(). -func filterSlice[S ~[]M, M any](s S, skip func(M) bool) S { - res := make(S, 0, len(s)) - for i := 0; i < len(s); i++ { - if !skip(s[i]) { - res = append(res, s[i]) - } - } - return res -} +func filterSamplesOutsideWindow(matrix model.Matrix, skipSample func(time.Time) bool) model.Matrix { + result := matrix[:0] // Reuse the original slice capacity while starting with length 0 -func canSkipAllSamples[S ~[]M, M any](skip func(M) bool, ss ...S) bool { - for _, s := range ss { - for _, p := range s { - if !skip(p) { - return false + for _, series := range matrix { + // Reuse the original Values slice + filteredValues := series.Values[:0] + for _, sample := range series.Values { + if !skipSample(sample.Timestamp.Time()) { + filteredValues = append(filteredValues, sample) } } + + if len(filteredValues) > 0 { + series.Values = filteredValues + result = append(result, series) + } } - return true + + return result } -func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { +func compareVector(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Vector err := json.Unmarshal(expectedRaw, &expected) @@ -248,7 +212,26 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to unmarshal actual vector") } - if allVectorSamplesWithinRecentSampleWindow(expected, opts) && allVectorSamplesWithinRecentSampleWindow(actual, opts) { + // Filter out samples outside the comparable windows + if opts.SkipSamplesBefore > 0 || opts.SkipRecentSamples > 0 { + filtered := expected[:0] + for i := range expected { + if !opts.SkipSample(expected[i].Timestamp.Time(), evaluationTime) { + filtered = append(filtered, expected[i]) + } + } + expected = filtered + + filtered = actual[:0] + for i := range actual { + if !opts.SkipSample(actual[i].Timestamp.Time(), evaluationTime) { + filtered = append(filtered, actual[i]) + } + } + actual = filtered + } + + if len(expected) == 0 && len(actual) == 0 { return &ComparisonSummary{skipped: true}, nil } @@ -298,21 +281,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return &ComparisonSummary{missingMetrics: len(missingMetrics)}, err } -func allVectorSamplesWithinRecentSampleWindow(v model.Vector, opts SampleComparisonOptions) bool { - if opts.SkipRecentSamples == 0 { - return false - } - - for _, sample := range v { - if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples { - return false - } - } - - return true -} - -func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { +func compareScalar(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Scalar err := json.Unmarshal(expectedRaw, &expected) if err != nil { @@ -324,6 +293,10 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to actual expected scalar") } + if opts.SkipSample(expected.Timestamp.Time(), evaluationTime) || opts.SkipSample(actual.Timestamp.Time(), evaluationTime) { + return nil, nil + } + return nil, compareSamplePair(model.SamplePair{ Timestamp: expected.Timestamp, Value: expected.Value, @@ -334,9 +307,6 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparison } func compareSamplePair(expected, actual model.SamplePair, opts SampleComparisonOptions) error { - if opts.SkipRecentSamples > 0 && time.Since(expected.Timestamp.Time()) < opts.SkipRecentSamples { - return nil - } if expected.Timestamp != actual.Timestamp { return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp) } @@ -364,7 +334,7 @@ func compareSampleValue(first, second model.SampleValue, opts SampleComparisonOp return math.Abs(f-s) <= opts.Tolerance } -func compareStreams(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { +func compareStreams(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual loghttp.Streams err := jsoniter.Unmarshal(expectedRaw, &expected) @@ -376,10 +346,14 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, opts SampleCompariso return nil, errors.Wrap(err, "unable to unmarshal actual streams") } - // Filter out streams that only contain recent entries - if opts.SkipRecentSamples > 0 { - expected = filterRecentOnlyStreams(expected, opts.SkipRecentSamples) - actual = filterRecentOnlyStreams(actual, opts.SkipRecentSamples) + // Filter out entries outside the comparable window + if opts.SkipSamplesBefore > 0 || opts.SkipRecentSamples > 0 { + expected = filterStreamsOutsideWindow(expected, func(entryTime time.Time) bool { + return opts.SkipSample(entryTime, evaluationTime) + }) + actual = filterStreamsOutsideWindow(actual, func(entryTime time.Time) bool { + return opts.SkipSample(entryTime, evaluationTime) + }) } // If both streams are empty after filtering, we can skip comparison @@ -388,6 +362,7 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, opts SampleCompariso } if len(expected) != len(actual) { + // TODO: log the missing stream return nil, fmt.Errorf("expected %d streams but got %d", len(expected), len(actual)) } @@ -403,85 +378,51 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, opts SampleCompariso } actualStream := actual[actualStreamIndex] - - expectedEntriesTail, actualEntriesTail, err := comparePairs(expectedStream.Entries, actualStream.Entries, func(e1, e2 loghttp.Entry) error { - if opts.SkipRecentSamples > 0 && time.Since(e1.Timestamp) < opts.SkipRecentSamples { - return nil - } - if !e1.Timestamp.Equal(e2.Timestamp) { - return fmt.Errorf("expected timestamp %v but got %v for stream %s", e1.Timestamp.UnixNano(), - e2.Timestamp.UnixNano(), expectedStream.Labels) - } - if e1.Line != e2.Line { - return fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", e1.Line, - e1.Timestamp.UnixNano(), e2.Line, expectedStream.Labels) - } - return nil - }) - if err != nil { - return nil, err - } - expectedEntriesCount := len(expectedStream.Entries) actualEntriesCount := len(actualStream.Entries) - if expectedEntriesCount == actualEntriesCount { - continue - } - - skipAllEntries := canSkipAllSamples(func(e loghttp.Entry) bool { - return time.Since(e.Timestamp)-opts.SkipRecentSamples < 0 - }, expectedEntriesTail, actualEntriesTail) - - if skipAllEntries { - continue + if expectedEntriesCount != actualEntriesCount { + err = fmt.Errorf("expected %d values for stream %s but got %d", expectedEntriesCount, expectedStream.Labels, actualEntriesCount) + if actualEntriesCount > 0 && expectedEntriesCount > 0 { + level.Error(util_log.Logger).Log("msg", err.Error(), + "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), + "newest-expected-ts", expectedStream.Entries[expectedEntriesCount-1].Timestamp.UnixNano(), + "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), + "newest-actual-ts", actualStream.Entries[actualEntriesCount-1].Timestamp.UnixNano()) + } + return nil, err } - err = fmt.Errorf("expected %d entries for stream %s but got %d", expectedEntriesCount, - expectedStream.Labels, actualEntriesCount) - - if expectedEntriesCount > 0 && actualEntriesCount > 0 { - level.Error(util_log.Logger).Log("msg", err.Error(), - "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), - "newest-expected-ts", expectedStream.Entries[expectedEntriesCount-1].Timestamp.UnixNano(), - "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), - "newest-actual-ts", actualStream.Entries[actualEntriesCount-1].Timestamp.UnixNano()) + for i := range expectedStream.Entries { + if !expectedStream.Entries[i].Timestamp.Equal(actualStream.Entries[i].Timestamp) { + return nil, fmt.Errorf("expected timestamp %v but got %v for stream %s", + expectedStream.Entries[i].Timestamp.UnixNano(), actualStream.Entries[i].Timestamp.UnixNano(), expectedStream.Labels) + } + if expectedStream.Entries[i].Line != actualStream.Entries[i].Line { + return nil, fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", + expectedStream.Entries[i].Line, expectedStream.Entries[i].Timestamp.UnixNano(), actualStream.Entries[i].Line, expectedStream.Labels) + } } - return nil, err } return nil, nil } -// filterRecentOnlySeries removes series that only contain samples within the recent window -func filterRecentOnlySeries(matrix model.Matrix, recentWindow time.Duration) model.Matrix { - result := make(model.Matrix, 0, len(matrix)) - - for _, series := range matrix { - skipAllSamples := canSkipAllSamples(func(p model.SamplePair) bool { - return time.Since(p.Timestamp.Time())-recentWindow < 0 - }, series.Values) - - // Only keep series that have at least one old sample - if !skipAllSamples { - result = append(result, series) - } - } - - return result -} - -// filterRecentOnlyStreams removes streams that only contain entries within the recent window -func filterRecentOnlyStreams(streams loghttp.Streams, recentWindow time.Duration) loghttp.Streams { - result := make(loghttp.Streams, 0, len(streams)) +// filterStreamsOutsideWindow filters out entries that are outside the comparable window +func filterStreamsOutsideWindow(streams loghttp.Streams, skipEntry func(time.Time) bool) loghttp.Streams { + result := streams[:0] // Reuse the original slice capacity while starting with length 0 for _, stream := range streams { - skipAllEntries := canSkipAllSamples(func(e loghttp.Entry) bool { - return time.Since(e.Timestamp)-recentWindow < 0 - }, stream.Entries) + // Reuse the original Entries slice + filteredEntries := stream.Entries[:0] + for _, entry := range stream.Entries { + if !skipEntry(entry.Timestamp) { + filteredEntries = append(filteredEntries, entry) + } + } - // Only keep streams that have at least one old entry - if !skipAllEntries { + if len(filteredEntries) > 0 { + stream.Entries = filteredEntries result = append(result, stream) } } diff --git a/tools/querytee/response_comparator_test.go b/tools/querytee/response_comparator_test.go index ce53fd15ce7ce..26910d027b508 100644 --- a/tools/querytee/response_comparator_test.go +++ b/tools/querytee/response_comparator_test.go @@ -70,7 +70,7 @@ func TestCompareMatrix(t *testing.T) { {"metric":{"foo":"bar"},"values":[[1,"1"],[3,"2"]]} ]`), // timestamps are parsed from seconds to ms which are then added to errors as is so adding 3 0s to expected error. - err: errors.New("sample pair not matching for metric {foo=\"bar\"}: expected timestamp 2 but got 3"), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected timestamp 2 but got 3"), }, { name: "difference in sample value", @@ -80,7 +80,7 @@ func TestCompareMatrix(t *testing.T) { actual: json.RawMessage(`[ {"metric":{"foo":"bar"},"values":[[1,"1"],[2,"3"]]} ]`), - err: errors.New("sample pair not matching for metric {foo=\"bar\"}: expected value 2 for timestamp 2 but got 3"), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected value 2 for timestamp 2 but got 3"), }, { name: "correct samples", @@ -93,7 +93,80 @@ func TestCompareMatrix(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareMatrix(tc.expected, tc.actual, SampleComparisonOptions{}) + _, err := compareMatrix(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{}) + if tc.err == nil { + require.NoError(t, err) + return + } + require.Error(t, err) + require.ErrorContains(t, err, tc.err.Error()) + }) + } +} + +func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { + for _, tc := range []struct { + name string + expected json.RawMessage + actual json.RawMessage + skipSamplesBefore time.Duration + skipRecentSamples time.Duration + evaluationTime time.Time + err error + }{ + { + name: "skip samples before window", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[0,"1"],[5,"2"],[15,"3"],[20,"4"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[15,"3"],[20,"4"]]} + ]`), + skipSamplesBefore: 90 * time.Second, + evaluationTime: time.Unix(100, 0), + }, + { + name: "skip recent samples", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[80,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[80,"3"],[95, "4"]]} + ]`), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + }, + { + name: "skip both recent and old samples", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[80,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"0"],[25,"2"],[80,"3"],[95, "4"]]} + ]`), + skipSamplesBefore: 10 * time.Second, + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + }, + { + name: "skip entire series", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"0"],[96,"1"]]} + ]`), + skipSamplesBefore: 10 * time.Second, + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + }, + } { + t.Run(tc.name, func(t *testing.T) { + _, err := compareMatrix(tc.expected, tc.actual, tc.evaluationTime, SampleComparisonOptions{ + SkipSamplesBefore: tc.skipSamplesBefore, + SkipRecentSamples: tc.skipRecentSamples, + }) + if tc.err == nil { require.NoError(t, err) return @@ -176,7 +249,7 @@ func TestCompareVector(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareVector(tc.expected, tc.actual, SampleComparisonOptions{}) + _, err := compareVector(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{}) if tc.err == nil { require.NoError(t, err) return @@ -213,7 +286,7 @@ func TestCompareScalar(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareScalar(tc.expected, tc.actual, SampleComparisonOptions{}) + _, err := compareScalar(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{}) if tc.err == nil { require.NoError(t, err) return @@ -408,7 +481,7 @@ func TestCompareSamplesResponse(t *testing.T) { UseRelativeError: tc.useRelativeError, SkipRecentSamples: tc.skipRecentSamples, }) - _, err := samplesComparator.Compare(tc.expected, tc.actual) + _, err := samplesComparator.Compare(tc.expected, tc.actual, time.Now()) if tc.err == nil { require.NoError(t, err) return @@ -501,9 +574,62 @@ func TestCompareStreams(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareStreams(tc.expected, tc.actual, SampleComparisonOptions{Tolerance: 0}) + _, err := compareStreams(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{Tolerance: 0}) + if tc.err == nil { + require.NoError(t, err) + return + } + require.Error(t, err) + require.Equal(t, tc.err.Error(), err.Error()) + }) + } +} + +func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { + for _, tc := range []struct { + name string + expected json.RawMessage + actual json.RawMessage + skipSamplesBefore time.Duration + skipRecentSamples time.Duration + evaluationTime time.Time + err error + }{ + // stream entry timestamp is in ns + { + name: "skip samples before window", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["95","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["2","0"],["15","2"],["50","3"],["95","4"]]} + ]`), + skipSamplesBefore: 90 * time.Nanosecond, + evaluationTime: time.Unix(0, 100), + }, + { + name: "skip recent samples", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["95","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + evaluationTime: time.Unix(0, 100), + }, + } { + t.Run(tc.name, func(t *testing.T) { + summary, err := compareStreams(tc.expected, tc.actual, tc.evaluationTime, SampleComparisonOptions{ + SkipSamplesBefore: tc.skipSamplesBefore, + SkipRecentSamples: tc.skipRecentSamples, + }) + if tc.err == nil { require.NoError(t, err) + if summary != nil { + require.True(t, summary.skipped) + } return } require.Error(t, err) From a7288bf048730159380e8a373ed24eca1afbedce Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Mon, 20 Jan 2025 16:34:01 +0530 Subject: [PATCH 4/8] wire SkipSamplesBefore --- cmd/querytee/main.go | 2 ++ tools/querytee/proxy.go | 7 +++++-- tools/querytee/proxy_endpoint.go | 8 ++++---- tools/querytee/proxy_endpoint_test.go | 2 +- tools/querytee/proxy_test.go | 2 +- tools/querytee/response_comparator.go | 13 +++++++------ tools/querytee/response_comparator_test.go | 12 ++++++------ 7 files changed, 26 insertions(+), 20 deletions(-) diff --git a/cmd/querytee/main.go b/cmd/querytee/main.go index 5acebfed85179..92f4187bb1383 100644 --- a/cmd/querytee/main.go +++ b/cmd/querytee/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "os" + "time" "github.com/go-kit/log/level" "github.com/grafana/dskit/log" @@ -62,6 +63,7 @@ func lokiReadRoutes(cfg Config) []querytee.Route { Tolerance: cfg.ProxyConfig.ValueComparisonTolerance, UseRelativeError: cfg.ProxyConfig.UseRelativeError, SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples, + SkipSamplesBefore: time.Time(cfg.ProxyConfig.SkipSamplesBefore), }) return []querytee.Route{ diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go index d5d842e11c3c5..392bd008a439a 100644 --- a/tools/querytee/proxy.go +++ b/tools/querytee/proxy.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gorilla/mux" + "github.com/grafana/dskit/flagext" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -34,6 +35,7 @@ type ProxyConfig struct { UseRelativeError bool PassThroughNonRegisteredRoutes bool SkipRecentSamples time.Duration + SkipSamplesBefore flagext.Time RequestURLFilter *regexp.Regexp InstrumentCompares bool } @@ -48,10 +50,11 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).") f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.") f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 60*time.Second, "The window from now to skip comparing samples. 0 to disable.") + f.Var(&cfg.SkipSamplesBefore, "proxy.compare-skip-samples-before", "Skip the samples before the given time for comparison. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only.") f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.") - f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error{ + f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error { var err error - cfg.RequestURLFilter, err = regexp.Compile(raw) + cfg.RequestURLFilter, err = regexp.Compile(raw) return err }) f.BoolVar(&cfg.InstrumentCompares, "proxy.compare-instrument", false, "Reports metrics on comparisons of responses between preferred and non-preferred endpoints for supported routes.") diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 54ebbb932cd39..e2fd53e52f4f4 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -15,7 +15,7 @@ import ( ) type ResponsesComparator interface { - Compare(expected, actual []byte) (*ComparisonSummary, error) + Compare(expected, actual []byte, queryEvaluationTime time.Time) (*ComparisonSummary, error) } type ComparisonSummary struct { @@ -176,7 +176,7 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back actualResponse := responses[i] result := comparisonSuccess - summary, err := p.compareResponses(expectedResponse, actualResponse) + summary, err := p.compareResponses(expectedResponse, actualResponse, time.Now().UTC()) if err != nil { level.Error(p.logger).Log("msg", "response comparison failed", "backend-name", p.backends[i].name, @@ -230,7 +230,7 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp return responses[0] } -func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) (*ComparisonSummary, error) { +func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse, queryEvalTime time.Time) (*ComparisonSummary, error) { if expectedResponse.err != nil { return &ComparisonSummary{skipped: true}, nil } @@ -252,7 +252,7 @@ func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backe return nil, fmt.Errorf("expected status code %d but got %d", expectedResponse.status, actualResponse.status) } - return p.comparator.Compare(expectedResponse.body, actualResponse.body) + return p.comparator.Compare(expectedResponse.body, actualResponse.body, queryEvalTime) } type backendResponse struct { diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go index 5cfee42b504dd..3c2553856fe33 100644 --- a/tools/querytee/proxy_endpoint_test.go +++ b/tools/querytee/proxy_endpoint_test.go @@ -421,6 +421,6 @@ func Test_backendResponse_statusCode(t *testing.T) { type mockComparator struct{} -func (c *mockComparator) Compare(_, _ []byte) (*ComparisonSummary, error) { +func (c *mockComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) { return &ComparisonSummary{missingMetrics: 12}, nil } diff --git a/tools/querytee/proxy_test.go b/tools/querytee/proxy_test.go index 22b122bcc2aa7..6db5caeadb762 100644 --- a/tools/querytee/proxy_test.go +++ b/tools/querytee/proxy_test.go @@ -25,7 +25,7 @@ var testWriteRoutes = []Route{} type testComparator struct{} -func (testComparator) Compare(_, _ []byte) (*ComparisonSummary, error) { return nil, nil } +func (testComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) { return nil, nil } func Test_NewProxy(t *testing.T) { cfg := ProxyConfig{} diff --git a/tools/querytee/response_comparator.go b/tools/querytee/response_comparator.go index 664bfd535c49f..8a8998f030a27 100644 --- a/tools/querytee/response_comparator.go +++ b/tools/querytee/response_comparator.go @@ -31,16 +31,17 @@ type SampleComparisonOptions struct { Tolerance float64 UseRelativeError bool SkipRecentSamples time.Duration - SkipSamplesBefore time.Duration + SkipSamplesBefore time.Time } func (opts *SampleComparisonOptions) SkipSample(sampleTime, evaluationTime time.Time) bool { // Skip if sample is too old - if opts.SkipSamplesBefore > 0 && sampleTime.Before(evaluationTime.Add(-opts.SkipSamplesBefore)) { + if !opts.SkipSamplesBefore.IsZero() && sampleTime.Before(opts.SkipSamplesBefore) { return true } + // Skip if sample is too recent - if opts.SkipRecentSamples > 0 && evaluationTime.Sub(sampleTime) < opts.SkipRecentSamples { + if opts.SkipRecentSamples > 0 && sampleTime.After(evaluationTime.Add(-opts.SkipRecentSamples)) { return true } return false @@ -110,7 +111,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, evaluationTime time.T } // Filter out samples outside the comparable window - if opts.SkipSamplesBefore > 0 || opts.SkipRecentSamples > 0 { + if !opts.SkipSamplesBefore.IsZero() || opts.SkipRecentSamples > 0 { expected = filterSamplesOutsideWindow(expected, func(sampleTime time.Time) bool { return opts.SkipSample(sampleTime, evaluationTime) }) @@ -213,7 +214,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, evaluationTime time.T } // Filter out samples outside the comparable windows - if opts.SkipSamplesBefore > 0 || opts.SkipRecentSamples > 0 { + if !opts.SkipSamplesBefore.IsZero() || opts.SkipRecentSamples > 0 { filtered := expected[:0] for i := range expected { if !opts.SkipSample(expected[i].Timestamp.Time(), evaluationTime) { @@ -347,7 +348,7 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, evaluationTime time. } // Filter out entries outside the comparable window - if opts.SkipSamplesBefore > 0 || opts.SkipRecentSamples > 0 { + if !opts.SkipSamplesBefore.IsZero() || opts.SkipRecentSamples > 0 { expected = filterStreamsOutsideWindow(expected, func(entryTime time.Time) bool { return opts.SkipSample(entryTime, evaluationTime) }) diff --git a/tools/querytee/response_comparator_test.go b/tools/querytee/response_comparator_test.go index 26910d027b508..aef8525ddcebf 100644 --- a/tools/querytee/response_comparator_test.go +++ b/tools/querytee/response_comparator_test.go @@ -109,7 +109,7 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { name string expected json.RawMessage actual json.RawMessage - skipSamplesBefore time.Duration + skipSamplesBefore time.Time skipRecentSamples time.Duration evaluationTime time.Time err error @@ -122,7 +122,7 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { actual: json.RawMessage(`[ {"metric":{"foo":"bar"},"values":[[5,"1"],[15,"3"],[20,"4"]]} ]`), - skipSamplesBefore: 90 * time.Second, + skipSamplesBefore: time.Unix(10, 0), evaluationTime: time.Unix(100, 0), }, { @@ -144,7 +144,7 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { actual: json.RawMessage(`[ {"metric":{"foo":"bar"},"values":[[5,"0"],[25,"2"],[80,"3"],[95, "4"]]} ]`), - skipSamplesBefore: 10 * time.Second, + skipSamplesBefore: time.Unix(10, 0), skipRecentSamples: 10 * time.Second, evaluationTime: time.Unix(100, 0), }, @@ -156,7 +156,7 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { actual: json.RawMessage(`[ {"metric":{"foo":"bar"},"values":[[5,"0"],[96,"1"]]} ]`), - skipSamplesBefore: 10 * time.Second, + skipSamplesBefore: time.Unix(10, 0), skipRecentSamples: 10 * time.Second, evaluationTime: time.Unix(100, 0), }, @@ -590,7 +590,7 @@ func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { name string expected json.RawMessage actual json.RawMessage - skipSamplesBefore time.Duration + skipSamplesBefore time.Time skipRecentSamples time.Duration evaluationTime time.Time err error @@ -604,7 +604,7 @@ func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { actual: json.RawMessage(`[ {"stream":{"foo":"bar"},"values":[["2","0"],["15","2"],["50","3"],["95","4"]]} ]`), - skipSamplesBefore: 90 * time.Nanosecond, + skipSamplesBefore: time.Unix(0, 10), evaluationTime: time.Unix(0, 100), }, { From febe35b706a3b79e3832b3c3ad957182e0d90e25 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Mon, 20 Jan 2025 16:45:28 +0530 Subject: [PATCH 5/8] fixup! filter before comparing --- tools/querytee/response_comparator.go | 40 +++++++++++++-------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/tools/querytee/response_comparator.go b/tools/querytee/response_comparator.go index 8a8998f030a27..23d06ad0c0a01 100644 --- a/tools/querytee/response_comparator.go +++ b/tools/querytee/response_comparator.go @@ -294,8 +294,8 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, evaluationTime time.T return nil, errors.Wrap(err, "unable to actual expected scalar") } - if opts.SkipSample(expected.Timestamp.Time(), evaluationTime) || opts.SkipSample(actual.Timestamp.Time(), evaluationTime) { - return nil, nil + if opts.SkipSample(expected.Timestamp.Time(), evaluationTime) && opts.SkipSample(actual.Timestamp.Time(), evaluationTime) { + return &ComparisonSummary{skipped: true}, nil } return nil, compareSamplePair(model.SamplePair{ @@ -379,29 +379,29 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, evaluationTime time. } actualStream := actual[actualStreamIndex] - expectedEntriesCount := len(expectedStream.Entries) - actualEntriesCount := len(actualStream.Entries) - - if expectedEntriesCount != actualEntriesCount { - err = fmt.Errorf("expected %d values for stream %s but got %d", expectedEntriesCount, expectedStream.Labels, actualEntriesCount) - if actualEntriesCount > 0 && expectedEntriesCount > 0 { - level.Error(util_log.Logger).Log("msg", err.Error(), - "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), - "newest-expected-ts", expectedStream.Entries[expectedEntriesCount-1].Timestamp.UnixNano(), - "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), - "newest-actual-ts", actualStream.Entries[actualEntriesCount-1].Timestamp.UnixNano()) + expectedValuesLen := len(expectedStream.Entries) + actualValuesLen := len(actualStream.Entries) + + if expectedValuesLen != actualValuesLen { + err := fmt.Errorf("expected %d values for stream %s but got %d", expectedValuesLen, + expectedStream.Labels, actualValuesLen) + if expectedValuesLen > 0 && actualValuesLen > 0 { + level.Error(util_log.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), + "newest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), + "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "newest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) } return nil, err } - for i := range expectedStream.Entries { - if !expectedStream.Entries[i].Timestamp.Equal(actualStream.Entries[i].Timestamp) { - return nil, fmt.Errorf("expected timestamp %v but got %v for stream %s", - expectedStream.Entries[i].Timestamp.UnixNano(), actualStream.Entries[i].Timestamp.UnixNano(), expectedStream.Labels) + for i, expectedSamplePair := range expectedStream.Entries { + actualSamplePair := actualStream.Entries[i] + if !expectedSamplePair.Timestamp.Equal(actualSamplePair.Timestamp) { + return nil, fmt.Errorf("expected timestamp %v but got %v for stream %s", expectedSamplePair.Timestamp.UnixNano(), + actualSamplePair.Timestamp.UnixNano(), expectedStream.Labels) } - if expectedStream.Entries[i].Line != actualStream.Entries[i].Line { - return nil, fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", - expectedStream.Entries[i].Line, expectedStream.Entries[i].Timestamp.UnixNano(), actualStream.Entries[i].Line, expectedStream.Labels) + if expectedSamplePair.Line != actualSamplePair.Line { + return nil, fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", expectedSamplePair.Line, + expectedSamplePair.Timestamp.UnixNano(), actualSamplePair.Line, expectedStream.Labels) } } } From 364463233f493abf08431e951852d230d1700015 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Mon, 20 Jan 2025 17:02:48 +0530 Subject: [PATCH 6/8] more tests --- tools/querytee/response_comparator_test.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tools/querytee/response_comparator_test.go b/tools/querytee/response_comparator_test.go index aef8525ddcebf..f179eec8eeed5 100644 --- a/tools/querytee/response_comparator_test.go +++ b/tools/querytee/response_comparator_test.go @@ -151,10 +151,11 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { { name: "skip entire series", expected: json.RawMessage(`[ - {"metric":{"foo":"bar"},"values":[[5,"1"],[94,"4"],[96,"5"]]} - ]`), + {"metric":{"foo":"bar"},"values":[[50,"1"],[75,"2"]]}, + {"metric":{"foo":"buzz"},"values":[[5,"1"],[9,"4"],[96,"5"]]} + ]`), // skip comparing {"foo":"buzz"} actual: json.RawMessage(`[ - {"metric":{"foo":"bar"},"values":[[5,"0"],[96,"1"]]} + {"metric":{"foo":"bar"},"values":[[50,"1"],[75,"2"],[95,"3"]]} ]`), skipSamplesBefore: time.Unix(10, 0), skipRecentSamples: 10 * time.Second, @@ -618,6 +619,18 @@ func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { skipRecentSamples: 10 * time.Nanosecond, evaluationTime: time.Unix(0, 100), }, + { + name: "skip both recent and old samples", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["95","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["15","2"],["50","3"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + skipSamplesBefore: time.Unix(0, 10), + evaluationTime: time.Unix(0, 100), + }, } { t.Run(tc.name, func(t *testing.T) { summary, err := compareStreams(tc.expected, tc.actual, tc.evaluationTime, SampleComparisonOptions{ From a2fcc43c973cc2995a426791e115361846b7d02d Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Mon, 20 Jan 2025 18:35:31 +0530 Subject: [PATCH 7/8] assume BACKWARD direction --- tools/querytee/response_comparator.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tools/querytee/response_comparator.go b/tools/querytee/response_comparator.go index 23d06ad0c0a01..15b4bedf121aa 100644 --- a/tools/querytee/response_comparator.go +++ b/tools/querytee/response_comparator.go @@ -386,9 +386,10 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, evaluationTime time. err := fmt.Errorf("expected %d values for stream %s but got %d", expectedValuesLen, expectedStream.Labels, actualValuesLen) if expectedValuesLen > 0 && actualValuesLen > 0 { - level.Error(util_log.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), - "newest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), - "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "newest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) + // assuming BACKWARD search since that is the default ordering + level.Error(util_log.Logger).Log("msg", err.Error(), "newest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), + "oldest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), + "newest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "oldest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) } return nil, err } From efebcce67b9911f0cdd60829b8edb5bf2ec37350 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Tue, 21 Jan 2025 19:42:47 +0530 Subject: [PATCH 8/8] update tests to check for samples on the boundary --- tools/querytee/response_comparator_test.go | 70 +++++++++++++++++++--- 1 file changed, 61 insertions(+), 9 deletions(-) diff --git a/tools/querytee/response_comparator_test.go b/tools/querytee/response_comparator_test.go index f179eec8eeed5..183a7f51e83be 100644 --- a/tools/querytee/response_comparator_test.go +++ b/tools/querytee/response_comparator_test.go @@ -117,10 +117,10 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { { name: "skip samples before window", expected: json.RawMessage(`[ - {"metric":{"foo":"bar"},"values":[[0,"1"],[5,"2"],[15,"3"],[20,"4"]]} + {"metric":{"foo":"bar"},"values":[[0,"1"],[5,"2"],[10,"3"],[20,"4"]]} ]`), actual: json.RawMessage(`[ - {"metric":{"foo":"bar"},"values":[[5,"1"],[15,"3"],[20,"4"]]} + {"metric":{"foo":"bar"},"values":[[5,"1"],[10,"3"],[20,"4"]]} ]`), skipSamplesBefore: time.Unix(10, 0), evaluationTime: time.Unix(100, 0), @@ -128,10 +128,10 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { { name: "skip recent samples", expected: json.RawMessage(`[ - {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[80,"3"],[94,"4"],[96,"5"]]} + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"3"],[94,"4"],[96,"5"]]} ]`), actual: json.RawMessage(`[ - {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[80,"3"],[95, "4"]]} + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"3"],[95, "4"]]} ]`), skipRecentSamples: 10 * time.Second, evaluationTime: time.Unix(100, 0), @@ -148,6 +148,32 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { skipRecentSamples: 10 * time.Second, evaluationTime: time.Unix(100, 0), }, + { + name: "mismatch in sample value on the right boundary", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"4"],[95, "4"]]} + ]`), + skipSamplesBefore: time.Unix(10, 0), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected value 3 for timestamp 90 but got 4"), + }, + { + name: "mismatch in sample value on the left boundary", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[10,"1"],[25,"2"],[90,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[10,"0"],[25,"2"],[90,"3"],[95, "4"]]} + ]`), + skipSamplesBefore: time.Unix(10, 0), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected value 1 for timestamp 10 but got 0"), + }, { name: "skip entire series", expected: json.RawMessage(`[ @@ -173,7 +199,7 @@ func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { return } require.Error(t, err) - require.Equal(t, tc.err.Error(), err.Error()) + require.ErrorContains(t, err, tc.err.Error()) }) } } @@ -600,10 +626,10 @@ func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { { name: "skip samples before window", expected: json.RawMessage(`[ - {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["95","4"]]} + {"stream":{"foo":"bar"},"values":[["5","1"],["10","2"],["50","3"],["95","4"]]} ]`), actual: json.RawMessage(`[ - {"stream":{"foo":"bar"},"values":[["2","0"],["15","2"],["50","3"],["95","4"]]} + {"stream":{"foo":"bar"},"values":[["2","0"],["10","2"],["50","3"],["95","4"]]} ]`), skipSamplesBefore: time.Unix(0, 10), evaluationTime: time.Unix(0, 100), @@ -611,10 +637,10 @@ func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { { name: "skip recent samples", expected: json.RawMessage(`[ - {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["95","4"]]} + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["90","3"],["95","4"]]} ]`), actual: json.RawMessage(`[ - {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"]]} + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["90","3"]]} ]`), skipRecentSamples: 10 * time.Nanosecond, evaluationTime: time.Unix(0, 100), @@ -631,6 +657,32 @@ func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { skipSamplesBefore: time.Unix(0, 10), evaluationTime: time.Unix(0, 100), }, + { + name: "mismatch in sample value on the right boundary", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["90","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["15","2"],["50","3"],["90","5"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + skipSamplesBefore: time.Unix(0, 10), + evaluationTime: time.Unix(0, 100), + err: errors.New("expected line 4 for timestamp 90 but got 5 for stream {foo=\"bar\"}"), + }, + { + name: "mismatch in sample value on the left boundary", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["10","2"],["50","3"],["90","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["10","22"],["50","3"],["90","5"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + skipSamplesBefore: time.Unix(0, 10), + evaluationTime: time.Unix(0, 100), + err: errors.New("expected line 2 for timestamp 10 but got 22 for stream {foo=\"bar\"}"), + }, } { t.Run(tc.name, func(t *testing.T) { summary, err := compareStreams(tc.expected, tc.actual, tc.evaluationTime, SampleComparisonOptions{