Skip to content

Commit fb9b0e8

Browse files
authored
chore(blooms): adds more instrumentation to block building (#12779)
1 parent ecefb49 commit fb9b0e8

File tree

3 files changed

+63
-15
lines changed

3 files changed

+63
-15
lines changed

pkg/bloomcompactor/spec.go

+28-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"time"
78

89
"github.com/go-kit/log"
910
"github.com/go-kit/log/level"
@@ -74,12 +75,16 @@ func NewSimpleBloomGenerator(
7475
logger log.Logger,
7576
) *SimpleBloomGenerator {
7677
return &SimpleBloomGenerator{
77-
userID: userID,
78-
opts: opts,
79-
store: store,
80-
chunkLoader: chunkLoader,
81-
blocksIter: blocksIter,
82-
logger: log.With(logger, "component", "bloom_generator"),
78+
userID: userID,
79+
opts: opts,
80+
store: store,
81+
chunkLoader: chunkLoader,
82+
blocksIter: blocksIter,
83+
logger: log.With(
84+
logger,
85+
"component", "bloom_generator",
86+
"org_id", userID,
87+
),
8388
readWriterFn: readWriterFn,
8489
metrics: metrics,
8590
reporter: reporter,
@@ -90,6 +95,13 @@ func NewSimpleBloomGenerator(
9095

9196
func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, error) {
9297
return func(series *v1.Series, bloom *v1.Bloom) (int, error) {
98+
start := time.Now()
99+
level.Debug(s.logger).Log(
100+
"msg", "populating bloom filter",
101+
"stage", "before",
102+
"fp", series.Fingerprint,
103+
"chunks", len(series.Chunks),
104+
)
93105
chunkItersWithFP, err := s.chunkLoader.Load(ctx, s.userID, series)
94106
if err != nil {
95107
return 0, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
@@ -103,6 +115,16 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
103115
chunkItersWithFP.itr,
104116
)
105117

118+
level.Debug(s.logger).Log(
119+
"msg", "populating bloom filter",
120+
"stage", "after",
121+
"fp", series.Fingerprint,
122+
"chunks", len(series.Chunks),
123+
"series_bytes", bytesAdded,
124+
"duration", time.Since(start),
125+
"err", err,
126+
)
127+
106128
if s.reporter != nil {
107129
s.reporter(series.Fingerprint)
108130
}

pkg/storage/bloom/v1/builder.go

+17-9
Original file line numberDiff line numberDiff line change
@@ -157,26 +157,26 @@ func (b *BlockBuilder) AddSeries(series SeriesWithBloom) (bool, error) {
157157
return false, errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint)
158158
}
159159

160-
full, err := b.isBlockFull()
160+
full, _, err := b.IsBlockFull()
161161
if err != nil {
162162
return false, errors.Wrap(err, "checking if block is full")
163163
}
164164

165165
return full, nil
166166
}
167167

168-
func (b *BlockBuilder) isBlockFull() (bool, error) {
169-
// if the block size is 0, the max size is unlimited
170-
if b.opts.BlockSize == 0 {
171-
return false, nil
168+
func (b *BlockBuilder) IsBlockFull() (full bool, size int, err error) {
169+
size, err = b.writer.Size()
170+
if err != nil {
171+
return false, 0, errors.Wrap(err, "getting block size")
172172
}
173173

174-
size, err := b.writer.Size()
175-
if err != nil {
176-
return false, errors.Wrap(err, "getting block size")
174+
// if the block size is 0, the max size is unlimited
175+
if b.opts.BlockSize == 0 {
176+
return false, size, nil
177177
}
178178

179-
return uint64(size) >= b.opts.BlockSize, nil
179+
return uint64(size) >= b.opts.BlockSize, size, nil
180180
}
181181

182182
type BloomBlockBuilder struct {
@@ -657,6 +657,14 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (checksum uint32, totalByte
657657
return 0, totalBytes, errors.Wrap(err, "iterating store")
658658
}
659659

660+
flushedFor := blockFlushReasonFinished
661+
full, sz, _ := builder.IsBlockFull()
662+
if full {
663+
flushedFor = blockFlushReasonFull
664+
}
665+
mb.metrics.blockSize.Observe(float64(sz))
666+
mb.metrics.blockFlushReason.WithLabelValues(flushedFor).Inc()
667+
660668
checksum, err = builder.Close()
661669
if err != nil {
662670
return 0, totalBytes, errors.Wrap(err, "closing block")

pkg/storage/bloom/v1/metrics.go

+18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ type Metrics struct {
1818
tokensTotal prometheus.Counter
1919
insertsTotal *prometheus.CounterVec
2020

21+
blockSize prometheus.Histogram
22+
blockFlushReason *prometheus.CounterVec
23+
2124
pagesRead *prometheus.CounterVec
2225
pagesSkipped *prometheus.CounterVec
2326
bytesRead *prometheus.CounterVec
@@ -34,6 +37,9 @@ const (
3437
collisionTypeTrue = "true"
3538
collisionTypeCache = "cache"
3639

40+
blockFlushReasonFull = "full"
41+
blockFlushReasonFinished = "finished"
42+
3743
pageTypeBloom = "bloom"
3844
pageTypeSeries = "series"
3945

@@ -94,6 +100,18 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
94100
Help: "Number of inserts into the bloom filter. collision type may be `false` (no collision), `cache` (found in token cache) or true (found in bloom filter). token_type may be either `raw` (the original ngram) or `chunk_prefixed` (the ngram with the chunk prefix)",
95101
}, []string{"token_type", "collision"}),
96102

103+
blockSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
104+
Namespace: constants.Loki,
105+
Name: "bloom_block_size",
106+
Help: "Size of the bloom block in bytes",
107+
Buckets: prometheus.ExponentialBucketsRange(1<<20, 1<<30, 8),
108+
}),
109+
blockFlushReason: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
110+
Namespace: constants.Loki,
111+
Name: "bloom_block_flush_reason_total",
112+
Help: "Reason the block was finished. Can be either `full` (the block hit its maximum size) or `finished` (the block was finished due to the end of the series).",
113+
}, []string{"reason"}),
114+
97115
pagesRead: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
98116
Namespace: constants.Loki,
99117
Name: "bloom_pages_read_total",

0 commit comments

Comments
 (0)