Don't free buffers after reading query stream
Follow up on #9401

While the write path has been carefully crafted to make sure that we
don't keep references to the strings backed by arrays holding data from
the gRPC stream (since keeping such references would cause a memory
leak), this doesn't seem to be the case in the query path, where some
tests started failing after we started to reuse those backing arrays
through the memory buffers implemented in the new gRPC library.
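
To make the failure mode concrete, here is a minimal, self-contained sketch of the aliasing hazard (the pool and the `yoloString` helper are illustrative stand-ins, not Mimir's actual code): a label string built over a pooled gRPC buffer without copying turns into garbage once that buffer is recycled and overwritten.

```go
package main

import (
	"fmt"
	"unsafe"
)

// yoloString reinterprets b's bytes as a string without copying, so the
// resulting string aliases b's backing array (hypothetical helper).
func yoloString(b []byte) string {
	return unsafe.String(unsafe.SliceData(b), len(b))
}

func main() {
	// bufferPool stands in for the gRPC library's recyclable message buffers.
	bufferPool := make(chan []byte, 1)

	buf := []byte(`job="api"`)
	label := yoloString(buf) // label shares buf's backing array

	bufferPool <- buf // the moral equivalent of resp.FreeBuffer()

	next := <-bufferPool    // the pool hands the same array to the next message
	copy(next, `job="db!"`) // ...which overwrites it in place

	fmt.Println(label) // prints job="db!": the label was silently corrupted
}
```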

This change reverts the buffer freeing, which means the buffer won't be
recycled (it will be garbage collected instead). This ensures data
correctness while we investigate where the data references are kept.
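
Once the offending references are found, one plausible way to restore buffer recycling, mirroring what the write path already does, is to deep-copy anything that must outlive the buffer before freeing it. A hedged sketch, continuing the hypothetical example above (requires importing "strings"):

```go
// Copy the bytes out of the pooled array before recycling it.
// strings.Clone (Go 1.18+) allocates a fresh string on the heap,
// so safeLabel no longer aliases the pooled buffer.
safeLabel := strings.Clone(yoloString(buf))
bufferPool <- buf // recycling is now harmless
fmt.Println(safeLabel)
```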

Signed-off-by: Oleg Zaytsev <[email protected]>
colega committed Oct 23, 2024
1 parent accccf4 commit 89ee87a
Showing 1 changed file with 0 additions and 10 deletions.
pkg/distributor/query.go (10 changes: 0 additions & 10 deletions)

@@ -269,7 +269,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 			if len(resp.Timeseries) > 0 {
 				for _, series := range resp.Timeseries {
 					if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
-						resp.FreeBuffer()
 						return ingesterQueryResult{}, limitErr
 					}
 				}
@@ -278,24 +277,20 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 			} else if len(resp.Chunkseries) > 0 {
 				// Enforce the max chunks limits.
 				if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, err
 				}
 
 				if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, err
 				}
 
 				for _, series := range resp.Chunkseries {
 					if err := queryLimiter.AddSeries(series.Labels); err != nil {
-						resp.FreeBuffer()
 						return ingesterQueryResult{}, err
 					}
 				}
 
 				if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
-					resp.FreeBuffer()
 					return ingesterQueryResult{}, err
 				}
 
@@ -306,18 +301,15 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 
 				for _, s := range resp.StreamingSeries {
 					if err := queryLimiter.AddSeries(s.Labels); err != nil {
-						resp.FreeBuffer()
 						return ingesterQueryResult{}, err
 					}
 
 					// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
 					if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
-						resp.FreeBuffer()
 						return ingesterQueryResult{}, err
 					}
 
 					if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
-						resp.FreeBuffer()
 						return ingesterQueryResult{}, err
 					}
 
@@ -327,8 +319,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 				streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
 			}
 
-			resp.FreeBuffer()
-
 			if resp.IsEndOfSeriesStream {
 				if streamingSeriesCount > 0 {
 					result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)
