From cb2a2b699f05838a9bdb63f2e23cc5ee385241d1 Mon Sep 17 00:00:00 2001
From: "Grot (@grafanabot)" <43478413+grafanabot@users.noreply.github.com>
Date: Mon, 18 Nov 2024 04:17:02 +0200
Subject: [PATCH] Don't hold labels from store-gateways in two forms, and don't
 convert them multiple times (#9914) (#9930)

* Don't hold labels from store-gateways in two forms

* Don't retain labels longer than needed

* Don't convert mimirpb.LabelAdapters to labels.Labels multiple times

* Add changelog entry

(cherry picked from commit d2367de16a5623e422f802b1e89826051e3cc6f2)

Co-authored-by: Charles Korn <charleskorn@users.noreply.github.com>
---
 CHANGELOG.md                           |  1 +
 pkg/distributor/distributor.go         |  2 +-
 pkg/distributor/query.go               | 10 +++++----
 pkg/querier/block_streaming.go         | 13 ++++++-----
 pkg/querier/block_streaming_test.go    |  5 +----
 pkg/querier/blocks_store_queryable.go  | 30 +++++++++++++++-----------
 pkg/util/limiter/query_limiter.go      |  6 +++---
 pkg/util/limiter/query_limiter_test.go | 17 +++++++--------
 8 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a1389c5a2bf..c4adf5db7f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -65,6 +65,7 @@
 * [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
 * [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826
 * [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
+* [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914
 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 7ca90a4185e..fe475d5ed09 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -2557,7 +2557,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
 
 	result := make([]labels.Labels, 0, len(metrics))
 	for _, m := range metrics {
-		if err := queryLimiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(m)); err != nil {
+		if err := queryLimiter.AddSeries(m); err != nil {
 			return nil, err
 		}
 		result = append(result, m)
diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index 73610f6e089..ee381656840 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -268,7 +268,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 
 			if len(resp.Timeseries) > 0 {
 				for _, series := range resp.Timeseries {
-					if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
+					if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
 						return ingesterQueryResult{}, limitErr
 					}
 				}
@@ -285,7 +285,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 				}
 
 				for _, series := range resp.Chunkseries {
-					if err := queryLimiter.AddSeries(series.Labels); err != nil {
+					if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
 						return ingesterQueryResult{}, err
 					}
 				}
@@ -300,7 +300,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 				streamingSeriesCount += len(resp.StreamingSeries)
 
 				for _, s := range resp.StreamingSeries {
-					if err := queryLimiter.AddSeries(s.Labels); err != nil {
+					l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
+
+					if err := queryLimiter.AddSeries(l); err != nil {
 						return ingesterQueryResult{}, err
 					}
 
@@ -313,7 +315,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 						return ingesterQueryResult{}, err
 					}
 
-					labelsBatch = append(labelsBatch, mimirpb.FromLabelAdaptersToLabels(s.Labels))
+					labelsBatch = append(labelsBatch, l)
 				}
 
 				streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go
index 927c5b2d449..3d21084f9ff 100644
--- a/pkg/querier/block_streaming.go
+++ b/pkg/querier/block_streaming.go
@@ -17,7 +17,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/util/annotations"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/storage/series"
 	"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
@@ -31,7 +30,7 @@ import (
 
 // Implementation of storage.SeriesSet, based on individual responses from store client.
 type blockStreamingQuerierSeriesSet struct {
-	series       []*storepb.StreamingSeries
+	series       []labels.Labels
 	streamReader chunkStreamReader
 
 	// next response to process
@@ -55,18 +54,22 @@ func (bqss *blockStreamingQuerierSeriesSet) Next() bool {
 		return false
 	}
 
-	currLabels := bqss.series[bqss.nextSeriesIndex].Labels
+	currLabels := bqss.series[bqss.nextSeriesIndex]
 	seriesIdxStart := bqss.nextSeriesIndex // First series in this group. We might merge with more below.
 	bqss.nextSeriesIndex++
 
 	// Chunks may come in multiple responses, but as soon as the response has chunks for a new series,
 	// we can stop searching. Series are sorted. See documentation for StoreClient.Series call for details.
 	// The actually merging of chunks happens in the Iterator() call where chunks are fetched.
-	for bqss.nextSeriesIndex < len(bqss.series) && mimirpb.CompareLabelAdapters(currLabels, bqss.series[bqss.nextSeriesIndex].Labels) == 0 {
+	for bqss.nextSeriesIndex < len(bqss.series) && labels.Equal(currLabels, bqss.series[bqss.nextSeriesIndex]) {
 		bqss.nextSeriesIndex++
 	}
 
-	bqss.currSeries = newBlockStreamingQuerierSeries(mimirpb.FromLabelAdaptersToLabels(currLabels), seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
+	bqss.currSeries = newBlockStreamingQuerierSeries(currLabels, seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
+
+	// Clear any labels we no longer need, to allow them to be garbage collected when they're no longer needed elsewhere.
+	clear(bqss.series[seriesIdxStart : bqss.nextSeriesIndex-1])
+
 	return true
 }
 
diff --git a/pkg/querier/block_streaming_test.go b/pkg/querier/block_streaming_test.go
index 501fa8f7c43..48b62329aa9 100644
--- a/pkg/querier/block_streaming_test.go
+++ b/pkg/querier/block_streaming_test.go
@@ -19,7 +19,6 @@ import (
 	"go.uber.org/atomic"
 	"google.golang.org/grpc/metadata"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/storegateway/storepb"
 	"github.com/grafana/mimir/pkg/util/limiter"
@@ -166,9 +165,7 @@ func TestBlockStreamingQuerierSeriesSet(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			ss := &blockStreamingQuerierSeriesSet{streamReader: &mockChunkStreamer{series: c.input, causeError: c.errorChunkStreamer}}
 			for _, s := range c.input {
-				ss.series = append(ss.series, &storepb.StreamingSeries{
-					Labels: mimirpb.FromLabelsToLabelAdapters(s.lbls),
-				})
+				ss.series = append(ss.series, s.lbls)
 			}
 			idx := 0
 			var it chunkenc.Iterator
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index 9ce696b3553..a6f17182594 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -780,9 +780,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				return err
 			}
 
-			// A storegateway client will only fill either of mySeries or myStreamingSeries, and not both.
+			// A storegateway client will only fill either of mySeries or myStreamingSeriesLabels, and not both.
 			mySeries := []*storepb.Series(nil)
-			myStreamingSeries := []*storepb.StreamingSeries(nil)
+			myStreamingSeriesLabels := []labels.Labels(nil)
 			var myWarnings annotations.Annotations
 			myQueriedBlocks := []ulid.ULID(nil)
 			indexBytesFetched := uint64(0)
@@ -813,7 +813,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 					mySeries = append(mySeries, s)
 
 					// Add series fingerprint to query limiter; will return error if we are over the limit
-					if err := queryLimiter.AddSeries(s.Labels); err != nil {
+					if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
 						return err
 					}
 
@@ -853,16 +853,22 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				}
 
 				if ss := resp.GetStreamingSeries(); ss != nil {
+					myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))
+
 					for _, s := range ss.Series {
 						// Add series fingerprint to query limiter; will return error if we are over the limit
-						if limitErr := queryLimiter.AddSeries(s.Labels); limitErr != nil {
+						l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
+
+						if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
 							return limitErr
 						}
+
+						myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
 					}
-					myStreamingSeries = append(myStreamingSeries, ss.Series...)
+
 					if ss.IsEndOfSeriesStream {
 						// If we aren't expecting any series from this stream, close it now.
-						if len(myStreamingSeries) == 0 {
+						if len(myStreamingSeriesLabels) == 0 {
 							util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
 						}
 
@@ -904,13 +910,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 						chunkInfo.EndSeries(i == len(mySeries)-1)
 					}
 				}
-			} else if len(myStreamingSeries) > 0 {
+			} else if len(myStreamingSeriesLabels) > 0 {
 				// FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader.
-				reqStats.AddFetchedSeries(uint64(len(myStreamingSeries)))
-				streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeries), queryLimiter, reqStats, q.metrics, q.logger)
+				reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels)))
+				streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, reqStats, q.metrics, q.logger)
 				level.Debug(log).Log("msg", "received streaming series from store-gateway",
 					"instance", c.RemoteAddress(),
-					"fetched series", len(myStreamingSeries),
+					"fetched series", len(myStreamingSeriesLabels),
 					"fetched index bytes", indexBytesFetched,
 					"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
 					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
@@ -925,12 +931,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 			mtx.Lock()
 			if len(mySeries) > 0 {
 				seriesSets = append(seriesSets, &blockQuerierSeriesSet{series: mySeries})
-			} else if len(myStreamingSeries) > 0 {
+			} else if len(myStreamingSeriesLabels) > 0 {
 				if chunkInfo != nil {
 					chunkInfo.SetMsg("store-gateway streaming")
 				}
 				seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{
-					series:        myStreamingSeries,
+					series:        myStreamingSeriesLabels,
 					streamReader:  streamReader,
 					chunkInfo:     chunkInfo,
 					remoteAddress: c.RemoteAddress(),
diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go
index fcd61b88869..0a7333f7721 100644
--- a/pkg/util/limiter/query_limiter.go
+++ b/pkg/util/limiter/query_limiter.go
@@ -9,9 +9,9 @@ import (
 	"context"
 	"sync"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"go.uber.org/atomic"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/util/validation"
 )
@@ -74,12 +74,12 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
 }
 
 // AddSeries adds the input series and returns an error if the limit is reached.
-func (ql *QueryLimiter) AddSeries(seriesLabels []mimirpb.LabelAdapter) validation.LimitError {
+func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) validation.LimitError {
 	// If the max series is unlimited just return without managing map
 	if ql.maxSeriesPerQuery == 0 {
 		return nil
 	}
-	fingerprint := mimirpb.FromLabelAdaptersToLabels(seriesLabels).Hash()
+	fingerprint := seriesLabels.Hash()
 
 	ql.uniqueSeriesMx.Lock()
 	defer ql.uniqueSeriesMx.Unlock()
diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go
index 0d8041e4e4d..73179122241 100644
--- a/pkg/util/limiter/query_limiter_test.go
+++ b/pkg/util/limiter/query_limiter_test.go
@@ -16,7 +16,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 )
 
@@ -37,15 +36,15 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
 		reg     = prometheus.NewPedanticRegistry()
 		limiter = NewQueryLimiter(100, 0, 0, 0, stats.NewQueryMetrics(reg))
 	)
-	err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
+	err := limiter.AddSeries(series1)
 	assert.NoError(t, err)
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
+	err = limiter.AddSeries(series2)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, limiter.uniqueSeriesCount())
 	assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
 
 	// Re-add previous series to make sure it's not double counted
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
+	err = limiter.AddSeries(series1)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, limiter.uniqueSeriesCount())
 	assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
@@ -72,21 +71,21 @@ func TestQueryLimiter_AddSeries_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
 		reg     = prometheus.NewPedanticRegistry()
 		limiter = NewQueryLimiter(1, 0, 0, 0, stats.NewQueryMetrics(reg))
 	)
-	err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
+	err := limiter.AddSeries(series1)
 	require.NoError(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
 
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
+	err = limiter.AddSeries(series2)
 	require.Error(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
 
 	// Add the same series again and ensure that we don't increment the failed queries metric again.
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
+	err = limiter.AddSeries(series2)
 	require.Error(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
 
 	// Add another series and ensure that we don't increment the failed queries metric again.
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series3))
+	err = limiter.AddSeries(series3)
 	require.Error(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
 }
@@ -188,7 +187,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
 	reg := prometheus.NewPedanticRegistry()
 	limiter := NewQueryLimiter(b.N+1, 0, 0, 0, stats.NewQueryMetrics(reg))
 	for _, s := range series {
-		err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(s))
+		err := limiter.AddSeries(s)
 		assert.NoError(b, err)
 	}
 }