Skip to content

Commit bfa6955

Browse files
authored
fix(blooms): skip empty blooms on reads (#13500)
Adds a read-path safeguard that prevents filtering out chunks associated with empty blooms while we investigate the underlying cause of empty blooms produced during compaction.
1 parent 652ad24 commit bfa6955

File tree

3 files changed

+77
-0
lines changed

3 files changed

+77
-0
lines changed

pkg/storage/bloom/v1/filter/scalable.go

+7
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ func (s *ScalableBloomFilter) K() uint {
110110
return s.filters[len(s.filters)-1].K()
111111
}
112112

113+
func (s *ScalableBloomFilter) Count() (ct int) {
114+
for _, filter := range s.filters {
115+
ct += int(filter.Count())
116+
}
117+
return
118+
}
119+
113120
// FillRatio returns the average ratio of set bits across every filter.
114121
func (s *ScalableBloomFilter) FillRatio() float64 {
115122
var sum, count float64

pkg/storage/bloom/v1/fuse.go

+7
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,13 @@ func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithOffsets, reqs
300300
// Test each bloom individually
301301
bloom := fq.bq.blooms.At()
302302
for j, req := range reqs {
303+
// TODO(owen-d): this is a stopgap to avoid filtering broken blooms until we find their cause.
304+
// In the case we don't have any data in the bloom, don't filter any chunks.
305+
if bloom.ScalableBloomFilter.Count() == 0 {
306+
for k := range inputs[j].InBlooms {
307+
inputs[j].found[k] = true
308+
}
309+
}
303310

304311
// shortcut: series level removal
305312
// we can skip testing chunk keys individually if the bloom doesn't match

pkg/storage/bloom/v1/fuse_test.go

+63
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,69 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
356356
}
357357
}
358358

359+
func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) {
360+
// references for linking in memory reader+writer
361+
indexBuf := bytes.NewBuffer(nil)
362+
bloomsBuf := bytes.NewBuffer(nil)
363+
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
364+
reader := NewByteReader(indexBuf, bloomsBuf)
365+
366+
builder, err := NewBlockBuilder(
367+
BlockOptions{
368+
Schema: Schema{
369+
version: DefaultSchemaVersion,
370+
encoding: chunkenc.EncNone,
371+
},
372+
SeriesPageSize: 100,
373+
BloomPageSize: 10 << 10,
374+
},
375+
writer,
376+
)
377+
require.Nil(t, err)
378+
379+
data := SeriesWithBlooms{
380+
Series: &Series{
381+
Fingerprint: 0,
382+
Chunks: []ChunkRef{
383+
{
384+
From: 0,
385+
Through: 10,
386+
Checksum: 0x1234,
387+
},
388+
},
389+
},
390+
Blooms: v2.NewSliceIter([]*Bloom{
391+
// simulate empty bloom
392+
{
393+
*filter.NewScalableBloomFilter(1024, 0.01, 0.8),
394+
},
395+
}),
396+
}
397+
398+
itr := v2.NewSliceIter[SeriesWithBlooms]([]SeriesWithBlooms{data})
399+
_, err = builder.BuildFrom(itr)
400+
require.NoError(t, err)
401+
require.False(t, itr.Next())
402+
block := NewBlock(reader, NewMetrics(nil))
403+
ch := make(chan Output, 1)
404+
req := Request{
405+
Fp: data.Series.Fingerprint,
406+
Chks: data.Series.Chunks,
407+
Search: keysToBloomTest([][]byte{[]byte("foobar")}),
408+
Response: ch,
409+
Recorder: NewBloomRecorder(context.Background(), "unknown"),
410+
}
411+
err = NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize).Fuse(
412+
[]v2.PeekIterator[Request]{
413+
v2.NewPeekIter[Request](v2.NewSliceIter[Request]([]Request{req})),
414+
},
415+
log.NewNopLogger(),
416+
).Run()
417+
require.NoError(t, err)
418+
x := <-ch
419+
require.Equal(t, 0, len(x.Removals))
420+
}
421+
359422
func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) {
360423
indexBuf := bytes.NewBuffer(nil)
361424
bloomsBuf := bytes.NewBuffer(nil)

0 commit comments

Comments
 (0)