Skip to content

Commit 76ba24e

Browse files
authored
fix(blooms): Reset error on LazyBloomIter.Seek (#12806)
1 parent 4047902 commit 76ba24e

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

pkg/storage/bloom/v1/bloom_querier.go

+5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ func (it *LazyBloomIter) ensureInit() {
4545
func (it *LazyBloomIter) Seek(offset BloomOffset) {
4646
it.ensureInit()
4747

48+
// reset error from any previous seek/next that yield pages too large
49+
if errors.Is(it.err, ErrPageTooLarge) {
50+
it.err = nil
51+
}
52+
4853
// if we need a different page or the current page hasn't been loaded,
4954
// load the desired page
5055
if it.curPageIndex != offset.Page || it.curPage == nil {

pkg/storage/bloom/v1/fuse_test.go

+101
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package v1
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"sync"
78
"testing"
89

910
"github.com/go-kit/log"
1011
"github.com/grafana/dskit/concurrency"
12+
"github.com/prometheus/common/model"
1113
"github.com/stretchr/testify/require"
1214

1315
"github.com/grafana/loki/v3/pkg/chunkenc"
16+
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
1417
)
1518

1619
// TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't
@@ -44,6 +47,18 @@ func TestFusedQuerier(t *testing.T) {
4447
numSeries := 1000
4548
data, keys := MkBasicSeriesWithBlooms(numSeries, 0, 0x0000, 0xffff, 0, 10000)
4649

50+
// Make the first and third series blooms too big to fit into a single page so we skip them while reading
51+
for i := 0; i < 10000; i++ {
52+
tokenizer := NewNGramTokenizer(4, 0)
53+
line := fmt.Sprintf("%04x:%04x", i, i+1)
54+
it := tokenizer.Tokens(line)
55+
for it.Next() {
56+
key := it.At()
57+
data[0].Bloom.Add(key)
58+
data[2].Bloom.Add(key)
59+
}
60+
}
61+
4762
builder, err := NewBlockBuilder(
4863
BlockOptions{
4964
Schema: Schema{
@@ -130,6 +145,92 @@ func TestFusedQuerier(t *testing.T) {
130145
}
131146
}
132147

148+
func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
149+
// references for linking in memory reader+writer
150+
indexBuf := bytes.NewBuffer(nil)
151+
bloomsBuf := bytes.NewBuffer(nil)
152+
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
153+
reader := NewByteReader(indexBuf, bloomsBuf)
154+
155+
numSeries := 4
156+
data := make([]SeriesWithBloom, 0, numSeries)
157+
tokenizer := NewNGramTokenizer(4, 0)
158+
for i := 0; i < numSeries; i++ {
159+
var series Series
160+
series.Fingerprint = model.Fingerprint(i)
161+
series.Chunks = []ChunkRef{
162+
{
163+
From: 0,
164+
Through: 100,
165+
Checksum: uint32(i),
166+
},
167+
}
168+
169+
var bloom Bloom
170+
bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)
171+
172+
nLines := 10
173+
if i == 0 || i == 2 {
174+
// Add enough lines to make the bloom page too large for series 1
175+
nLines = 10000
176+
}
177+
178+
for j := 0; j < nLines; j++ {
179+
line := fmt.Sprintf("%04x:%04x", i, j)
180+
it := tokenizer.Tokens(line)
181+
for it.Next() {
182+
key := it.At()
183+
bloom.Add(key)
184+
}
185+
}
186+
187+
data = append(data, SeriesWithBloom{
188+
Series: &series,
189+
Bloom: &bloom,
190+
})
191+
}
192+
193+
builder, err := NewBlockBuilder(
194+
BlockOptions{
195+
Schema: Schema{
196+
version: DefaultSchemaVersion,
197+
encoding: chunkenc.EncSnappy,
198+
},
199+
SeriesPageSize: 100,
200+
BloomPageSize: 10, // So we force one series per page
201+
},
202+
writer,
203+
)
204+
require.Nil(t, err)
205+
itr := NewSliceIter[SeriesWithBloom](data)
206+
_, err = builder.BuildFrom(itr)
207+
require.NoError(t, err)
208+
require.False(t, itr.Next())
209+
block := NewBlock(reader, NewMetrics(nil))
210+
211+
querier := NewBlockQuerier(block, true, 1000)
212+
213+
for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ {
214+
err := querier.Seek(fp)
215+
require.NoError(t, err)
216+
217+
require.True(t, querier.series.Next())
218+
series := querier.series.At()
219+
require.Equal(t, fp, series.Fingerprint)
220+
221+
querier.blooms.Seek(series.Offset)
222+
223+
if fp == 0 || fp == 2 {
224+
require.False(t, querier.blooms.Next())
225+
require.Error(t, querier.blooms.Err())
226+
continue
227+
}
228+
229+
require.True(t, querier.blooms.Next())
230+
require.NoError(t, querier.blooms.Err())
231+
}
232+
}
233+
133234
func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) {
134235
indexBuf := bytes.NewBuffer(nil)
135236
bloomsBuf := bytes.NewBuffer(nil)

0 commit comments

Comments
 (0)