MergeIterator: allocate less memory at first #4341

Merged (4 commits) on Jul 6, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update the number of clusters per user correctly. #4336

## 1.10.0-rc.0 / 2021-06-28
4 changes: 4 additions & 0 deletions pkg/querier/batch/batch_test.go
@@ -27,6 +27,10 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.DoubleDelta},
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
{numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
{numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
{numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
{numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
}

for _, scenario := range scenarios {
1 change: 1 addition & 0 deletions pkg/querier/batch/chunk_test.go
@@ -59,6 +59,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco
require.Nil(t, npc)
ts = ts.Add(step)
}
ts = ts.Add(-step) // undo the add that we did just before exiting the loop
return chunk.NewChunk(userID, fp, metric, pc, from, ts)
}
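A standalone sketch of the off-by-one-step pattern the new ts = ts.Add(-step) line corrects (hypothetical helper using plain time.Time instead of model.Time): the loop advances the timestamp once per sample, so it exits one step past the last sample that was actually appended, and stepping back once makes the returned end time match that last sample.

    package main

    import (
        "fmt"
        "time"
    )

    // buildRange is a hypothetical stand-in for the timestamp loop in mkChunk.
    // It walks `points` samples spaced `step` apart and returns the time range they cover.
    func buildRange(from time.Time, points int, step time.Duration) (time.Time, time.Time) {
        ts := from
        for i := 0; i < points; i++ {
            // a sample would be appended at ts here
            ts = ts.Add(step) // also runs on the final iteration, overshooting by one step
        }
        ts = ts.Add(-step) // undo the add that we did just before exiting the loop
        return from, ts
    }

    func main() {
        from := time.Unix(0, 0)
        _, to := buildRange(from, 100, 15*time.Second)
        fmt.Println(to.Equal(from.Add(99 * 15 * time.Second))) // true: end == last sample, not one past it
    }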

7 changes: 3 additions & 4 deletions pkg/querier/batch/merge.go
@@ -31,8 +31,8 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator {
c := &mergeIterator{
its: its,
h: make(iteratorHeap, 0, len(its)),
batches: make(batchStream, 0, len(its)*2*promchunk.BatchSize),
batchesBuf: make(batchStream, len(its)*2*promchunk.BatchSize),
batches: make(batchStream, 0, len(its)),
batchesBuf: make(batchStream, len(its)),
Contributor:

I don't recall exactly why the pre-allocation was so big - wondering if you know why?

I can't think of a reason why this would affect correctness either, and the perf results speak for themselves...

Contributor @pracucci, Jul 6, 2021:

From the correctness perspective, this change should be fine. batchesBuf looks to be written only by mergeStreams(), which extends the slice if required.

}

for _, iter := range c.its {
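Not the actual mergeStreams() implementation, but a minimal sketch of the "extends the slice if required" behaviour referenced in the comment above: because the destination stream is rebuilt with append, a small initial capacity such as make(batchStream, 0, len(its)) only costs a few reallocations while the stream grows; it cannot cause an out-of-range write.

    package main

    import "fmt"

    // mergeSorted is a hypothetical stand-in for mergeStreams: it merges two sorted
    // int slices into dst, reusing dst's backing array when it is big enough and
    // letting append grow it when it is not.
    func mergeSorted(dst, a, b []int) []int {
        dst = dst[:0] // keep the capacity, drop the length
        for len(a) > 0 || len(b) > 0 {
            if len(b) == 0 || (len(a) > 0 && a[0] <= b[0]) {
                dst = append(dst, a[0]) // append reallocates if cap(dst) is exhausted
                a = a[1:]
            } else {
                dst = append(dst, b[0])
                b = b[1:]
            }
        }
        return dst
    }

    func main() {
        buf := make([]int, 0, 2) // deliberately tiny, like make(batchStream, 0, len(its))
        out := mergeSorted(buf, []int{1, 3, 5}, []int{2, 4, 6})
        fmt.Println(out, cap(out) >= 6) // [1 2 3 4 5 6] true
    }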
@@ -112,8 +112,7 @@ func (c *mergeIterator) buildNextBatch(size int) bool {
for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) {
c.nextBatchBuf[0] = c.h[0].Batch()
c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size)
copy(c.batches[:len(c.batchesBuf)], c.batchesBuf)
c.batches = c.batches[:len(c.batchesBuf)]
Contributor:

This is a no-op, right? Did it impact performance?

Contributor:

My guess about this change (but Bryan can confirm or negate) is that we had to do it because c.batches may need to grow after the change in newMergeIterator(). @bboreham is my understanding correct?

Contributor (Author):

Yes, the append will grow the slice if required, whereas copy will panic. TestMergeIter/DoubleDelta fails if you don't make this change.

c.batches = append(c.batches[:0], c.batchesBuf...)

if c.h[0].Next(size) {
heap.Fix(&c.h, 0)
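A standalone illustration (not Cortex code) of the point made in the comment above: reslicing a slice beyond its capacity panics at runtime, while append allocates a bigger backing array and carries on, which is why the copy-based version only worked while batches was pre-allocated with the old, much larger capacity.

    package main

    import "fmt"

    func main() {
        src := []int{1, 2, 3, 4}

        // append-based update, as in c.batches = append(c.batches[:0], c.batchesBuf...):
        // correct regardless of the destination's initial capacity.
        dst := make([]int, 0, 2)
        dst = append(dst[:0], src...)
        fmt.Println(dst, cap(dst) >= len(src)) // [1 2 3 4] true

        // copy-based update, as in copy(c.batches[:len(c.batchesBuf)], c.batchesBuf):
        // the reslice past cap panics before copy ever runs.
        small := make([]int, 0, 2)
        defer func() {
            if r := recover(); r != nil {
                fmt.Println("recovered:", r) // slice bounds out of range [:4] with capacity 2
            }
        }()
        copy(small[:len(src)], src) // small[:4] exceeds cap(small) == 2 -> panic
    }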