From 89ee87aad734d2c764546e3e41ece566048b3683 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Wed, 23 Oct 2024 09:46:32 +0200
Subject: [PATCH] Don't free buffers after reading query stream

Follow-up to https://github.com/grafana/mimir/pull/9401

The write path has been carefully crafted to ensure that we don't keep
references to strings backed by arrays holding data from the gRPC stream
(doing so would cause a memory leak). This doesn't seem to be the case in
the query path, where some tests started failing once we started reusing
those backing arrays through the memory buffers implemented in the new
gRPC library.

This change reverts the buffer freeing: the buffer won't be recycled (it
will be garbage collected instead), which ensures data correctness while
we investigate where the data references are kept.

Signed-off-by: Oleg Zaytsev
---
 pkg/distributor/query.go | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index 4f08996a667..73610f6e089 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -269,7 +269,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		if len(resp.Timeseries) > 0 {
 			for _, series := range resp.Timeseries {
 				if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, limitErr
 				}
 			}
@@ -278,24 +277,20 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		} else if len(resp.Chunkseries) > 0 {
 			// Enforce the max chunks limits.
 			if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
-				resp.FreeBuffer()
 				return ingesterQueryResult{}, err
 			}

 			if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
-				resp.FreeBuffer()
 				return ingesterQueryResult{}, err
 			}

 			for _, series := range resp.Chunkseries {
 				if err := queryLimiter.AddSeries(series.Labels); err != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, err
 				}
 			}

 			if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
-				resp.FreeBuffer()
 				return ingesterQueryResult{}, err
 			}

@@ -306,18 +301,15 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 			for _, s := range resp.StreamingSeries {
 				if err := queryLimiter.AddSeries(s.Labels); err != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, err
 				}

 				// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
 				if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, err
 				}

 				if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, err
 				}

@@ -327,8 +319,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 			streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
 		}

-		resp.FreeBuffer()
-
 		if resp.IsEndOfSeriesStream {
 			if streamingSeriesCount > 0 {
 				result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)
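
For readers unfamiliar with the failure mode, here is a minimal, self-contained
Go sketch of why recycling a receive buffer corrupts data when a previously
unmarshaled string still aliases it. This is illustrative only, not Mimir code:
recvBuf stands in for a pooled gRPC receive buffer, and the zero-copy unmarshal
step is simulated with unsafe.String.

    package main

    import (
    	"fmt"
    	"unsafe"
    )

    func main() {
    	// recvBuf stands in for a pooled gRPC receive buffer.
    	recvBuf := []byte(`series{job="api"}`)

    	// A zero-copy unmarshal leaves the string header pointing into
    	// recvBuf's backing array instead of copying the bytes out.
    	label := unsafe.String(&recvBuf[0], len(recvBuf))
    	fmt.Println(label) // series{job="api"}

    	// Freeing the buffer returns the backing array to the pool, so the
    	// next message read can overwrite it in place (this violates
    	// unsafe.String's contract, which is exactly the bug)...
    	copy(recvBuf, `series{job="db"} `)

    	// ...and the label retained from the previous message silently
    	// changes underneath the reader.
    	fmt.Println(label) // series{job="db"}  (corrupted)
    }

The write path avoids this by copying label strings out of the buffer before it
is recycled (strings.Clone, for example, is enough to break the aliasing).
Until the query path is audited for such retained references, skipping the
FreeBuffer call and letting the garbage collector keep the array alive is the
conservative fix.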