@@ -217,6 +217,7 @@ func (mb *MergeBuilder) processNextSeries(
217
217
) (
218
218
* SeriesWithBlooms , // nextInBlocks pointer update
219
219
int , // bytes added
220
+ int , // chunks added
220
221
bool , // blocksFinished update
221
222
bool , // done building block
222
223
error , // error
@@ -230,7 +231,7 @@ func (mb *MergeBuilder) processNextSeries(
230
231
}()
231
232
232
233
if ! mb .store .Next () {
233
- return nil , 0 , false , true , nil
234
+ return nil , 0 , 0 , false , true , nil
234
235
}
235
236
236
237
nextInStore := mb .store .At ()
@@ -249,7 +250,7 @@ func (mb *MergeBuilder) processNextSeries(
249
250
}
250
251
251
252
if err := mb .blocks .Err (); err != nil {
252
- return nil , 0 , false , false , errors .Wrap (err , "iterating blocks" )
253
+ return nil , 0 , 0 , false , false , errors .Wrap (err , "iterating blocks" )
253
254
}
254
255
blockSeriesIterated ++
255
256
nextInBlocks = mb .blocks .At ()
@@ -276,11 +277,11 @@ func (mb *MergeBuilder) processNextSeries(
276
277
277
278
for bloom := range ch {
278
279
if bloom .Err != nil {
279
- return nil , bytesAdded , false , false , errors .Wrap (bloom .Err , "populating bloom" )
280
+ return nil , bytesAdded , 0 , false , false , errors .Wrap (bloom .Err , "populating bloom" )
280
281
}
281
282
offset , err := builder .AddBloom (bloom .Bloom )
282
283
if err != nil {
283
- return nil , bytesAdded , false , false , errors .Wrapf (
284
+ return nil , bytesAdded , 0 , false , false , errors .Wrapf (
284
285
err , "adding bloom to block for fp (%s)" , nextInStore .Fingerprint ,
285
286
)
286
287
}
@@ -290,25 +291,29 @@ func (mb *MergeBuilder) processNextSeries(
290
291
291
292
done , err := builder .AddSeries (* nextInStore , offsets )
292
293
if err != nil {
293
- return nil , bytesAdded , false , false , errors .Wrap (err , "committing series" )
294
+ return nil , bytesAdded , 0 , false , false , errors .Wrap (err , "committing series" )
294
295
}
295
296
296
- return nextInBlocks , bytesAdded , blocksFinished , done , nil
297
+ return nextInBlocks , bytesAdded , chunksIndexed + chunksCopied , blocksFinished , done , nil
297
298
}
298
299
299
300
func (mb * MergeBuilder ) Build (builder * BlockBuilder ) (checksum uint32 , totalBytes int , err error ) {
300
301
var (
301
- nextInBlocks * SeriesWithBlooms
302
- blocksFinished bool // whether any previous blocks have been exhausted while building new block
303
- done bool
302
+ nextInBlocks * SeriesWithBlooms
303
+ blocksFinished bool // whether any previous blocks have been exhausted while building new block
304
+ done bool
305
+ totalSeriesAdded = 0
306
+ totalChunksAdded int
304
307
)
305
308
for {
306
- var bytesAdded int
307
- nextInBlocks , bytesAdded , blocksFinished , done , err = mb .processNextSeries (builder , nextInBlocks , blocksFinished )
309
+ var bytesAdded , chunksAdded int
310
+ nextInBlocks , bytesAdded , chunksAdded , blocksFinished , done , err = mb .processNextSeries (builder , nextInBlocks , blocksFinished )
308
311
totalBytes += bytesAdded
312
+ totalChunksAdded += chunksAdded
309
313
if err != nil {
310
314
return 0 , totalBytes , errors .Wrap (err , "processing next series" )
311
315
}
316
+ totalSeriesAdded ++
312
317
if done {
313
318
break
314
319
}
@@ -324,6 +329,8 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (checksum uint32, totalByte
324
329
flushedFor = blockFlushReasonFull
325
330
}
326
331
mb .metrics .blockSize .Observe (float64 (sz ))
332
+ mb .metrics .seriesPerBlock .Observe (float64 (totalSeriesAdded ))
333
+ mb .metrics .chunksPerBlock .Observe (float64 (totalChunksAdded ))
327
334
mb .metrics .blockFlushReason .WithLabelValues (flushedFor ).Inc ()
328
335
329
336
checksum , err = builder .Close ()
0 commit comments