MergeIterator: allocate less memory at first (cortexproject#4341)
* MergeIterator: allocate less memory at first

We were allocating 24 batches per stream, where each batch holds
up to 12 samples.

By allowing `c.batches` to reallocate when needed, we avoid the need
to pre-allocate enough memory for all possible scenarios.
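
For illustration, a minimal sketch of the before/after allocation strategy. The types and `numStreams` value here are stand-ins for this sketch; the real code in pkg/querier/batch uses promchunk.Batch and batchStream, as shown in the merge.go diff below.

```go
package main

import "fmt"

// Stand-in types for the sketch; the real iterator merges promchunk.Batch
// values (up to 12 samples each) held in a batchStream slice.
type batch struct{ samples [12]int64 }
type batchStream []batch

func main() {
	numStreams := 64 // hypothetical number of per-chunk iterators

	// Before: pre-allocate 2*BatchSize (= 24) batches per stream,
	// whether or not the query ever needs them.
	before := make(batchStream, 0, numStreams*2*12)

	// After: start with capacity for one batch per stream; append()
	// reallocates the backing array only if a query actually needs more.
	after := make(batchStream, 0, numStreams)

	fmt.Println(cap(before), cap(after)) // 1536 vs 64
}
```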

* chunk_test: fix inaccurate end time on chunks

The `through` time is supposed to be the last time in the chunk, and
having it one step higher was throwing off other tests and benchmarks.
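
A minimal sketch of the off-by-one being fixed, with hypothetical from/points/step values; the real mkChunk works with model.Time and a promchunk encoder rather than time.Time.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values; mkChunk receives these as parameters.
	from, points, step := time.Unix(0, 0), 100, 15*time.Second

	ts := from
	for i := 0; i < points; i++ {
		// ... append a sample at ts to the chunk ...
		ts = ts.Add(step)
	}
	// The loop leaves ts one step past the last sample, so using it as the
	// chunk's `through` time overstates the range; step back first.
	ts = ts.Add(-step)

	fmt.Println("last sample (through):", ts) // from + (points-1)*step
}
```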

* MergeIterator benchmark: add more realistic sizes

At 15-second scrape intervals a chunk covers 30 minutes, so 1,000 chunks
span about three weeks, a highly unrepresentative test.
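
(For reference, assuming ~120 samples per chunk: 120 × 15 s = 30 min per
chunk, and 1,000 × 30 min = 500 h ≈ 21 days, i.e. about three weeks.)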

Instant queries, such as those done by the ruler, will only fetch one
chunk from each ingester.

Signed-off-by: Bryan Boreham <[email protected]>
Signed-off-by: Alvin Lin <[email protected]>
bboreham authored and alvinlin123 committed Jan 14, 2022
1 parent da68878 commit 18e05ba
Showing 4 changed files with 9 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
+ * [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of clusters per user correctly. #4336
* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4318

4 changes: 4 additions & 0 deletions pkg/querier/batch/batch_test.go
@@ -27,6 +27,10 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.DoubleDelta},
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
+ {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
+ {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
+ {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk},
+ {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk},
}

for _, scenario := range scenarios {
1 change: 1 addition & 0 deletions pkg/querier/batch/chunk_test.go
@@ -59,6 +59,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco
require.Nil(t, npc)
ts = ts.Add(step)
}
+ ts = ts.Add(-step) // undo the add that we did just before exiting the loop
return chunk.NewChunk(userID, fp, metric, pc, from, ts)
}

7 changes: 3 additions & 4 deletions pkg/querier/batch/merge.go
@@ -31,8 +31,8 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator {
c := &mergeIterator{
its: its,
h: make(iteratorHeap, 0, len(its)),
- batches: make(batchStream, 0, len(its)*2*promchunk.BatchSize),
- batchesBuf: make(batchStream, len(its)*2*promchunk.BatchSize),
+ batches: make(batchStream, 0, len(its)),
+ batchesBuf: make(batchStream, len(its)),
}

for _, iter := range c.its {
@@ -112,8 +112,7 @@ func (c *mergeIterator) buildNextBatch(size int) bool {
for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) {
c.nextBatchBuf[0] = c.h[0].Batch()
c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size)
- copy(c.batches[:len(c.batchesBuf)], c.batchesBuf)
- c.batches = c.batches[:len(c.batchesBuf)]
+ c.batches = append(c.batches[:0], c.batchesBuf...)

if c.h[0].Next(size) {
heap.Fix(&c.h, 0)
