From 1e8d8a93d9c2dc4881d175d4649d8f2a54e96697 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Wed, 22 Jan 2025 10:47:09 +0530 Subject: [PATCH] feat(query-tee): improvements to skip samples outside comparison window (#15794) --- cmd/querytee/main.go | 2 + tools/querytee/proxy.go | 7 +- tools/querytee/proxy_endpoint.go | 21 ++- tools/querytee/proxy_endpoint_test.go | 2 +- tools/querytee/proxy_metrics.go | 1 + tools/querytee/proxy_test.go | 2 +- tools/querytee/response_comparator.go | 186 +++++++++++++++--- tools/querytee/response_comparator_test.go | 207 ++++++++++++++++++++- 8 files changed, 380 insertions(+), 48 deletions(-) diff --git a/cmd/querytee/main.go b/cmd/querytee/main.go index 5acebfed85179..92f4187bb1383 100644 --- a/cmd/querytee/main.go +++ b/cmd/querytee/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "os" + "time" "github.com/go-kit/log/level" "github.com/grafana/dskit/log" @@ -62,6 +63,7 @@ func lokiReadRoutes(cfg Config) []querytee.Route { Tolerance: cfg.ProxyConfig.ValueComparisonTolerance, UseRelativeError: cfg.ProxyConfig.UseRelativeError, SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples, + SkipSamplesBefore: time.Time(cfg.ProxyConfig.SkipSamplesBefore), }) return []querytee.Route{ diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go index d5d842e11c3c5..392bd008a439a 100644 --- a/tools/querytee/proxy.go +++ b/tools/querytee/proxy.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gorilla/mux" + "github.com/grafana/dskit/flagext" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -34,6 +35,7 @@ type ProxyConfig struct { UseRelativeError bool PassThroughNonRegisteredRoutes bool SkipRecentSamples time.Duration + SkipSamplesBefore flagext.Time RequestURLFilter *regexp.Regexp InstrumentCompares bool } @@ -48,10 +50,11 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).") f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.") f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 60*time.Second, "The window from now to skip comparing samples. 0 to disable.") + f.Var(&cfg.SkipSamplesBefore, "proxy.compare-skip-samples-before", "Skip the samples before the given time for comparison. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only.") f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.") - f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error{ + f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error { var err error - cfg.RequestURLFilter, err = regexp.Compile(raw) + cfg.RequestURLFilter, err = regexp.Compile(raw) return err }) f.BoolVar(&cfg.InstrumentCompares, "proxy.compare-instrument", false, "Reports metrics on comparisons of responses between preferred and non-preferred endpoints for supported routes.") diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 7a6779b6ad880..e2fd53e52f4f4 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -15,10 +15,11 @@ import ( ) type ResponsesComparator interface { - Compare(expected, actual []byte) (*ComparisonSummary, error) + Compare(expected, actual []byte, queryEvaluationTime time.Time) (*ComparisonSummary, error) } type ComparisonSummary struct { + skipped bool missingMetrics int } @@ -175,13 +176,15 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back actualResponse := responses[i] result := comparisonSuccess - summary, err := p.compareResponses(expectedResponse, actualResponse) + summary, err := p.compareResponses(expectedResponse, actualResponse, time.Now().UTC()) if err != nil { level.Error(p.logger).Log("msg", "response comparison failed", "backend-name", p.backends[i].name, "route-name", p.routeName, "query", r.URL.RawQuery, "err", err) result = comparisonFailed + } else if summary != nil && summary.skipped { + result = comparisonSkipped } if p.instrumentCompares && summary != nil { @@ -227,10 +230,18 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp return responses[0] } -func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) (*ComparisonSummary, error) { +func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse, queryEvalTime time.Time) (*ComparisonSummary, error) { + if expectedResponse.err != nil { + return &ComparisonSummary{skipped: true}, nil + } + + if actualResponse.err != nil { + return nil, fmt.Errorf("skipped comparison of response because the request to the secondary backend failed: %w", actualResponse.err) + } + // compare response body only if we get a 200 if expectedResponse.status != 200 { - return nil, fmt.Errorf("skipped comparison of response because we got status code %d from preferred backend's response", expectedResponse.status) + return &ComparisonSummary{skipped: true}, nil } if actualResponse.status != 200 { @@ -241,7 +252,7 @@ func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backe return nil, fmt.Errorf("expected status code %d but got %d", expectedResponse.status, actualResponse.status) } - return p.comparator.Compare(expectedResponse.body, actualResponse.body) + return p.comparator.Compare(expectedResponse.body, actualResponse.body, queryEvalTime) } type backendResponse struct { diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go index 5cfee42b504dd..3c2553856fe33 100644 --- a/tools/querytee/proxy_endpoint_test.go +++ b/tools/querytee/proxy_endpoint_test.go @@ -421,6 +421,6 @@ func Test_backendResponse_statusCode(t *testing.T) { type mockComparator struct{} -func (c *mockComparator) Compare(_, _ []byte) (*ComparisonSummary, error) { +func (c *mockComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) { return &ComparisonSummary{missingMetrics: 12}, nil } diff --git a/tools/querytee/proxy_metrics.go b/tools/querytee/proxy_metrics.go index eb2284517d47c..9637438be486d 100644 --- a/tools/querytee/proxy_metrics.go +++ b/tools/querytee/proxy_metrics.go @@ -8,6 +8,7 @@ import ( const ( comparisonSuccess = "success" comparisonFailed = "fail" + comparisonSkipped = "skipped" unknownIssuer = "unknown" canaryIssuer = "loki-canary" diff --git a/tools/querytee/proxy_test.go b/tools/querytee/proxy_test.go index 22b122bcc2aa7..6db5caeadb762 100644 --- a/tools/querytee/proxy_test.go +++ b/tools/querytee/proxy_test.go @@ -25,7 +25,7 @@ var testWriteRoutes = []Route{} type testComparator struct{} -func (testComparator) Compare(_, _ []byte) (*ComparisonSummary, error) { return nil, nil } +func (testComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) { return nil, nil } func Test_NewProxy(t *testing.T) { cfg := ProxyConfig{} diff --git a/tools/querytee/response_comparator.go b/tools/querytee/response_comparator.go index 04a28fff85c1d..15b4bedf121aa 100644 --- a/tools/querytee/response_comparator.go +++ b/tools/querytee/response_comparator.go @@ -17,7 +17,7 @@ import ( ) // SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes. -type SamplesComparatorFunc func(expected, actual json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) +type SamplesComparatorFunc func(expected, actual json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) type SamplesResponse struct { Status string @@ -31,6 +31,20 @@ type SampleComparisonOptions struct { Tolerance float64 UseRelativeError bool SkipRecentSamples time.Duration + SkipSamplesBefore time.Time +} + +func (opts *SampleComparisonOptions) SkipSample(sampleTime, evaluationTime time.Time) bool { + // Skip if sample is too old + if !opts.SkipSamplesBefore.IsZero() && sampleTime.Before(opts.SkipSamplesBefore) { + return true + } + + // Skip if sample is too recent + if opts.SkipRecentSamples > 0 && sampleTime.After(evaluationTime.Add(-opts.SkipRecentSamples)) { + return true + } + return false } func NewSamplesComparator(opts SampleComparisonOptions) *SamplesComparator { @@ -55,7 +69,7 @@ func (s *SamplesComparator) RegisterSamplesType(samplesType string, comparator S s.sampleTypesComparator[samplesType] = comparator } -func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (*ComparisonSummary, error) { +func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte, evaluationTime time.Time) (*ComparisonSummary, error) { var expected, actual SamplesResponse err := json.Unmarshal(expectedResponse, &expected) @@ -81,10 +95,10 @@ func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (*C return nil, fmt.Errorf("resultType %s not registered for comparison", expected.Data.ResultType) } - return comparator(expected.Data.Result, actual.Data.Result, s.opts) + return comparator(expected.Data.Result, actual.Data.Result, evaluationTime, s.opts) } -func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { +func compareMatrix(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Matrix err := json.Unmarshal(expectedRaw, &expected) @@ -96,7 +110,23 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to unmarshal actual matrix") } + // Filter out samples outside the comparable window + if !opts.SkipSamplesBefore.IsZero() || opts.SkipRecentSamples > 0 { + expected = filterSamplesOutsideWindow(expected, func(sampleTime time.Time) bool { + return opts.SkipSample(sampleTime, evaluationTime) + }) + actual = filterSamplesOutsideWindow(actual, func(sampleTime time.Time) bool { + return opts.SkipSample(sampleTime, evaluationTime) + }) + } + + // If both matrices are empty after filtering, we can skip comparison + if len(expected) == 0 && len(actual) == 0 { + return &ComparisonSummary{skipped: true}, nil + } + if len(expected) != len(actual) { + // TODO: log the missing metrics return nil, fmt.Errorf("expected %d metrics but got %d", len(expected), len(actual)) } @@ -113,33 +143,64 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison } actualMetric := actual[actualMetricIndex] - expectedMetricLen := len(expectedMetric.Values) - actualMetricLen := len(actualMetric.Values) - - if expectedMetricLen != actualMetricLen { - err := fmt.Errorf("expected %d samples for metric %s but got %d", expectedMetricLen, - expectedMetric.Metric, actualMetricLen) - if expectedMetricLen > 0 && actualMetricLen > 0 { - level.Error(util_log.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedMetric.Values[0].Timestamp, - "newest-expected-ts", expectedMetric.Values[expectedMetricLen-1].Timestamp, - "oldest-actual-ts", actualMetric.Values[0].Timestamp, "newest-actual-ts", actualMetric.Values[actualMetricLen-1].Timestamp) - } - return nil, err + + err := compareMatrixSamples(expectedMetric, actualMetric, opts) + if err != nil { + return nil, fmt.Errorf("%w\nExpected result for series:\n%v\n\nActual result for series:\n%v", err, expectedMetric, actualMetric) } + } + + return nil, nil +} + +func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleComparisonOptions) error { + expectedEntriesCount := len(expected.Values) + actualEntriesCount := len(actual.Values) + + if expectedEntriesCount != actualEntriesCount { + err := fmt.Errorf("expected %d samples for metric %s but got %d", expectedEntriesCount, expected.Metric, actualEntriesCount) + if actualEntriesCount > 0 && expectedEntriesCount > 0 { + level.Error(util_log.Logger).Log("msg", err.Error(), + "oldest-expected-ts", expected.Values[0].Timestamp, + "newest-expected-ts", expected.Values[expectedEntriesCount-1].Timestamp, + "oldest-actual-ts", actual.Values[0].Timestamp, + "newest-actual-ts", actual.Values[actualEntriesCount-1].Timestamp) + } + return err + } + + for i := range expected.Values { + err := compareSamplePair(expected.Values[i], actual.Values[i], opts) + if err != nil { + return fmt.Errorf("float sample pair does not match for metric %s: %w", expected.Metric, err) + } + } + + return nil +} - for i, expectedSamplePair := range expectedMetric.Values { - actualSamplePair := actualMetric.Values[i] - err := compareSamplePair(expectedSamplePair, actualSamplePair, opts) - if err != nil { - return nil, errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric) +func filterSamplesOutsideWindow(matrix model.Matrix, skipSample func(time.Time) bool) model.Matrix { + result := matrix[:0] // Reuse the original slice capacity while starting with length 0 + + for _, series := range matrix { + // Reuse the original Values slice + filteredValues := series.Values[:0] + for _, sample := range series.Values { + if !skipSample(sample.Timestamp.Time()) { + filteredValues = append(filteredValues, sample) } } + + if len(filteredValues) > 0 { + series.Values = filteredValues + result = append(result, series) + } } - return nil, nil + return result } -func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { +func compareVector(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Vector err := json.Unmarshal(expectedRaw, &expected) @@ -152,6 +213,29 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to unmarshal actual vector") } + // Filter out samples outside the comparable windows + if !opts.SkipSamplesBefore.IsZero() || opts.SkipRecentSamples > 0 { + filtered := expected[:0] + for i := range expected { + if !opts.SkipSample(expected[i].Timestamp.Time(), evaluationTime) { + filtered = append(filtered, expected[i]) + } + } + expected = filtered + + filtered = actual[:0] + for i := range actual { + if !opts.SkipSample(actual[i].Timestamp.Time(), evaluationTime) { + filtered = append(filtered, actual[i]) + } + } + actual = filtered + } + + if len(expected) == 0 && len(actual) == 0 { + return &ComparisonSummary{skipped: true}, nil + } + if len(expected) != len(actual) { return nil, fmt.Errorf("expected %d metrics but got %d", len(expected), len(actual)) @@ -198,7 +282,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return &ComparisonSummary{missingMetrics: len(missingMetrics)}, err } -func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) (*ComparisonSummary, error) { +func compareScalar(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual model.Scalar err := json.Unmarshal(expectedRaw, &expected) if err != nil { @@ -210,6 +294,10 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparison return nil, errors.Wrap(err, "unable to actual expected scalar") } + if opts.SkipSample(expected.Timestamp.Time(), evaluationTime) && opts.SkipSample(actual.Timestamp.Time(), evaluationTime) { + return &ComparisonSummary{skipped: true}, nil + } + return nil, compareSamplePair(model.SamplePair{ Timestamp: expected.Timestamp, Value: expected.Value, @@ -223,9 +311,6 @@ func compareSamplePair(expected, actual model.SamplePair, opts SampleComparisonO if expected.Timestamp != actual.Timestamp { return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp) } - if opts.SkipRecentSamples > 0 && time.Since(expected.Timestamp.Time()) < opts.SkipRecentSamples { - return nil - } if !compareSampleValue(expected.Value, actual.Value, opts) { return fmt.Errorf("expected value %s for timestamp %v but got %s", expected.Value, expected.Timestamp, actual.Value) } @@ -250,7 +335,7 @@ func compareSampleValue(first, second model.SampleValue, opts SampleComparisonOp return math.Abs(f-s) <= opts.Tolerance } -func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOptions) (*ComparisonSummary, error) { +func compareStreams(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) { var expected, actual loghttp.Streams err := jsoniter.Unmarshal(expectedRaw, &expected) @@ -262,7 +347,23 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOp return nil, errors.Wrap(err, "unable to unmarshal actual streams") } + // Filter out entries outside the comparable window + if !opts.SkipSamplesBefore.IsZero() || opts.SkipRecentSamples > 0 { + expected = filterStreamsOutsideWindow(expected, func(entryTime time.Time) bool { + return opts.SkipSample(entryTime, evaluationTime) + }) + actual = filterStreamsOutsideWindow(actual, func(entryTime time.Time) bool { + return opts.SkipSample(entryTime, evaluationTime) + }) + } + + // If both streams are empty after filtering, we can skip comparison + if len(expected) == 0 && len(actual) == 0 { + return &ComparisonSummary{skipped: true}, nil + } + if len(expected) != len(actual) { + // TODO: log the missing stream return nil, fmt.Errorf("expected %d streams but got %d", len(expected), len(actual)) } @@ -285,9 +386,10 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOp err := fmt.Errorf("expected %d values for stream %s but got %d", expectedValuesLen, expectedStream.Labels, actualValuesLen) if expectedValuesLen > 0 && actualValuesLen > 0 { - level.Error(util_log.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), - "newest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), - "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "newest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) + // assuming BACKWARD search since that is the default ordering + level.Error(util_log.Logger).Log("msg", err.Error(), "newest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), + "oldest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), + "newest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "oldest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) } return nil, err } @@ -307,3 +409,25 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, _ SampleComparisonOp return nil, nil } + +// filterStreamsOutsideWindow filters out entries that are outside the comparable window +func filterStreamsOutsideWindow(streams loghttp.Streams, skipEntry func(time.Time) bool) loghttp.Streams { + result := streams[:0] // Reuse the original slice capacity while starting with length 0 + + for _, stream := range streams { + // Reuse the original Entries slice + filteredEntries := stream.Entries[:0] + for _, entry := range stream.Entries { + if !skipEntry(entry.Timestamp) { + filteredEntries = append(filteredEntries, entry) + } + } + + if len(filteredEntries) > 0 { + stream.Entries = filteredEntries + result = append(result, stream) + } + } + + return result +} diff --git a/tools/querytee/response_comparator_test.go b/tools/querytee/response_comparator_test.go index ce53fd15ce7ce..183a7f51e83be 100644 --- a/tools/querytee/response_comparator_test.go +++ b/tools/querytee/response_comparator_test.go @@ -70,7 +70,7 @@ func TestCompareMatrix(t *testing.T) { {"metric":{"foo":"bar"},"values":[[1,"1"],[3,"2"]]} ]`), // timestamps are parsed from seconds to ms which are then added to errors as is so adding 3 0s to expected error. - err: errors.New("sample pair not matching for metric {foo=\"bar\"}: expected timestamp 2 but got 3"), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected timestamp 2 but got 3"), }, { name: "difference in sample value", @@ -80,7 +80,7 @@ func TestCompareMatrix(t *testing.T) { actual: json.RawMessage(`[ {"metric":{"foo":"bar"},"values":[[1,"1"],[2,"3"]]} ]`), - err: errors.New("sample pair not matching for metric {foo=\"bar\"}: expected value 2 for timestamp 2 but got 3"), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected value 2 for timestamp 2 but got 3"), }, { name: "correct samples", @@ -93,13 +93,113 @@ func TestCompareMatrix(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareMatrix(tc.expected, tc.actual, SampleComparisonOptions{}) + _, err := compareMatrix(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{}) if tc.err == nil { require.NoError(t, err) return } require.Error(t, err) - require.Equal(t, tc.err.Error(), err.Error()) + require.ErrorContains(t, err, tc.err.Error()) + }) + } +} + +func TestCompareMatrix_SamplesOutsideComparableWindow(t *testing.T) { + for _, tc := range []struct { + name string + expected json.RawMessage + actual json.RawMessage + skipSamplesBefore time.Time + skipRecentSamples time.Duration + evaluationTime time.Time + err error + }{ + { + name: "skip samples before window", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[0,"1"],[5,"2"],[10,"3"],[20,"4"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[10,"3"],[20,"4"]]} + ]`), + skipSamplesBefore: time.Unix(10, 0), + evaluationTime: time.Unix(100, 0), + }, + { + name: "skip recent samples", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"3"],[95, "4"]]} + ]`), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + }, + { + name: "skip both recent and old samples", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[80,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"0"],[25,"2"],[80,"3"],[95, "4"]]} + ]`), + skipSamplesBefore: time.Unix(10, 0), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + }, + { + name: "mismatch in sample value on the right boundary", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[5,"1"],[25,"2"],[90,"4"],[95, "4"]]} + ]`), + skipSamplesBefore: time.Unix(10, 0), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected value 3 for timestamp 90 but got 4"), + }, + { + name: "mismatch in sample value on the left boundary", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[10,"1"],[25,"2"],[90,"3"],[94,"4"],[96,"5"]]} + ]`), + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[10,"0"],[25,"2"],[90,"3"],[95, "4"]]} + ]`), + skipSamplesBefore: time.Unix(10, 0), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + err: errors.New("float sample pair does not match for metric {foo=\"bar\"}: expected value 1 for timestamp 10 but got 0"), + }, + { + name: "skip entire series", + expected: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[50,"1"],[75,"2"]]}, + {"metric":{"foo":"buzz"},"values":[[5,"1"],[9,"4"],[96,"5"]]} + ]`), // skip comparing {"foo":"buzz"} + actual: json.RawMessage(`[ + {"metric":{"foo":"bar"},"values":[[50,"1"],[75,"2"],[95,"3"]]} + ]`), + skipSamplesBefore: time.Unix(10, 0), + skipRecentSamples: 10 * time.Second, + evaluationTime: time.Unix(100, 0), + }, + } { + t.Run(tc.name, func(t *testing.T) { + _, err := compareMatrix(tc.expected, tc.actual, tc.evaluationTime, SampleComparisonOptions{ + SkipSamplesBefore: tc.skipSamplesBefore, + SkipRecentSamples: tc.skipRecentSamples, + }) + + if tc.err == nil { + require.NoError(t, err) + return + } + require.Error(t, err) + require.ErrorContains(t, err, tc.err.Error()) }) } } @@ -176,7 +276,7 @@ func TestCompareVector(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareVector(tc.expected, tc.actual, SampleComparisonOptions{}) + _, err := compareVector(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{}) if tc.err == nil { require.NoError(t, err) return @@ -213,7 +313,7 @@ func TestCompareScalar(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareScalar(tc.expected, tc.actual, SampleComparisonOptions{}) + _, err := compareScalar(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{}) if tc.err == nil { require.NoError(t, err) return @@ -408,7 +508,7 @@ func TestCompareSamplesResponse(t *testing.T) { UseRelativeError: tc.useRelativeError, SkipRecentSamples: tc.skipRecentSamples, }) - _, err := samplesComparator.Compare(tc.expected, tc.actual) + _, err := samplesComparator.Compare(tc.expected, tc.actual, time.Now()) if tc.err == nil { require.NoError(t, err) return @@ -501,9 +601,100 @@ func TestCompareStreams(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, err := compareStreams(tc.expected, tc.actual, SampleComparisonOptions{Tolerance: 0}) + _, err := compareStreams(tc.expected, tc.actual, time.Now(), SampleComparisonOptions{Tolerance: 0}) + if tc.err == nil { + require.NoError(t, err) + return + } + require.Error(t, err) + require.Equal(t, tc.err.Error(), err.Error()) + }) + } +} + +func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) { + for _, tc := range []struct { + name string + expected json.RawMessage + actual json.RawMessage + skipSamplesBefore time.Time + skipRecentSamples time.Duration + evaluationTime time.Time + err error + }{ + // stream entry timestamp is in ns + { + name: "skip samples before window", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["10","2"],["50","3"],["95","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["2","0"],["10","2"],["50","3"],["95","4"]]} + ]`), + skipSamplesBefore: time.Unix(0, 10), + evaluationTime: time.Unix(0, 100), + }, + { + name: "skip recent samples", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["90","3"],["95","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["90","3"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + evaluationTime: time.Unix(0, 100), + }, + { + name: "skip both recent and old samples", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["95","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["15","2"],["50","3"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + skipSamplesBefore: time.Unix(0, 10), + evaluationTime: time.Unix(0, 100), + }, + { + name: "mismatch in sample value on the right boundary", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["15","2"],["50","3"],["90","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["15","2"],["50","3"],["90","5"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + skipSamplesBefore: time.Unix(0, 10), + evaluationTime: time.Unix(0, 100), + err: errors.New("expected line 4 for timestamp 90 but got 5 for stream {foo=\"bar\"}"), + }, + { + name: "mismatch in sample value on the left boundary", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["5","1"],["10","2"],["50","3"],["90","4"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["10","22"],["50","3"],["90","5"]]} + ]`), + skipRecentSamples: 10 * time.Nanosecond, + skipSamplesBefore: time.Unix(0, 10), + evaluationTime: time.Unix(0, 100), + err: errors.New("expected line 2 for timestamp 10 but got 22 for stream {foo=\"bar\"}"), + }, + } { + t.Run(tc.name, func(t *testing.T) { + summary, err := compareStreams(tc.expected, tc.actual, tc.evaluationTime, SampleComparisonOptions{ + SkipSamplesBefore: tc.skipSamplesBefore, + SkipRecentSamples: tc.skipRecentSamples, + }) + if tc.err == nil { require.NoError(t, err) + if summary != nil { + require.True(t, summary.skipped) + } return } require.Error(t, err)