Skip to content

Commit feb210b

Browse files
committed
pass series to page decoder
1 parent 495fe22 commit feb210b

File tree

4 files changed

+33
-19
lines changed

4 files changed

+33
-19
lines changed

pkg/storage/bloom/v1/bloom.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package v1
33
import (
44
"bytes"
55
"fmt"
6+
"github.com/prometheus/common/model"
67
"io"
78

89
"github.com/go-kit/log/level"
@@ -377,13 +378,18 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
377378
return checksum, nil
378379
}
379380

380-
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, err error) {
381+
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics, s ...model.Fingerprint) (res *BloomPageDecoder, err error) {
381382
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
382383
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
383384
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
384385
return nil, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
385386
}
386387

388+
var series model.Fingerprint
389+
if len(s) > 0 {
390+
series = s[0]
391+
}
392+
387393
page := b.pageHeaders[pageIdx]
388394
// fmt.Printf("pageIdx=%d page=%+v size=%.2fMiB\n", pageIdx, page, float64(page.Len)/float64(1<<20))
389395

@@ -465,6 +471,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
465471

466472
level.Error(util_log.Logger).Log(
467473
"msg", "page too large",
474+
"series_fp", series.String(),
468475
"version", b.schema.version,
469476
"page", pageIdx,
470477
"len", page.Len,
@@ -488,7 +495,8 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
488495
"fpRatesPerLayer", fpRatesPerLayer,
489496
)
490497

491-
return nil, ErrPageTooLarge
498+
return nil, fmt.Errorf("bloom too big for series %s - bytes(%d) origBytes(%d) layers(%d) fillRatio(%.2f) lastLayerFillRatio(%.2f) capacity(%d) capacityPerLayer(%s) countPerLayer(%s) bytesPerLayer(%s) fpRatesPerLayer(%s)",
499+
series, bloom.BytesSize(), page.Stats.Bytes, len(bloom.CapacityPerLayer()), bloom.FillRatio(), fillRatioLastLayer, bloom.Capacity(), capacityPerLayer, countPerLayer, bytesPerLayer, fpRatesPerLayer)
492500
}
493501

494502
if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil {

pkg/storage/bloom/v1/bloom_querier.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package v1
22

33
import (
44
"github.com/pkg/errors"
5+
"github.com/prometheus/common/model"
56
)
67

78
type BloomQuerier interface {
@@ -44,7 +45,7 @@ func (it *LazyBloomIter) ensureInit() {
4445
}
4546
}
4647

47-
func (it *LazyBloomIter) Seek(offset BloomOffset) {
48+
func (it *LazyBloomIter) Seek(offset BloomOffset, series ...model.Fingerprint) {
4849
it.ensureInit()
4950

5051
// if we need a different page or the current page hasn't been loaded,
@@ -62,29 +63,28 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
6263
it.err = errors.Wrap(err, "getting blooms reader")
6364
return
6465
}
65-
decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
66+
decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics, series...)
6667
if err != nil {
6768
it.err = errors.Wrap(err, "loading bloom page")
6869
return
6970
}
7071

7172
it.curPageIndex = offset.Page
7273
it.curPage = decoder
73-
7474
}
7575

7676
it.curPage.Seek(offset.ByteOffset)
7777
}
7878

79-
func (it *LazyBloomIter) Next() bool {
79+
func (it *LazyBloomIter) Next(series ...model.Fingerprint) bool {
8080
it.ensureInit()
8181
if it.err != nil {
8282
return false
8383
}
84-
return it.next()
84+
return it.next(series...)
8585
}
8686

87-
func (it *LazyBloomIter) next() bool {
87+
func (it *LazyBloomIter) next(series ...model.Fingerprint) bool {
8888
if it.err != nil {
8989
return false
9090
}
@@ -103,6 +103,7 @@ func (it *LazyBloomIter) next() bool {
103103
it.curPageIndex,
104104
it.m,
105105
it.b.metrics,
106+
series...,
106107
)
107108
if err != nil {
108109
it.err = err

pkg/storage/bloom/v1/fuse.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,19 @@ func (fq *FusedQuerier) Run() error {
101101
)
102102

103103
// Now that we've found the series, we need to find the unpack the bloom
104-
fq.bq.blooms.Seek(series.Offset)
105-
if !fq.bq.blooms.Next() {
106-
if errors.Is(fq.bq.blooms.Err(), ErrPageTooLarge) {
107-
level.Warn(util_log.Logger).Log(
108-
"msg", "bloom too large for series",
109-
"series", series.Fingerprint,
110-
)
111-
}
104+
fq.bq.blooms.Seek(series.Offset, series.Fingerprint)
105+
if !fq.bq.blooms.Next(series.Fingerprint) {
106+
//if errors.Is(fq.bq.blooms.Err(), ErrPageTooLarge) {
107+
// level.Warn(util_log.Logger).Log(
108+
// "msg", "bloom too large for series",
109+
// "series", series.Fingerprint,
110+
// )
111+
//}
112+
113+
level.Warn(util_log.Logger).Log(
114+
"msg", "bloom too large for series",
115+
err, fq.bq.blooms.Err(),
116+
)
112117

113118
// fingerprint not found, can't remove chunks
114119
level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.blooms.Err())

pkg/storage/bloom/v1/metrics.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,13 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
153153
Namespace: constants.Loki,
154154
Name: "bloom_exp_source_bytes",
155155
Help: "Number of bytes in a bloom source",
156-
Buckets: prometheus.ExponentialBucketsRange(1<<10, 512<<20, 10),
156+
Buckets: prometheus.ExponentialBucketsRange(1<<10, 30<<30, 50), // 1KB to 30GB
157157
}, []string{"size"}),
158158
bloomLines: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
159159
Namespace: constants.Loki,
160160
Name: "bloom_exp_lines",
161161
Help: "Number of lines in a bloom",
162-
Buckets: prometheus.ExponentialBucketsRange(1, 33554432, 10),
162+
Buckets: prometheus.ExponentialBucketsRange(1, 50000000, 50),
163163
}, []string{"size"}),
164164
bloomLayers: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
165165
Namespace: constants.Loki,
@@ -171,7 +171,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
171171
Namespace: constants.Loki,
172172
Name: "bloom_exp_bytes_size",
173173
Help: "Number of bytes in a bloom",
174-
Buckets: prometheus.ExponentialBucketsRange(1<<10, 512<<20, 10),
174+
Buckets: prometheus.ExponentialBucketsRange(1<<10, 512<<20, 20),
175175
}, []string{"size"}),
176176
bloomLastLayerFillRatio: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
177177
Namespace: constants.Loki,

0 commit comments

Comments
 (0)