Skip to content

Commit 9c263fb

Browse files
committed
integrates to new bloombuild pkg
Signed-off-by: Owen Diehl <[email protected]>
1 parent a202fe4 commit 9c263fb

File tree

4 files changed

+56
-69
lines changed

4 files changed

+56
-69
lines changed

pkg/bloombuild/builder/batch.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ func newBatchedBlockLoader(
168168
}
169169

170170
// compiler checks
171-
var _ v1.Iterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
172-
var _ v1.CloseableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
173-
var _ v1.ResettableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
171+
var _ v1.Iterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
172+
var _ v1.CloseableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
173+
var _ v1.ResettableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
174174

175175
// TODO(chaudum): testware
176176
func newBlockLoadingIter(ctx context.Context, blocks []bloomshipper.BlockRef, fetcher FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier], batchSize int) *blockLoadingIter {
@@ -196,13 +196,13 @@ type blockLoadingIter struct {
196196
// internals
197197
initialized bool
198198
err error
199-
iter v1.Iterator[*v1.SeriesWithBloom]
199+
iter v1.Iterator[*v1.SeriesWithBlooms]
200200
loader *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier]
201201
loaded map[io.Closer]struct{}
202202
}
203203

204204
// At implements v1.Iterator.
205-
func (i *blockLoadingIter) At() *v1.SeriesWithBloom {
205+
func (i *blockLoadingIter) At() *v1.SeriesWithBlooms {
206206
if !i.initialized {
207207
panic("iterator not initialized")
208208
}
@@ -229,7 +229,7 @@ func (i *blockLoadingIter) init() {
229229
i.overlapping = overlappingBlocksIter(i.inputs)
230230

231231
// set initial iter
232-
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
232+
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
233233

234234
// set "match all" filter function if not present
235235
if i.filter == nil {
@@ -249,22 +249,22 @@ func (i *blockLoadingIter) loadNext() bool {
249249
loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
250250
filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
251251

252-
iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
252+
iters := make([]v1.PeekingIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
253253
for filtered.Next() {
254254
bq := filtered.At()
255255
i.loaded[bq] = struct{}{}
256256
iter, err := bq.SeriesIter()
257257
if err != nil {
258258
i.err = err
259-
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
259+
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
260260
return false
261261
}
262262
iters = append(iters, iter)
263263
}
264264

265265
if err := filtered.Err(); err != nil {
266266
i.err = err
267-
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
267+
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
268268
return false
269269
}
270270

@@ -278,12 +278,12 @@ func (i *blockLoadingIter) loadNext() bool {
278278
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
279279
// preferring the one with the most chunks already indexed since we'll have
280280
// to add fewer chunks to the bloom
281-
i.iter = v1.NewDedupingIter[*v1.SeriesWithBloom, *v1.SeriesWithBloom](
282-
func(a, b *v1.SeriesWithBloom) bool {
281+
i.iter = v1.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
282+
func(a, b *v1.SeriesWithBlooms) bool {
283283
return a.Series.Fingerprint == b.Series.Fingerprint
284284
},
285-
v1.Identity[*v1.SeriesWithBloom],
286-
func(a, b *v1.SeriesWithBloom) *v1.SeriesWithBloom {
285+
v1.Identity[*v1.SeriesWithBlooms],
286+
func(a, b *v1.SeriesWithBlooms) *v1.SeriesWithBlooms {
287287
if len(a.Series.Chunks) > len(b.Series.Chunks) {
288288
return a
289289
}
@@ -294,7 +294,7 @@ func (i *blockLoadingIter) loadNext() bool {
294294
return i.iter.Next()
295295
}
296296

297-
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
297+
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
298298
i.err = i.overlapping.Err()
299299
return false
300300
}

pkg/bloombuild/builder/batch_test.go

+10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"testing"
77

8+
"github.com/prometheus/common/model"
89
"github.com/stretchr/testify/require"
910

1011
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -208,3 +209,12 @@ func TestOverlappingBlocksIter(t *testing.T) {
208209
})
209210
}
210211
}
212+
213+
func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
214+
bounds := v1.NewBounds(min, max)
215+
return bloomshipper.BlockRef{
216+
Ref: bloomshipper.Ref{
217+
Bounds: bounds,
218+
},
219+
}
220+
}

pkg/bloombuild/builder/spec.go

+21-36
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"io"
7-
"time"
87

98
"github.com/go-kit/log"
109
"github.com/go-kit/log/level"
@@ -45,7 +44,7 @@ type SimpleBloomGenerator struct {
4544
userID string
4645
store v1.Iterator[*v1.Series]
4746
chunkLoader ChunkLoader
48-
blocksIter v1.ResettableIterator[*v1.SeriesWithBloom]
47+
blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms]
4948

5049
// options to build blocks with
5150
opts v1.BlockOptions
@@ -68,7 +67,7 @@ func NewSimpleBloomGenerator(
6867
opts v1.BlockOptions,
6968
store v1.Iterator[*v1.Series],
7069
chunkLoader ChunkLoader,
71-
blocksIter v1.ResettableIterator[*v1.SeriesWithBloom],
70+
blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms],
7271
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
7372
reporter func(model.Fingerprint),
7473
metrics *Metrics,
@@ -98,44 +97,30 @@ func NewSimpleBloomGenerator(
9897
}
9998
}
10099

101-
func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
102-
return func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
103-
start := time.Now()
100+
func (s *SimpleBloomGenerator) populator(ctx context.Context) v1.BloomPopulatorFunc {
101+
return func(
102+
series *v1.Series,
103+
srcBlooms v1.SizedIterator[*v1.Bloom],
104+
toAdd v1.ChunkRefs,
105+
ch chan *v1.BloomCreation,
106+
) {
104107
level.Debug(s.logger).Log(
105108
"msg", "populating bloom filter",
106109
"stage", "before",
107110
"fp", series.Fingerprint,
108111
"chunks", len(series.Chunks),
109112
)
110-
chunkItersWithFP, err := s.chunkLoader.Load(ctx, s.userID, series)
111-
if err != nil {
112-
return 0, false, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
113-
}
114-
115-
bytesAdded, skip, err := s.tokenizer.Populate(
116-
&v1.SeriesWithBloom{
117-
Series: series,
118-
Bloom: bloom,
119-
},
120-
chunkItersWithFP.itr,
121-
)
113+
chunkItersWithFP := s.chunkLoader.Load(ctx, s.userID, &v1.Series{
114+
Fingerprint: series.Fingerprint,
115+
Chunks: toAdd,
116+
})
122117

123-
level.Debug(s.logger).Log(
124-
"msg", "populating bloom filter",
125-
"stage", "after",
126-
"fp", series.Fingerprint,
127-
"chunks", len(series.Chunks),
128-
"series_bytes", bytesAdded,
129-
"duration", time.Since(start),
130-
"err", err,
131-
)
118+
s.tokenizer.Populate(srcBlooms, chunkItersWithFP.itr, ch)
132119

133120
if s.reporter != nil {
134121
s.reporter(series.Fingerprint)
135122
}
136-
return bytesAdded, skip, err
137123
}
138-
139124
}
140125

141126
func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIterator {
@@ -179,10 +164,10 @@ type LazyBlockBuilderIterator struct {
179164
ctx context.Context
180165
opts v1.BlockOptions
181166
metrics *Metrics
182-
populate func(*v1.Series, *v1.Bloom) (int, bool, error)
167+
populate v1.BloomPopulatorFunc
183168
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
184169
series v1.PeekingIterator[*v1.Series]
185-
blocks v1.ResettableIterator[*v1.SeriesWithBloom]
170+
blocks v1.ResettableIterator[*v1.SeriesWithBlooms]
186171

187172
bytesAdded int
188173
curr *v1.Block
@@ -193,10 +178,10 @@ func NewLazyBlockBuilderIterator(
193178
ctx context.Context,
194179
opts v1.BlockOptions,
195180
metrics *Metrics,
196-
populate func(*v1.Series, *v1.Bloom) (int, bool, error),
181+
populate v1.BloomPopulatorFunc,
197182
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
198183
series v1.PeekingIterator[*v1.Series],
199-
blocks v1.ResettableIterator[*v1.SeriesWithBloom],
184+
blocks v1.ResettableIterator[*v1.SeriesWithBlooms],
200185
) *LazyBlockBuilderIterator {
201186
return &LazyBlockBuilderIterator{
202187
ctx: ctx,
@@ -270,7 +255,7 @@ type ChunkItersByFingerprint struct {
270255

271256
// ChunkLoader loads chunks from a store
272257
type ChunkLoader interface {
273-
Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error)
258+
Load(ctx context.Context, userID string, series *v1.Series) *ChunkItersByFingerprint
274259
}
275260

276261
// StoreChunkLoader loads chunks from a store
@@ -286,7 +271,7 @@ func NewStoreChunkLoader(fetcherProvider stores.ChunkFetcherProvider, metrics *M
286271
}
287272
}
288273

289-
func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error) {
274+
func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) *ChunkItersByFingerprint {
290275
// NB(owen-d): This is probably unnecessary as we should only have one fetcher
291276
// because we'll only be working on a single index period at a time, but this should protect
292277
// us in the case of refactoring/changing this and likely isn't a perf bottleneck.
@@ -317,5 +302,5 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
317302
return &ChunkItersByFingerprint{
318303
fp: series.Fingerprint,
319304
itr: newBatchedChunkLoader(ctx, fetchers, inputs, s.metrics, batchedLoaderDefaultBatchSize),
320-
}, nil
305+
}
321306
}

pkg/bloombuild/builder/spec_test.go

+11-19
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,19 @@ import (
1515
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
1616
)
1717

18-
func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
18+
func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
1919
return blocksFromSchemaWithRange(t, n, options, 0, 0xffff)
2020
}
2121

2222
// splits 100 series across `n` non-overlapping blocks.
2323
// uses options to build blocks with.
24-
func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
24+
func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
2525
if 100%n != 0 {
2626
panic("100 series must be evenly divisible by n")
2727
}
2828

2929
numSeries := 100
30-
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, 0, fromFP, throughFp, 0, 10000)
30+
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, fromFP, throughFp, 0, 10000)
3131

3232
seriesPerBlock := numSeries / n
3333

@@ -46,7 +46,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
4646

4747
minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock
4848

49-
itr := v1.NewSliceIter[v1.SeriesWithBloom](data[minIdx:maxIdx])
49+
itr := v1.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
5050
_, err = builder.BuildFrom(itr)
5151
require.Nil(t, err)
5252

@@ -62,11 +62,11 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
6262
// doesn't actually load any chunks
6363
type dummyChunkLoader struct{}
6464

65-
func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) (*ChunkItersByFingerprint, error) {
65+
func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) *ChunkItersByFingerprint {
6666
return &ChunkItersByFingerprint{
6767
fp: series.Fingerprint,
6868
itr: v1.NewEmptyIter[v1.ChunkRefWithIter](),
69-
}, nil
69+
}
7070
}
7171

7272
func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block, refs []bloomshipper.BlockRef) *SimpleBloomGenerator {
@@ -132,9 +132,9 @@ func TestSimpleBloomGenerator(t *testing.T) {
132132
} {
133133
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
134134
sourceBlocks, data, refs := blocksFromSchemaWithRange(t, 2, tc.fromSchema, 0x00000, 0x6ffff)
135-
storeItr := v1.NewMapIter[v1.SeriesWithBloom, *v1.Series](
136-
v1.NewSliceIter[v1.SeriesWithBloom](data),
137-
func(swb v1.SeriesWithBloom) *v1.Series {
135+
storeItr := v1.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
136+
v1.NewSliceIter[v1.SeriesWithBlooms](data),
137+
func(swb v1.SeriesWithBlooms) *v1.Series {
138138
return swb.Series
139139
},
140140
)
@@ -150,9 +150,9 @@ func TestSimpleBloomGenerator(t *testing.T) {
150150

151151
// Check all the input series are present in the output blocks.
152152
expectedRefs := v1.PointerSlice(data)
153-
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
153+
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
154154
for _, block := range outputBlocks {
155-
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
155+
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
156156
for bq.Next() {
157157
outputRefs = append(outputRefs, bq.At())
158158
}
@@ -164,13 +164,5 @@ func TestSimpleBloomGenerator(t *testing.T) {
164164
})
165165
}
166166
}
167-
}
168167

169-
func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
170-
bounds := v1.NewBounds(min, max)
171-
return bloomshipper.BlockRef{
172-
Ref: bloomshipper.Ref{
173-
Bounds: bounds,
174-
},
175-
}
176168
}

0 commit comments

Comments
 (0)