From c9298bcca5fbf87230727c9d1b5a547ceec00806 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 6 Jul 2020 14:36:06 -0400 Subject: [PATCH 1/2] Improve entry deduplication. This PR removes the mostCommon and sorted-insert functions from the heap iterator. I discovered while working on #2293 that those are actually not helping since we're deduping those lines anyway. There were no tests checking whether deduping was working correctly, so I added those. As a bonus, this means deduping will run faster and the code is less complex. The only side effect concerns the order of entries that share the same timestamp: before, the most common entry would appear first; now we keep the same order as we stored them, which I think is better. I also changed the label ordering: I think that whether we are iterating forward or backward we should keep the same alphabetical label ordering; I'm not sure why direction was altering this before. Signed-off-by: Cyril Tovena --- pkg/iter/iterator.go | 50 +------------- pkg/iter/iterator_test.go | 104 +++++++++++++++++------------ pkg/logql/series_extractor_test.go | 6 +- 3 files changed, 66 insertions(+), 94 deletions(-) diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 4c5669b20f773..8a2f73b5156cc 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -122,7 +122,7 @@ func (h iteratorMaxHeap) Less(i, j int) bool { case un1 > un2: return true default: // un1 == un2 - return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels() + return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels() } } @@ -232,8 +232,7 @@ func (i *heapIterator) Next() bool { } heap.Pop(i.heap) - // insert keeps i.tuples sorted - i.tuples = insert(i.tuples, tuple{ + i.tuples = append(i.tuples, tuple{ Entry: entry, EntryIterator: next, }) @@ -250,7 +249,7 @@ func (i *heapIterator) Next() bool { // Find in tuples which entry occurs most often which, due to quorum based // replication, is guaranteed to be the correct next entry. 
- t := mostCommon(i.tuples) + t := i.tuples[0] i.currEntry = t.Entry i.currLabels = t.Labels() @@ -270,49 +269,6 @@ func (i *heapIterator) Next() bool { return true } -// Insert new tuple to correct position into ordered set of tuples. -// Insert sort is fast for small number of elements, and here we only expect max [number of replicas] elements. -func insert(ts []tuple, n tuple) []tuple { - ix := 0 - for ix < len(ts) && ts[ix].Line <= n.Line { - ix++ - } - if ix < len(ts) { - ts = append(ts, tuple{}) // zero element - copy(ts[ix+1:], ts[ix:]) - ts[ix] = n - } else { - ts = append(ts, n) - } - return ts -} - -// Expects that tuples are sorted already. We achieve that by using insert. -func mostCommon(tuples []tuple) tuple { - // trivial case, no need to do extra work. - if len(tuples) == 1 { - return tuples[0] - } - - result := tuples[0] - count, max := 0, 0 - for i := 0; i < len(tuples)-1; i++ { - if tuples[i].Line == tuples[i+1].Line { - count++ - continue - } - if count > max { - result = tuples[i] - max = count - } - count = 0 - } - if count > max { - result = tuples[len(tuples)-1] - } - return result -} - func (i *heapIterator) Entry() logproto.Entry { return i.currEntry } diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index 7a7f3cbaddfbf..600d4a841ed52 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -3,7 +3,6 @@ package iter import ( "context" "fmt" - "sort" "testing" "time" @@ -231,50 +230,67 @@ func inverse(g generator) generator { } } -func TestMostCommon(t *testing.T) { - // First is most common. 
- tuples := []tuple{ - {Entry: logproto.Entry{Line: "a"}}, - {Entry: logproto.Entry{Line: "b"}}, - {Entry: logproto.Entry{Line: "c"}}, - {Entry: logproto.Entry{Line: "a"}}, - {Entry: logproto.Entry{Line: "b"}}, - {Entry: logproto.Entry{Line: "c"}}, - {Entry: logproto.Entry{Line: "a"}}, - } - require.Equal(t, "a", mostCommon(tuples).Entry.Line) - - tuples = []tuple{ - {Entry: logproto.Entry{Line: "a"}}, - {Entry: logproto.Entry{Line: "b"}}, - {Entry: logproto.Entry{Line: "b"}}, - {Entry: logproto.Entry{Line: "c"}}, - {Entry: logproto.Entry{Line: "c"}}, - {Entry: logproto.Entry{Line: "c"}}, - {Entry: logproto.Entry{Line: "d"}}, - } - require.Equal(t, "c", mostCommon(tuples).Entry.Line) -} - -func TestInsert(t *testing.T) { - toInsert := []tuple{ - {Entry: logproto.Entry{Line: "a"}}, - {Entry: logproto.Entry{Line: "e"}}, - {Entry: logproto.Entry{Line: "c"}}, - {Entry: logproto.Entry{Line: "b"}}, - {Entry: logproto.Entry{Line: "d"}}, - {Entry: logproto.Entry{Line: "a"}}, - {Entry: logproto.Entry{Line: "c"}}, +func TestHeapIteratorDeduplication(t *testing.T) { + foo := logproto.Stream{ + Labels: `{app="foo"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + {Timestamp: time.Unix(0, 3), Line: "3"}, + }, } - - var ts []tuple - for _, e := range toInsert { - ts = insert(ts, e) + bar := logproto.Stream{ + Labels: `{app="bar"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + {Timestamp: time.Unix(0, 3), Line: "3"}, + }, } + assertIt := func(it EntryIterator) { + for i := 0; i < 3; i++ { + require.True(t, it.Next()) + require.NoError(t, it.Error()) + require.Equal(t, bar.Labels, it.Labels()) + require.Equal(t, bar.Entries[i], it.Entry()) - require.True(t, sort.SliceIsSorted(ts, func(i, j int) bool { - return ts[i].Line < ts[j].Line - })) + require.True(t, it.Next()) + require.NoError(t, it.Error()) + require.Equal(t, foo.Labels, 
it.Labels()) + require.Equal(t, foo.Entries[i], it.Entry()) + + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) + } + // forward iteration + it := NewHeapIterator(context.Background(), []EntryIterator{ + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + }, logproto.FORWARD) + assertIt(it) + + // backward iteration + it = NewHeapIterator(context.Background(), []EntryIterator{ + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + }, logproto.BACKWARD) + // first reverse streams, they should already be correctly ordered for the heap iterator to work. + for i, j := 0, len(foo.Entries)-1; i < j; i, j = i+1, j-1 { + foo.Entries[i], foo.Entries[j] = foo.Entries[j], foo.Entries[i] + bar.Entries[i], bar.Entries[j] = bar.Entries[j], bar.Entries[i] + } + assertIt(it) } func TestReverseIterator(t *testing.T) { @@ -288,10 +304,10 @@ func TestReverseIterator(t *testing.T) { for i := int64((testSize / 2) + 1); i <= testSize; i++ { assert.Equal(t, true, reversedIter.Next()) assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i)) - assert.Equal(t, reversedIter.Labels(), itr1.Labels()) + assert.Equal(t, reversedIter.Labels(), itr2.Labels()) assert.Equal(t, true, reversedIter.Next()) assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i)) - assert.Equal(t, reversedIter.Labels(), itr2.Labels()) + assert.Equal(t, reversedIter.Labels(), itr1.Labels()) } assert.Equal(t, false, reversedIter.Next()) diff --git a/pkg/logql/series_extractor_test.go b/pkg/logql/series_extractor_test.go index ad6a77bcd822d..8ef572d2a2f97 100644 --- a/pkg/logql/series_extractor_test.go +++ b/pkg/logql/series_extractor_test.go @@ -111,12 +111,12 @@ func Test_seriesIterator_Peek(t 
*testing.T) { extractBytes, ), []expectation{ - {true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}}, - {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}}, - {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}}, {true, Sample{Labels: `{app="barr"}`, TimestampNano: 0, Value: 4}}, {true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 4}}, {true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 4}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}}, {false, Sample{}}, }, }, From 1eb8b5a8fa13f96fd1b66ba9acdad20becdd8afd Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 8 Jul 2020 09:13:15 -0400 Subject: [PATCH 2/2] Improve heap iterator backward test. 
Signed-off-by: Cyril Tovena --- pkg/iter/iterator_test.go | 41 +++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index 600d4a841ed52..faa70fc6485cd 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -247,17 +247,21 @@ func TestHeapIteratorDeduplication(t *testing.T) { {Timestamp: time.Unix(0, 3), Line: "3"}, }, } - assertIt := func(it EntryIterator) { - for i := 0; i < 3; i++ { + assertIt := func(it EntryIterator, reversed bool, length int) { + for i := 0; i < length; i++ { + j := i + if reversed { + j = length - 1 - i + } require.True(t, it.Next()) require.NoError(t, it.Error()) require.Equal(t, bar.Labels, it.Labels()) - require.Equal(t, bar.Entries[i], it.Entry()) + require.Equal(t, bar.Entries[j], it.Entry()) require.True(t, it.Next()) require.NoError(t, it.Error()) require.Equal(t, foo.Labels, it.Labels()) - require.Equal(t, foo.Entries[i], it.Entry()) + require.Equal(t, foo.Entries[j], it.Entry()) } require.False(t, it.Next()) @@ -273,24 +277,27 @@ func TestHeapIteratorDeduplication(t *testing.T) { NewStreamIterator(bar), NewStreamIterator(foo), }, logproto.FORWARD) - assertIt(it) + assertIt(it, false, len(foo.Entries)) // backward iteration it = NewHeapIterator(context.Background(), []EntryIterator{ - NewStreamIterator(foo), - NewStreamIterator(bar), - NewStreamIterator(foo), - NewStreamIterator(bar), - NewStreamIterator(foo), - NewStreamIterator(bar), - NewStreamIterator(foo), + mustReverseStreamIterator(NewStreamIterator(foo)), + mustReverseStreamIterator(NewStreamIterator(bar)), + mustReverseStreamIterator(NewStreamIterator(foo)), + mustReverseStreamIterator(NewStreamIterator(bar)), + mustReverseStreamIterator(NewStreamIterator(foo)), + mustReverseStreamIterator(NewStreamIterator(bar)), + mustReverseStreamIterator(NewStreamIterator(foo)), }, logproto.BACKWARD) - // first reverse streams, they should already be correctly ordered 
for the heap iterator to work. - for i, j := 0, len(foo.Entries)-1; i < j; i, j = i+1, j-1 { - foo.Entries[i], foo.Entries[j] = foo.Entries[j], foo.Entries[i] - bar.Entries[i], bar.Entries[j] = bar.Entries[j], bar.Entries[i] + assertIt(it, true, len(foo.Entries)) +} + +func mustReverseStreamIterator(it EntryIterator) EntryIterator { + reversed, err := NewReversedIter(it, 0, true) + if err != nil { + panic(err) } - assertIt(it) + return reversed } func TestReverseIterator(t *testing.T) {