Skip to content

Commit bebe2b1

Browse files
fix(blooms): Copy chunks from ForSeries (#14863)
(cherry picked from commit bfc2890)
1 parent 6fd9b1e commit bebe2b1

File tree

1 file changed

+31
-42
lines changed

1 file changed

+31
-42
lines changed

pkg/bloombuild/planner/strategies/chunksize.go

+31-42
Original file line numberDiff line numberDiff line change
@@ -155,20 +155,16 @@ func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBou
155155
return deduped, nil
156156
}
157157

158-
type seriesWithChunks struct {
159-
tsdb tsdb.SingleTenantTSDBIdentifier
160-
fp model.Fingerprint
161-
chunks []index.ChunkMeta
162-
}
163-
164158
type seriesBatch struct {
165-
series []seriesWithChunks
159+
tsdb tsdb.SingleTenantTSDBIdentifier
160+
series []*v1.Series
166161
size uint64
167162
}
168163

169-
func newSeriesBatch() seriesBatch {
164+
func newSeriesBatch(tsdb tsdb.SingleTenantTSDBIdentifier) seriesBatch {
170165
return seriesBatch{
171-
series: make([]seriesWithChunks, 0, 100),
166+
tsdb: tsdb,
167+
series: make([]*v1.Series, 0, 100),
172168
}
173169
}
174170

@@ -179,31 +175,14 @@ func (b *seriesBatch) Bounds() v1.FingerprintBounds {
179175

180176
// We assume that the series are sorted by fingerprint.
181177
// This is guaranteed since series are iterated in order by the TSDB.
182-
return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp)
178+
return v1.NewBounds(b.series[0].Fingerprint, b.series[len(b.series)-1].Fingerprint)
183179
}
184180

185181
func (b *seriesBatch) V1Series() []*v1.Series {
186-
series := make([]*v1.Series, 0, len(b.series))
187-
for _, s := range b.series {
188-
res := &v1.Series{
189-
Fingerprint: s.fp,
190-
Chunks: make(v1.ChunkRefs, 0, len(s.chunks)),
191-
}
192-
for _, chk := range s.chunks {
193-
res.Chunks = append(res.Chunks, v1.ChunkRef{
194-
From: model.Time(chk.MinTime),
195-
Through: model.Time(chk.MaxTime),
196-
Checksum: chk.Checksum,
197-
})
198-
}
199-
200-
series = append(series, res)
201-
}
202-
203-
return series
182+
return b.series
204183
}
205184

206-
func (b *seriesBatch) Append(s seriesWithChunks, size uint64) {
185+
func (b *seriesBatch) Append(s *v1.Series, size uint64) {
207186
b.series = append(b.series, s)
208187
b.size += size
209188
}
@@ -217,10 +196,7 @@ func (b *seriesBatch) Size() uint64 {
217196
}
218197

219198
func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier {
220-
if len(b.series) == 0 {
221-
return tsdb.SingleTenantTSDBIdentifier{}
222-
}
223-
return b.series[0].tsdb
199+
return b.tsdb
224200
}
225201

226202
func (s *ChunkSizeStrategy) sizedSeriesIter(
@@ -230,9 +206,14 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
230206
targetTaskSizeBytes uint64,
231207
) (iter.Iterator[seriesBatch], int, error) {
232208
batches := make([]seriesBatch, 0, 100)
233-
currentBatch := newSeriesBatch()
209+
var currentBatch seriesBatch
234210

235211
for _, idx := range tsdbsWithGaps {
212+
if currentBatch.Len() > 0 {
213+
batches = append(batches, currentBatch)
214+
}
215+
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
216+
236217
for _, gap := range idx.gaps {
237218
if err := idx.tsdb.ForSeries(
238219
ctx,
@@ -253,14 +234,22 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
253234
// AND Adding this series to the batch would exceed the target task size.
254235
if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes {
255236
batches = append(batches, currentBatch)
256-
currentBatch = newSeriesBatch()
237+
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
238+
}
239+
240+
res := &v1.Series{
241+
Fingerprint: fp,
242+
Chunks: make(v1.ChunkRefs, 0, len(chks)),
243+
}
244+
for _, chk := range chks {
245+
res.Chunks = append(res.Chunks, v1.ChunkRef{
246+
From: model.Time(chk.MinTime),
247+
Through: model.Time(chk.MaxTime),
248+
Checksum: chk.Checksum,
249+
})
257250
}
258251

259-
currentBatch.Append(seriesWithChunks{
260-
tsdb: idx.tsdbIdentifier,
261-
fp: fp,
262-
chunks: chks,
263-
}, seriesSize)
252+
currentBatch.Append(res, seriesSize)
264253
return false
265254
}
266255
},
@@ -269,10 +258,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
269258
return nil, 0, err
270259
}
271260

272-
// Add the last batch for this TSDB if it's not empty.
261+
// Add the last batch for this gap if it's not empty.
273262
if currentBatch.Len() > 0 {
274263
batches = append(batches, currentBatch)
275-
currentBatch = newSeriesBatch()
264+
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
276265
}
277266
}
278267
}

0 commit comments

Comments
 (0)