Skip to content

Commit

Permalink
adding a bunch of tests
Browse files Browse the repository at this point in the history
fixing up how the MockQuerier works.
  • Loading branch information
slim-bean committed Aug 9, 2020
1 parent f3208bc commit 0b5de2a
Show file tree
Hide file tree
Showing 3 changed files with 367 additions and 50 deletions.
63 changes: 50 additions & 13 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
total := 0
start := q.Start
end := q.End
var lastEntry *loghttp.Entry
var lastEntry []*loghttp.Entry
for total < q.Limit {
bs := q.BatchSize
// We want to truncate the batch size if the remaining number
// of items needed to reach the limit is less than the batch size
if q.Limit-total < q.BatchSize {
// Have to add one because of the timestamp overlap described below, the first result
// will always be the last result of the last batch.
bs = q.Limit - total + 1
// Truncated batchsize is q.Limit - total, however we add to this
// the length of the overlap from the last query to make sure we get the
// correct amount of new logs knowing there will be some overlapping logs returned.
bs = q.Limit - total + len(lastEntry)
}
resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet)
if err != nil {
Expand All @@ -111,13 +114,21 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
break
}
// Also no result, wouldn't expect to hit this.
if lastEntry == nil {
if lastEntry == nil || len(lastEntry) == 0 {
break
}
// Can only happen if all the results return in one request
if resultLength == q.Limit {
break
}
if len(lastEntry) >= q.BatchSize {
log.Fatalf("Invalid batch size %v, the next query will have %v overlapping entries "+
"(there will always be 1 overlapping entry but Loki allows multiple entries to have "+
"the same timestamp, so when a batch ends in this scenario the next query will include "+
"all the overlapping entries again). Please increase your batch size to at least %v to account "+
"for overlapping entryes\n", q.BatchSize, len(lastEntry), len(lastEntry)+1)
}

// Batching works by taking the timestamp of the last query and using it in the next query,
// because Loki supports multiple entries with the same timestamp it's possible for a batch to have
// fallen in the middle of a list of entries for the same time, so to make sure we get all entries
Expand All @@ -127,22 +138,23 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
// to get the desired limit.
total += resultLength
// Based on the query direction we either set the start or end for the next query.
// If there are multiple entries in `lastEntry` they have to have the same timestamp so we can pick just the first
if q.Forward {
start = lastEntry.Timestamp
start = lastEntry[0].Timestamp
} else {
// The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping result
// fudge the timestamp forward in time to make sure to get the last entry from this batch in the next query
end = lastEntry.Timestamp.Add(1 * time.Nanosecond)
end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond)
}

}
}

}

func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) {
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
length := -1
var entry *loghttp.Entry
var entry []*loghttp.Entry
switch value.Type() {
case logql.ValueTypeStreams:
length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry)
Expand Down Expand Up @@ -241,7 +253,7 @@ func (q *Query) isInstant() bool {
return q.Start == q.End
}

func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) {
func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
common := commonLabels(streams)

// Remove the labels we want to show from common
Expand Down Expand Up @@ -304,14 +316,39 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastE
printed := 0
for _, e := range allEntries {
// Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch
if lastEntry != nil && e.entry.Timestamp == lastEntry.Timestamp && e.entry.Line == lastEntry.Line {
continue
if lastEntry != nil && len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp {
skip := false
// Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp
// shared by multiple entries we have to check all that were stored to see if we've already
// printed them.
for _, le := range lastEntry {
if e.entry.Line == le.Line {
skip = true
}
}
if skip {
continue
}
}
out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)
printed++
}

return printed, &allEntries[len(allEntries)-1].entry
// Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends
// with an entry that shared multiple timestamps, so we need to keep a list of all these entries
// because the next query is going to contain them too and we want to not duplicate anything already
// printed.
lel := []*loghttp.Entry{}
// Start with the timestamp of the last entry
le := allEntries[len(allEntries)-1].entry
for i, e := range allEntries {
// Save any entry which has this timestamp (most of the time this will only be the single last entry)
if e.entry.Timestamp.Equal(le.Timestamp) {
lel = append(lel, &allEntries[i].entry)
}
}

return printed, lel
}

func (q *Query) printMatrix(matrix loghttp.Matrix) {
Expand Down
Loading

0 comments on commit 0b5de2a

Please sign in to comment.