diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f825f9bf2..5532ab6d3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 * [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262 +* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index baacbcc719..9b9ffec16e 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -27,6 +27,10 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.DoubleDelta}, {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk}, {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, } for _, scenario := range scenarios { diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 40a3feb4b2..e672811827 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -59,6 +59,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco require.Nil(t, npc) ts = ts.Add(step) } + ts = ts.Add(-step) // undo the add that we did just before exiting the loop return chunk.NewChunk(userID, fp, metric, pc, from, ts) } diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 88220bd727..7764b37467 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -31,8 +31,8 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator { c := &mergeIterator{ its: its, h: make(iteratorHeap, 0, len(its)), - batches: make(batchStream, 0, len(its)*2*promchunk.BatchSize), - batchesBuf: make(batchStream, len(its)*2*promchunk.BatchSize), + batches: make(batchStream, 0, len(its)), + batchesBuf: make(batchStream, len(its)), } for _, iter := range c.its { @@ -112,8 +112,7 @@ func (c *mergeIterator) buildNextBatch(size int) bool { for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) { c.nextBatchBuf[0] = c.h[0].Batch() c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size) - copy(c.batches[:len(c.batchesBuf)], c.batchesBuf) - c.batches = c.batches[:len(c.batchesBuf)] + c.batches = append(c.batches[:0], c.batchesBuf...) if c.h[0].Next(size) { heap.Fix(&c.h, 0)