
Commit 673ede1 (1 parent: 4879d10)

feat: New bloom planning using chunk size TSDB stats (#14547)

11 files changed: +592 -59 lines

docs/sources/shared/configuration.md (+5 -1)

@@ -3765,7 +3765,7 @@ shard_streams:
 [bloom_creation_enabled: <boolean> | default = false]
 
 # Experimental. Bloom planning strategy to use in bloom creation. Can be one of:
-# 'split_keyspace_by_factor'
+# 'split_keyspace_by_factor', 'split_by_series_chunks_size'
 # CLI flag: -bloom-build.planning-strategy
 [bloom_planning_strategy: <string> | default = "split_keyspace_by_factor"]
 
@@ -3775,6 +3775,10 @@ shard_streams:
 # CLI flag: -bloom-build.split-keyspace-by
 [bloom_split_series_keyspace_by: <int> | default = 256]
 
+# Experimental. Target chunk size in bytes for bloom tasks. Default is 20GB.
+# CLI flag: -bloom-build.split-target-series-chunk-size
+[bloom_task_target_series_chunk_size: <int> | default = 20GB]
+
 # Experimental. Compression algorithm for bloom block pages.
 # CLI flag: -bloom-build.block-encoding
 [bloom_block_encoding: <string> | default = "none"]

integration/bloom_building_test.go (+2)

@@ -96,6 +96,8 @@ func TestBloomBuilding(t *testing.T) {
 		"-bloom-build.planner.interval=15s",
 		"-bloom-build.planner.min-table-offset=0", // Disable table offset so we process today's data.
 		"-bloom.cache-list-ops=0", // Disable cache list operations to avoid caching issues.
+		"-bloom-build.planning-strategy=split_by_series_chunks_size",
+		"-bloom-build.split-target-series-chunk-size=1KB",
 	)
 	require.NoError(t, clu.Run())
pkg/bloombuild/planner/planner.go (+3 -4)

@@ -281,8 +281,6 @@ func (p *Planner) runOne(ctx context.Context) error {
 			continue
 		}
 
-		level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas))
-
 		var tenantTableEnqueuedTasks int
 		resultsCh := make(chan *protos.TaskResult, len(tasks))
 
@@ -377,13 +375,13 @@ func (p *Planner) computeTasks(
 	table config.DayTable,
 	tenant string,
 ) ([]*protos.Task, []bloomshipper.Meta, error) {
-	logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
-
 	strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
 	if err != nil {
 		return nil, nil, fmt.Errorf("error creating strategy: %w", err)
 	}
 
+	logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "strategy", strategy.Name())
+
 	// Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms
 	metas, err := p.bloomStore.FetchMetas(
 		ctx,
@@ -432,6 +430,7 @@ func (p *Planner) computeTasks(
 		return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
 	}
 
+	level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(metas))
 	return tasks, metas, nil
 }
pkg/bloombuild/planner/planner_test.go (+9)

@@ -18,6 +18,7 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
+	"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
 	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
 	"github.com/grafana/loki/v3/pkg/storage"
 	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -750,3 +751,11 @@ func (f *fakeLimits) BloomBuildMaxBuilders(_ string) int {
 func (f *fakeLimits) BloomTaskMaxRetries(_ string) int {
 	return f.maxRetries
 }
+
+func (f *fakeLimits) BloomPlanningStrategy(_ string) string {
+	return strategies.SplitBySeriesChunkSizeStrategyName
+}
+
+func (f *fakeLimits) BloomTaskTargetSeriesChunksSizeBytes(_ string) uint64 {
+	return 1 << 20 // 1MB
+}

pkg/bloombuild/planner/plannertest/utils.go (+6 -2)

@@ -88,8 +88,12 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
 }
 
 func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
-	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
-	for i := bounds.Min; i <= bounds.Max; i++ {
+	return GenSeriesWithStep(bounds, 1)
+}
+
+func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
+	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
+	for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
 		series = append(series, &v1.Series{
 			Fingerprint: i,
 			Chunks: v1.ChunkRefs{
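A short, hypothetical usage sketch of the new helper (the test name and assertions are illustrative, not part of the commit); it relies only on the signatures shown above and on the plannertest and v1 import paths already used in planner_test.go:

package plannertest_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

// Hypothetical test: a sparse fingerprint keyspace for exercising size-based
// batching, versus the dense keyspace GenSeries still produces.
func TestGenSeriesWithStep(t *testing.T) {
	sparse := plannertest.GenSeriesWithStep(v1.NewBounds(0, 90), 10)
	require.Len(t, sparse, 10) // fingerprints 0, 10, ..., 90

	dense := plannertest.GenSeries(v1.NewBounds(0, 90))
	require.Len(t, dense, 91) // fingerprints 0 through 90, i.e. step = 1
}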
pkg/bloombuild/planner/strategies/chunksize.go (new file, +286)
package strategies

import (
	"context"
	"fmt"
	"math"
	"sort"

	"github.com/dustin/go-humanize"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
	iter "github.com/grafana/loki/v3/pkg/iter/v2"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

type ChunkSizeStrategyLimits interface {
	BloomTaskTargetSeriesChunksSizeBytes(tenantID string) uint64
}

type ChunkSizeStrategy struct {
	limits ChunkSizeStrategyLimits
	logger log.Logger
}

func NewChunkSizeStrategy(
	limits ChunkSizeStrategyLimits,
	logger log.Logger,
) (*ChunkSizeStrategy, error) {
	return &ChunkSizeStrategy{
		limits: limits,
		logger: logger,
	}, nil
}

func (s *ChunkSizeStrategy) Name() string {
	return SplitBySeriesChunkSizeStrategyName
}

func (s *ChunkSizeStrategy) Plan(
	ctx context.Context,
	table config.DayTable,
	tenant string,
	tsdbs TSDBSet,
	metas []bloomshipper.Meta,
) ([]*protos.Task, error) {
	targetTaskSize := s.limits.BloomTaskTargetSeriesChunksSizeBytes(tenant)

	logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant)
	level.Debug(s.logger).Log("msg", "loading work for tenant", "target task size", humanize.Bytes(targetTaskSize))

	// Determine which TSDBs have gaps and need to be processed.
	tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(v1.NewBounds(0, math.MaxUint64), tsdbs, metas)
	if err != nil {
		level.Error(logger).Log("msg", "failed to find gaps", "err", err)
		return nil, fmt.Errorf("failed to find gaps: %w", err)
	}

	if len(tsdbsWithGaps) == 0 {
		level.Debug(logger).Log("msg", "blooms exist for all tsdbs")
		return nil, nil
	}

	sizedIter, iterSize, err := s.sizedSeriesIter(ctx, tenant, tsdbsWithGaps, targetTaskSize)
	if err != nil {
		return nil, fmt.Errorf("failed to get sized series iter: %w", err)
	}

	tasks := make([]*protos.Task, 0, iterSize)
	for sizedIter.Next() {
		series := sizedIter.At()
		if series.Len() == 0 {
			// This should never happen, but just in case.
			level.Warn(logger).Log("msg", "got empty series batch", "tsdb", series.TSDB().Name())
			continue
		}

		bounds := series.Bounds()

		blocks, err := getBlocksMatchingBounds(metas, bounds)
		if err != nil {
			return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err)
		}

		planGap := protos.Gap{
			Bounds: bounds,
			Series: series.V1Series(),
			Blocks: blocks,
		}

		tasks = append(tasks, protos.NewTask(table, tenant, bounds, series.TSDB(), []protos.Gap{planGap}))
	}
	if err := sizedIter.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate over sized series: %w", err)
	}

	return tasks, nil
}

func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBounds) ([]bloomshipper.BlockRef, error) {
	blocks := make([]bloomshipper.BlockRef, 0, 10)

	for _, meta := range metas {
		if meta.Bounds.Intersection(bounds) == nil {
			// this meta doesn't overlap the gap, skip
			continue
		}

		for _, block := range meta.Blocks {
			if block.Bounds.Intersection(bounds) == nil {
				// this block doesn't overlap the gap, skip
				continue
			}
			// this block overlaps the gap, add it to the plan
			// for this gap
			blocks = append(blocks, block)
		}
	}

	// ensure we sort blocks so deduping iterator works as expected
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].Bounds.Less(blocks[j].Bounds)
	})

	peekingBlocks := iter.NewPeekIter(
		iter.NewSliceIter(
			blocks,
		),
	)

	// dedupe blocks which could be in multiple metas
	itr := iter.NewDedupingIter(
		func(a, b bloomshipper.BlockRef) bool {
			return a == b
		},
		iter.Identity[bloomshipper.BlockRef],
		func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
			return a
		},
		peekingBlocks,
	)

	deduped, err := iter.Collect(itr)
	if err != nil {
		return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
	}

	return deduped, nil
}

type seriesWithChunks struct {
	tsdb   tsdb.SingleTenantTSDBIdentifier
	fp     model.Fingerprint
	chunks []index.ChunkMeta
}

type seriesBatch struct {
	series []seriesWithChunks
	size   uint64
}

func newSeriesBatch() seriesBatch {
	return seriesBatch{
		series: make([]seriesWithChunks, 0, 100),
	}
}

func (b *seriesBatch) Bounds() v1.FingerprintBounds {
	if len(b.series) == 0 {
		return v1.NewBounds(0, 0)
	}

	// We assume that the series are sorted by fingerprint.
	// This is guaranteed since series are iterated in order by the TSDB.
	return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp)
}

func (b *seriesBatch) V1Series() []*v1.Series {
	series := make([]*v1.Series, 0, len(b.series))
	for _, s := range b.series {
		res := &v1.Series{
			Fingerprint: s.fp,
			Chunks:      make(v1.ChunkRefs, 0, len(s.chunks)),
		}
		for _, chk := range s.chunks {
			res.Chunks = append(res.Chunks, v1.ChunkRef{
				From:     model.Time(chk.MinTime),
				Through:  model.Time(chk.MaxTime),
				Checksum: chk.Checksum,
			})
		}

		series = append(series, res)
	}

	return series
}

func (b *seriesBatch) Append(s seriesWithChunks, size uint64) {
	b.series = append(b.series, s)
	b.size += size
}

func (b *seriesBatch) Len() int {
	return len(b.series)
}

func (b *seriesBatch) Size() uint64 {
	return b.size
}

func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier {
	if len(b.series) == 0 {
		return tsdb.SingleTenantTSDBIdentifier{}
	}
	return b.series[0].tsdb
}

func (s *ChunkSizeStrategy) sizedSeriesIter(
	ctx context.Context,
	tenant string,
	tsdbsWithGaps []tsdbGaps,
	targetTaskSizeBytes uint64,
) (iter.Iterator[seriesBatch], int, error) {
	batches := make([]seriesBatch, 0, 100)
	currentBatch := newSeriesBatch()

	for _, idx := range tsdbsWithGaps {
		for _, gap := range idx.gaps {
			if err := idx.tsdb.ForSeries(
				ctx,
				tenant,
				gap,
				0, math.MaxInt64,
				func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
					select {
					case <-ctx.Done():
						return true
					default:
						var seriesSize uint64
						for _, chk := range chks {
							seriesSize += uint64(chk.KB * 1024)
						}

						// Cut a new batch IF the current batch is not empty (so we add at least one series to the batch)
						// AND adding this series to the batch would exceed the target task size.
						if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes {
							batches = append(batches, currentBatch)
							currentBatch = newSeriesBatch()
						}

						currentBatch.Append(seriesWithChunks{
							tsdb:   idx.tsdbIdentifier,
							fp:     fp,
							chunks: chks,
						}, seriesSize)
						return false
					}
				},
				labels.MustNewMatcher(labels.MatchEqual, "", ""),
			); err != nil {
				return nil, 0, err
			}

			// Add the last batch for this TSDB if it's not empty.
			if currentBatch.Len() > 0 {
				batches = append(batches, currentBatch)
				currentBatch = newSeriesBatch()
			}
		}
	}

	select {
	case <-ctx.Done():
		return iter.NewEmptyIter[seriesBatch](), 0, ctx.Err()
	default:
		return iter.NewCancelableIter[seriesBatch](ctx, iter.NewSliceIter[seriesBatch](batches)), len(batches), nil
	}
}
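The core of the strategy is the batch-cutting rule in sizedSeriesIter: series are accumulated in fingerprint order, sized by the chunk KB reported in the TSDB stats, and a new batch is started only when the current batch is non-empty and the next series would push it past the target, so each task covers roughly targetTaskSizeBytes of chunk data while a single oversized series still gets its own task. Below is a self-contained sketch of that rule using simplified stand-in types (not the Loki types above), intended only to illustrate the cut condition:

package main

import "fmt"

// series is a simplified stand-in for one TSDB series: a fingerprint and the
// total size in bytes of its chunks, as reported by the TSDB chunk stats.
type series struct {
	fp   uint64
	size uint64
}

// batchBySize mirrors the cut rule used in sizedSeriesIter: a new batch is
// started only when the current batch is non-empty AND adding the next series
// would exceed the target, so a single oversized series still forms its own
// (one-element) batch rather than being split or dropped.
func batchBySize(in []series, target uint64) [][]series {
	var batches [][]series
	var cur []series
	var curSize uint64
	for _, s := range in {
		if len(cur) > 0 && curSize+s.size > target {
			batches = append(batches, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, s)
		curSize += s.size
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	in := []series{{1, 400}, {2, 700}, {3, 100}, {4, 2000}, {5, 300}}
	// With a 1000-byte target this yields 4 batches of 400, 800, 2000 and 300 bytes.
	for i, b := range batchBySize(in, 1000) {
		var total uint64
		for _, s := range b {
			total += s.size
		}
		fmt.Printf("batch %d: %d series, %d bytes\n", i, len(b), total)
	}
}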
