From 0b5de2aecd601f8f36c5e38568095f2ca927eed0 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Sun, 9 Aug 2020 14:22:28 -0400
Subject: [PATCH] adding a bunch of tests, fixing up how the MockQuerier works.

---
 pkg/logcli/query/query.go      |  63 +++++--
 pkg/logcli/query/query_test.go | 332 +++++++++++++++++++++++++++++----
 pkg/logql/test_utils.go        |  22 ++-
 3 files changed, 367 insertions(+), 50 deletions(-)

diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go
index 8a2d5582c3dd9..d83287cc99736 100644
--- a/pkg/logcli/query/query.go
+++ b/pkg/logcli/query/query.go
@@ -88,13 +88,16 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
 	total := 0
 	start := q.Start
 	end := q.End
-	var lastEntry *loghttp.Entry
+	var lastEntry []*loghttp.Entry
 	for total < q.Limit {
 		bs := q.BatchSize
+		// We want to truncate the batch size if the remaining number
+		// of items needed to reach the limit is less than the batch size.
 		if q.Limit-total < q.BatchSize {
-			// Have to add one because of the timestamp overlap described below, the first result
-			// will always be the last result of the last batch.
-			bs = q.Limit - total + 1
+			// The truncated batch size is q.Limit - total, however we add to this
+			// the length of the overlap from the last query to make sure we get the
+			// correct amount of new logs, knowing there will be some overlapping logs returned.
+			bs = q.Limit - total + len(lastEntry)
 		}
 		resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet)
 		if err != nil {
@@ -111,13 +114,21 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
 			break
 		}
 		// Also no result, wouldn't expect to hit this.
-		if lastEntry == nil {
+		if lastEntry == nil || len(lastEntry) == 0 {
 			break
 		}
 		// Can only happen if all the results return in one request
 		if resultLength == q.Limit {
 			break
 		}
+		if len(lastEntry) >= q.BatchSize {
+			log.Fatalf("Invalid batch size %v, the next query will have %v overlapping entries "+
+				"(there will always be 1 overlapping entry, but Loki allows multiple entries to have "+
+				"the same timestamp, so when a batch ends in this scenario the next query will include "+
+				"all the overlapping entries again). Please increase your batch size to at least %v to account "+
+				"for overlapping entries\n", q.BatchSize, len(lastEntry), len(lastEntry)+1)
+		}
+
 		// Batching works by taking the timestamp of the last query and using it in the next query,
 		// because Loki supports multiple entries with the same timestamp it's possible for a batch to have
 		// fallen in the middle of a list of entries for the same time, so to make sure we get all entries
@@ -127,12 +138,13 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
 		// to get the desired limit.
 		total += resultLength
 		// Based on the query direction we either set the start or end for the next query.
+		// If there are multiple entries in `lastEntry` they all have the same timestamp, so we can pick just the first.
 		if q.Forward {
-			start = lastEntry.Timestamp
+			start = lastEntry[0].Timestamp
 		} else {
 			// The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping result
 			// fudge the timestamp forward in time to make sure we get the last entry from this batch in the next query
-			end = lastEntry.Timestamp.Add(1 * time.Nanosecond)
+			end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond)
 		}
 	}
 
@@ -140,9 +152,9 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
 
 }
 
-func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) {
+func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
 	length := -1
-	var entry *loghttp.Entry
+	var entry []*loghttp.Entry
 	switch value.Type() {
 	case logql.ValueTypeStreams:
 		length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry)
@@ -241,7 +253,7 @@ func (q *Query) isInstant() bool {
 	return q.Start == q.End
 }
 
-func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) {
+func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
 	common := commonLabels(streams)
 
 	// Remove the labels we want to show from common
@@ -304,14 +316,39 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastE
 	printed := 0
 	for _, e := range allEntries {
 		// Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch
-		if lastEntry != nil && e.entry.Timestamp == lastEntry.Timestamp && e.entry.Line == lastEntry.Line {
-			continue
+		if lastEntry != nil && len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp {
+			skip := false
+			// Because many logs can share a timestamp, in the unlucky event a batch ends with a timestamp
+			// shared by multiple entries we have to check all that were stored to see if we've already
+			// printed them.
+			for _, le := range lastEntry {
+				if e.entry.Line == le.Line {
+					skip = true
+				}
+			}
+			if skip {
+				continue
+			}
 		}
 		out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)
 		printed++
 	}
 
-	return printed, &allEntries[len(allEntries)-1].entry
+	// Loki allows multiple entries at the same timestamp, which is a bit of a mess if a batch ends
+	// with a timestamp shared by multiple entries, so we need to keep a list of all these entries
+	// because the next query is going to contain them too and we don't want to duplicate anything already
+	// printed.
+	lel := []*loghttp.Entry{}
+	// Start with the timestamp of the last entry
+	le := allEntries[len(allEntries)-1].entry
+	for i, e := range allEntries {
+		// Save any entry which has this timestamp (most of the time this will only be the single last entry)
+		if e.entry.Timestamp.Equal(le.Timestamp) {
+			lel = append(lel, &allEntries[i].entry)
+		}
+	}
+
+	return printed, lel
 }
 
 func (q *Query) printMatrix(matrix loghttp.Matrix) {
diff --git a/pkg/logcli/query/query_test.go b/pkg/logcli/query/query_test.go
index cab1325fcea44..49a4be61bab7d 100644
--- a/pkg/logcli/query/query_test.go
+++ b/pkg/logcli/query/query_test.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"log"
 	"reflect"
+	"strings"
 	"testing"
 	"time"
 
@@ -151,48 +152,304 @@ func Test_subtract(t *testing.T) {
 
 func Test_batch(t *testing.T) {
 	tests := []struct {
-		name         string
-		streams      []logproto.Stream
-		start, end   time.Time
-		limit, batch int
-		labelMatcher string
-		forward      bool
-		expected     []string
+		name          string
+		streams       []logproto.Stream
+		start, end    time.Time
+		limit, batch  int
+		labelMatcher  string
+		forward       bool
+		expectedCalls int
+		expected      []string
 	}{
 		{
 			name: "super simple forward",
 			streams: []logproto.Stream{
 				logproto.Stream{
-					Labels:  "{test=\"simple\"}",
+					Labels: "{test=\"simple\"}",
 					Entries: []logproto.Entry{
-						logproto.Entry{
-							Timestamp: time.Unix(1, 0),
-							Line:      "line1",
-						},
-						logproto.Entry{
-							Timestamp: time.Unix(2, 0),
-							Line:      "line2",
-						},
-						logproto.Entry{
-							Timestamp: time.Unix(3, 0),
-							Line:      "line3",
-						},
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, // End timestamp is exclusive
 					},
 				},
 			},
-			start:        time.Unix(1, 0),
-			end:          time.Unix(3, 0),
-			limit:        10,
-			batch:        10,
-			labelMatcher: "{test=\"simple\"}",
-			forward:      true,
+			start:         time.Unix(1, 0),
+			end:           time.Unix(3, 0),
+			limit:         10,
+			batch:         10,
+			labelMatcher:  "{test=\"simple\"}",
+			forward:       true,
+			expectedCalls: 2, // The client doesn't know whether the server hit a limit or there were no results, so we have to query until there are no results; in this case, 2 calls.
 			expected: []string{
 				"line1",
 				"line2",
 			},
-		},
-
+		},
+		{
+			name: "super simple backward",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"simple\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, // End timestamp is exclusive
+					},
+				},
+			},
+			start:         time.Unix(1, 0),
+			end:           time.Unix(3, 0),
+			limit:         10,
+			batch:         10,
+			labelMatcher:  "{test=\"simple\"}",
+			forward:       false,
+			expectedCalls: 2,
+			expected: []string{
+				"line2",
+				"line1",
+			},
+		},
+		{
+			name: "single stream forward batch",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"simple\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
+						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
+					},
+				},
+			},
+			start:         time.Unix(1, 0),
+			end:           time.Unix(11, 0),
+			limit:         9,
+			batch:         2,
+			labelMatcher:  "{test=\"simple\"}",
+			forward:       true,
+			// Our batch size is 2, but each query will also return the overlapping last element from the
+			// previous batch, so after the first call we only get one new item per call and we make a lot of calls:
+			// Call one:   line1 line2
+			// Call two:   line2 line3
+			// Call three: line3 line4
+			// Call four:  line4 line5
+			// Call five:  line5 line6
+			// Call six:   line6 line7
+			// Call seven: line7 line8
+			// Call eight: line8 line9
+			expectedCalls: 8,
+			expected: []string{
+				"line1", "line2", "line3", "line4", "line5", "line6", "line7", "line8", "line9",
+			},
+		},
+		{
+			name: "single stream backward batch",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"simple\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
+						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
+					},
+				},
+			},
+			start:         time.Unix(1, 0),
+			end:           time.Unix(11, 0),
+			limit:         9,
+			batch:         2,
+			labelMatcher:  "{test=\"simple\"}",
+			forward:       false,
+			expectedCalls: 8,
+			expected: []string{
+				"line10", "line9", "line8", "line7", "line6", "line5", "line4", "line3", "line2",
+			},
+		},
+		{
+			name: "two streams forward batch",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"one\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
+						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
+					},
+				},
+				logproto.Stream{
+					Labels: "{test=\"two\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 1000), Line: "s2line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 1000), Line: "s2line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 1000), Line: "s2line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 1000), Line: "s2line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 1000), Line: "s2line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 1000), Line: "s2line6"},
+						logproto.Entry{Timestamp: time.Unix(7, 1000), Line: "s2line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 1000), Line: "s2line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 1000), Line: "s2line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 1000), Line: "s2line10"},
+					},
+				},
+			},
+			start:         time.Unix(1, 0),
+			end:           time.Unix(11, 0),
+			limit:         12,
+			batch:         3,
+			labelMatcher:  "{test=~\"one|two\"}",
+			forward:       true,
+			// Six calls:
+			// 1 line1, s2line1, line2
+			// 2 line2, s2line2, line3
+			// 3 line3, s2line3, line4
+			// 4 line4, s2line4, line5
+			// 5 line5, s2line5, line6
+			// 6 line6, s2line6
+			expectedCalls: 6,
+			expected: []string{
+				"line1", "s2line1", "line2", "s2line2", "line3", "s2line3", "line4", "s2line4", "line5", "s2line5", "line6", "s2line6",
+			},
+		},
+		{
+			name: "two streams backward batch",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"one\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
+						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
+					},
+				},
+				logproto.Stream{
+					Labels: "{test=\"two\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 1000), Line: "s2line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 1000), Line: "s2line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 1000), Line: "s2line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 1000), Line: "s2line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 1000), Line: "s2line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 1000), Line: "s2line6"},
+						logproto.Entry{Timestamp: time.Unix(7, 1000), Line: "s2line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 1000), Line: "s2line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 1000), Line: "s2line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 1000), Line: "s2line10"},
+					},
+				},
+			},
+			start:         time.Unix(1, 0),
+			end:           time.Unix(11, 0),
+			limit:         12,
+			batch:         3,
+			labelMatcher:  "{test=~\"one|two\"}",
+			forward:       false,
+			expectedCalls: 6,
+			expected: []string{
+				"s2line10", "line10", "s2line9", "line9", "s2line8", "line8", "s2line7", "line7", "s2line6", "line6", "s2line5", "line5",
+			},
+		},
+		{
+			name: "single stream forward batch identical timestamps",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"simple\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6a"},
+						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
+					},
+				},
+			},
+			start:         time.Unix(1, 0),
+			end:           time.Unix(11, 0),
+			limit:         9,
+			batch:         4,
+			labelMatcher:  "{test=\"simple\"}",
+			forward:       true,
+			// Our batch size is 4, but each query will also return the overlapping entries from the
+			// end of the previous batch, so it takes three calls:
+			// Call one:   line1 line2 line3 line4
+			// Call two:   line4 line5 line6 line6a
+			// Call three: line6 line6a line7 line8 <- notice line6 and line6a share the same timestamp, so they both get returned as overlap in the next query.
+			expectedCalls: 3,
+			expected: []string{
+				"line1", "line2", "line3", "line4", "line5", "line6", "line6a", "line7", "line8",
+			},
+		},
+		{
+			name: "single stream backward batch identical timestamps",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"simple\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6a"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6b"},
+						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
+					},
+				},
+			},
+			start:         time.Unix(1, 0),
+			end:           time.Unix(11, 0),
+			limit:         11,
+			batch:         4,
+			labelMatcher:  "{test=\"simple\"}",
+			forward:       false,
+			// Our batch size is 4, but each query will also return the overlapping entries from the
+			// end of the previous batch, so it takes four calls:
+			// Call one:   line10 line9 line8 line7
+			// Call two:   line7 line6b line6a line6
+			// Call three: line6b line6a line6 line5
+			// Call four:  line5 line4 line3 line2
+			expectedCalls: 4,
+			expected: []string{
+				"line10", "line9", "line8", "line7", "line6b", "line6a", "line6", "line5", "line4", "line3", "line2",
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -216,7 +473,14 @@ func Test_batch(t *testing.T) {
 				LocalConfig: "",
 			}
 			q.DoQuery(tc, out, false)
-			assert.Equal(t, tt.expected, writer.String())
+			split := strings.Split(writer.String(), "\n")
+			// Remove the last element because there is always a newline after the last line, which
+			// leaves an empty element in the list of lines.
+			if len(split) > 0 {
+				split = split[:len(split)-1]
+			}
+			assert.Equal(t, tt.expected, split)
+			assert.Equal(t, tt.expectedCalls, tc.queryRangeCalls)
 		})
 	}
 }
@@ -232,13 +496,17 @@ func mustParseLabels(s string) loghttp.LabelSet {
 }
 
 type testQueryClient struct {
-	engine *logql.Engine
+	engine          *logql.Engine
+	queryRangeCalls int
 }
 
 func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
 	q := logql.NewMockQuerier(0, testStreams)
 	e := logql.NewEngine(logql.EngineOpts{}, q)
-	return &testQueryClient{engine: e}
+	return &testQueryClient{
+		engine:          e,
+		queryRangeCalls: 0,
+	}
 }
 
 func (t *testQueryClient) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
@@ -267,7 +535,7 @@ func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through t
 			Statistics: v.Statistics,
 		},
 	}
-
+	t.queryRangeCalls++
 	return q, nil
 }
 
diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go
index ecaa17bd1f694..35b2eeb333428 100644
--- a/pkg/logql/test_utils.go
+++ b/pkg/logql/test_utils.go
@@ -91,11 +91,23 @@ outer:
 		}
 	}
 
-	return iter.NewTimeRangedIterator(
-		iter.NewStreamsIterator(ctx, filtered, req.Direction),
-		req.Start,
-		req.End,
-	), nil
+	streamIters := make([]iter.EntryIterator, 0, len(filtered))
+	for i := range filtered {
+		// This is the same as how LazyChunk or MemChunk build their iterators:
+		// they return a TimeRangedIterator, which is wrapped in an EntryReversedIter if the direction is BACKWARD.
+		iterForward := iter.NewTimeRangedIterator(iter.NewStreamIterator(filtered[i]), req.Start, req.End)
+		if req.Direction == logproto.FORWARD {
+			streamIters = append(streamIters, iterForward)
+		} else {
+			reversed, err := iter.NewEntryReversedIter(iterForward)
+			if err != nil {
+				return nil, err
+			}
+			streamIters = append(streamIters, reversed)
+		}
+	}
+
+	return iter.NewHeapIterator(ctx, streamIters, req.Direction), nil
 }
 
 func (q MockQuerier) SelectSamples(ctx context.Context, req SelectSampleParams) (iter.SampleIterator, error) {
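
To illustrate the overlap bookkeeping this patch adds to DoQuery and printStream, here is a minimal standalone sketch. It is not part of the patch: the Entry type and printBatch helper are hypothetical stand-ins for loghttp.Entry and Query.printStream, assumed only for illustration. The point it demonstrates is why the overlap must be tracked as a slice rather than a single entry: every entry that shares a batch's final timestamp is returned again by the next query and must be skipped exactly once.

package main

import (
	"fmt"
	"time"
)

// Entry is a simplified stand-in for loghttp.Entry.
type Entry struct {
	Timestamp time.Time
	Line      string
}

// printBatch prints the entries of one batch, skipping any that were already printed
// at the end of the previous batch, and returns every entry that shares the batch's
// last timestamp so the caller can pass it to the next call as the overlap.
func printBatch(entries, lastOverlap []Entry) (printed int, overlap []Entry) {
	if len(entries) == 0 {
		return 0, nil
	}
	for _, e := range entries {
		skip := false
		for _, le := range lastOverlap {
			if e.Timestamp.Equal(le.Timestamp) && e.Line == le.Line {
				skip = true
			}
		}
		if skip {
			continue
		}
		fmt.Println(e.Timestamp.Unix(), e.Line)
		printed++
	}
	last := entries[len(entries)-1]
	for _, e := range entries {
		if e.Timestamp.Equal(last.Timestamp) {
			overlap = append(overlap, e)
		}
	}
	return printed, overlap
}

func main() {
	batch1 := []Entry{
		{Timestamp: time.Unix(1, 0), Line: "line1"},
		{Timestamp: time.Unix(2, 0), Line: "line2"},
		{Timestamp: time.Unix(2, 0), Line: "line2a"}, // two entries share the last timestamp
	}
	// The next query starts at the last timestamp, so it returns both shared entries again.
	batch2 := []Entry{
		{Timestamp: time.Unix(2, 0), Line: "line2"},
		{Timestamp: time.Unix(2, 0), Line: "line2a"},
		{Timestamp: time.Unix(3, 0), Line: "line3"},
	}
	_, overlap := printBatch(batch1, nil)
	printBatch(batch2, overlap) // prints only line3
}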