Skip to content

Commit accdecd

Browse files
salvacorts authored and benclive committed
fix(blooms): Ship chunkrefs in task payload (grafana#13677)
1 parent 36154b2 commit accdecd

File tree

7 files changed: 644 additions (+644) and 117 deletions (-117)

pkg/bloombuild/builder/builder.go

+3-9
Original file line number | Diff line number | Diff line change
@@ -335,7 +335,7 @@ func (b *Builder) processTask(
335335
// Fetch blocks that aren't up to date but are in the desired fingerprint range
336336
// to try and accelerate bloom creation.
337337
level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks))
338-
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, tenant, task.TSDB, gap)
338+
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap)
339339
if err != nil {
340340
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
341341
return nil, fmt.Errorf("failed to get series and blocks: %w", err)
@@ -454,15 +454,9 @@ func (b *Builder) processTask(
454454
func (b *Builder) loadWorkForGap(
455455
ctx context.Context,
456456
table config.DayTable,
457-
tenant string,
458-
id tsdb.Identifier,
459-
gap protos.GapWithBlocks,
457+
gap protos.Gap,
460458
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
461-
// load a series iterator for the gap
462-
seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
463-
if err != nil {
464-
return nil, nil, errors.Wrap(err, "failed to load tsdb")
465-
}
459+
seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))
466460

467461
// load a blocks iterator for the gap
468462
fetcher, err := b.bloomStore.Fetcher(table.ModelTime())

pkg/bloombuild/common/tsdb.go

+10-14
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@ import (
99
"strings"
1010

1111
"github.com/go-kit/log"
12-
"github.com/go-kit/log/level"
1312
"github.com/pkg/errors"
1413
"github.com/prometheus/common/model"
1514
"github.com/prometheus/prometheus/model/labels"
@@ -30,6 +29,11 @@ const (
3029
gzipExtension = ".gz"
3130
)
3231

32+
// ClosableForSeries is a sharding.ForSeries that additionally exposes Close.
// It is the type returned by TSDBStore.LoadTSDB, which hands the open index
// back to the caller instead of closing it itself.
type ClosableForSeries interface {
33+
sharding.ForSeries
34+
Close() error
35+
}
36+
3337
type TSDBStore interface {
3438
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
3539
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
@@ -38,8 +42,7 @@ type TSDBStore interface {
3842
table config.DayTable,
3943
tenant string,
4044
id tsdb.Identifier,
41-
bounds v1.FingerprintBounds,
42-
) (iter.Iterator[*v1.Series], error)
45+
) (ClosableForSeries, error)
4346
}
4447

4548
// BloomTSDBStore is a wrapper around the storage.Client interface which
@@ -90,8 +93,7 @@ func (b *BloomTSDBStore) LoadTSDB(
9093
table config.DayTable,
9194
tenant string,
9295
id tsdb.Identifier,
93-
bounds v1.FingerprintBounds,
94-
) (iter.Iterator[*v1.Series], error) {
96+
) (ClosableForSeries, error) {
9597
withCompression := id.Name() + gzipExtension
9698

9799
data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
@@ -118,13 +120,8 @@ func (b *BloomTSDBStore) LoadTSDB(
118120
}
119121

120122
idx := tsdb.NewTSDBIndex(reader)
121-
defer func() {
122-
if err := idx.Close(); err != nil {
123-
level.Error(b.logger).Log("msg", "failed to close index", "err", err)
124-
}
125-
}()
126123

127-
return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
124+
return idx, nil
128125
}
129126

130127
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
@@ -251,12 +248,11 @@ func (s *TSDBStores) LoadTSDB(
251248
table config.DayTable,
252249
tenant string,
253250
id tsdb.Identifier,
254-
bounds v1.FingerprintBounds,
255-
) (iter.Iterator[*v1.Series], error) {
251+
) (ClosableForSeries, error) {
256252
store, err := s.storeForPeriod(table.DayTime)
257253
if err != nil {
258254
return nil, err
259255
}
260256

261-
return store.LoadTSDB(ctx, table, tenant, id, bounds)
257+
return store.LoadTSDB(ctx, table, tenant, id)
262258
}

pkg/bloombuild/planner/planner.go

+73-26
Original file line number | Diff line number | Diff line change
@@ -365,14 +365,37 @@ func (p *Planner) computeTasks(
365365
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
366366
}
367367

368+
// Resolve TSDBs
369+
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
370+
if err != nil {
371+
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
372+
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
373+
}
374+
375+
if len(tsdbs) == 0 {
376+
return nil, metas, nil
377+
}
378+
379+
openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
380+
if err != nil {
381+
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
382+
}
383+
defer func() {
384+
for idx, reader := range openTSDBs {
385+
if err := reader.Close(); err != nil {
386+
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
387+
}
388+
}
389+
}()
390+
368391
for _, ownershipRange := range ownershipRanges {
369392
logger := log.With(logger, "ownership", ownershipRange.String())
370393

371394
// Filter only the metas that overlap in the ownership range
372395
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)
373396

374397
// Find gaps in the TSDBs for this tenant/table
375-
gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger)
398+
gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
376399
if err != nil {
377400
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
378401
continue
@@ -453,6 +476,26 @@ func (p *Planner) processTenantTaskResults(
453476
return tasksSucceed, nil
454477
}
455478

479+
func openAllTSDBs(
480+
ctx context.Context,
481+
table config.DayTable,
482+
tenant string,
483+
store common.TSDBStore,
484+
tsdbs []tsdb.SingleTenantTSDBIdentifier,
485+
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
486+
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
487+
for _, idx := range tsdbs {
488+
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
489+
if err != nil {
490+
return nil, fmt.Errorf("failed to load tsdb: %w", err)
491+
}
492+
493+
openTSDBs[idx] = tsdb
494+
}
495+
496+
return openTSDBs, nil
497+
}
498+
456499
// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
457500
// It returns the up-to-date metas from the `metas` argument.
458501
func (p *Planner) deleteOutdatedMetasAndBlocks(
@@ -655,28 +698,17 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
655698
// This is a performance optimization to avoid expensive re-indexing
656699
type blockPlan struct {
657700
tsdb tsdb.SingleTenantTSDBIdentifier
658-
gaps []protos.GapWithBlocks
701+
gaps []protos.Gap
659702
}
660703

661704
func (p *Planner) findOutdatedGaps(
662705
ctx context.Context,
663706
tenant string,
664-
table config.DayTable,
707+
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
665708
ownershipRange v1.FingerprintBounds,
666709
metas []bloomshipper.Meta,
667710
logger log.Logger,
668711
) ([]blockPlan, error) {
669-
// Resolve TSDBs
670-
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
671-
if err != nil {
672-
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
673-
return nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
674-
}
675-
676-
if len(tsdbs) == 0 {
677-
return nil, nil
678-
}
679-
680712
// Determine which TSDBs have gaps in the ownership range and need to
681713
// be processed.
682714
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
@@ -690,7 +722,7 @@ func (p *Planner) findOutdatedGaps(
690722
return nil, nil
691723
}
692724

693-
work, err := blockPlansForGaps(tsdbsWithGaps, metas)
725+
work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
694726
if err != nil {
695727
level.Error(logger).Log("msg", "failed to create plan", "err", err)
696728
return nil, fmt.Errorf("failed to create plan: %w", err)
@@ -701,18 +733,19 @@ func (p *Planner) findOutdatedGaps(
701733

702734
// Used to signal the gaps that need to be populated for a tsdb
703735
type tsdbGaps struct {
704-
tsdb tsdb.SingleTenantTSDBIdentifier
705-
gaps []v1.FingerprintBounds
736+
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
737+
tsdb common.ClosableForSeries
738+
gaps []v1.FingerprintBounds
706739
}
707740

708741
// gapsBetweenTSDBsAndMetas returns, per TSDB, the gaps where the metas are not up-to-date with that TSDB. This is determined by asserting
709742
// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB.
710743
func gapsBetweenTSDBsAndMetas(
711744
ownershipRange v1.FingerprintBounds,
712-
tsdbs []tsdb.SingleTenantTSDBIdentifier,
745+
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
713746
metas []bloomshipper.Meta,
714747
) (res []tsdbGaps, err error) {
715-
for _, db := range tsdbs {
748+
for db, tsdb := range tsdbs {
716749
id := db.Name()
717750

718751
relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
@@ -731,8 +764,9 @@ func gapsBetweenTSDBsAndMetas(
731764

732765
if len(gaps) > 0 {
733766
res = append(res, tsdbGaps{
734-
tsdb: db,
735-
gaps: gaps,
767+
tsdbIdentifier: db,
768+
tsdb: tsdb,
769+
gaps: gaps,
736770
})
737771
}
738772
}
@@ -743,22 +777,35 @@ func gapsBetweenTSDBsAndMetas(
743777
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
744778
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
745779
// since many will contain the same chunks.
746-
func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) {
780+
func blockPlansForGaps(
781+
ctx context.Context,
782+
tenant string,
783+
tsdbs []tsdbGaps,
784+
metas []bloomshipper.Meta,
785+
) ([]blockPlan, error) {
747786
plans := make([]blockPlan, 0, len(tsdbs))
748787

749788
for _, idx := range tsdbs {
750789
plan := blockPlan{
751-
tsdb: idx.tsdb,
752-
gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
790+
tsdb: idx.tsdbIdentifier,
791+
gaps: make([]protos.Gap, 0, len(idx.gaps)),
753792
}
754793

755794
for _, gap := range idx.gaps {
756-
planGap := protos.GapWithBlocks{
795+
planGap := protos.Gap{
757796
Bounds: gap,
758797
}
759798

760-
for _, meta := range metas {
799+
seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
800+
if err != nil {
801+
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
802+
}
803+
planGap.Series, err = iter.Collect(seriesItr)
804+
if err != nil {
805+
return nil, fmt.Errorf("failed to collect series: %w", err)
806+
}
761807

808+
for _, meta := range metas {
762809
if meta.Bounds.Intersection(gap) == nil {
763810
// this meta doesn't overlap the gap, skip
764811
continue

0 commit comments

Comments
 (0)