Skip to content

Commit

Permalink
Revert "Don't free buffers after reading query stream (#9721)"
Browse files Browse the repository at this point in the history
This reverts commit f7b6017.
  • Loading branch information
duricanikolic committed Nov 1, 2024
1 parent 6ed8d6b commit 617cc83
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, limitErr
}
}
Expand All @@ -277,20 +278,24 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
} else if len(resp.Chunkseries) > 0 {
// Enforce the max chunks limits.
if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, err
}

if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, err
}

for _, series := range resp.Chunkseries {
if err := queryLimiter.AddSeries(series.Labels); err != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, err
}
}

if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, err
}

Expand All @@ -301,15 +306,18 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

for _, s := range resp.StreamingSeries {
if err := queryLimiter.AddSeries(s.Labels); err != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, err
}

// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, err
}

if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
resp.FreeBuffer()
return ingesterQueryResult{}, err
}

Expand All @@ -319,6 +327,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
}

resp.FreeBuffer()

if resp.IsEndOfSeriesStream {
if streamingSeriesCount > 0 {
result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)
Expand Down

0 comments on commit 617cc83

Please sign in to comment.