diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index b5bb682e8efcd..4e45467eb52f7 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -31,7 +31,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" utillog "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/ring" ) @@ -415,7 +414,6 @@ func (b *Builder) processTask( Bounds: gap.Bounds, }, }, - Sources: []tsdb.SingleTenantTSDBIdentifier{task.TSDB}, } // Fetch blocks that aren't up to date but are in the desired fingerprint range @@ -492,6 +490,7 @@ func (b *Builder) processTask( level.Debug(logger).Log("msg", "uploaded block", "progress_pct", fmt.Sprintf("%.2f", pct)) meta.Blocks = append(meta.Blocks, built.BlockRef) + meta.Sources = append(meta.Sources, task.TSDB) } if err := newBlocks.Err(); err != nil { diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index 0f6fe27626958..71f410ad8f3d4 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -2,6 +2,7 @@ package bloomgateway import ( "context" + "slices" "sort" "time" @@ -61,36 +62,55 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter } func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries { - result := make([]blockWithSeries, 0, len(metas)) - - for _, meta := range metas { - for _, block := range meta.Blocks { + slices.SortFunc(series, func(a, b *logproto.GroupedChunkRefs) int { return int(a.Fingerprint - b.Fingerprint) }) - // skip blocks that are not within time interval - if !interval.Overlaps(block.Interval()) { - continue + result := make([]blockWithSeries, 0, len(metas)) + cache := make(map[bloomshipper.BlockRef]int) + + // find the newest block for each series + for _, s := range series { + var b *bloomshipper.BlockRef + var newestTs time.Time + + for i := range metas { + for j := range metas[i].Blocks { + block := metas[i].Blocks[j] + // To keep backwards compatibility, we can only look at the source at index 0 + // because in the past the slice had always length 1, see + // https://github.com/grafana/loki/blob/b4060154d198e17bef8ba0fbb1c99bb5c93a412d/pkg/bloombuild/builder/builder.go#L418 + sourceTs := metas[i].Sources[0].TS + // Newer metas have len(Sources) == len(Blocks) + if len(metas[i].Sources) > j { + sourceTs = metas[i].Sources[j].TS + } + // skip blocks that are not within time interval + if !interval.Overlaps(block.Interval()) { + continue + } + // skip blocks that do not contain the series + if block.Cmp(s.Fingerprint) != v1.Overlap { + continue + } + // only use the block if it is newer than the previous + if sourceTs.After(newestTs) { + b = &block + newestTs = sourceTs + } } + } - min := sort.Search(len(series), func(i int) bool { - return block.Cmp(series[i].Fingerprint) > v1.Before - }) - - max := sort.Search(len(series), func(i int) bool { - return block.Cmp(series[i].Fingerprint) == v1.After - }) - - // All fingerprints fall outside of the consumer's range - if min == len(series) || max == 0 || min == max { - continue - } + if b == nil { + continue + } - // At least one fingerprint is within bounds of the blocks - // so append to results - dst := make([]*logproto.GroupedChunkRefs, max-min) - _ = copy(dst, series[min:max]) + idx, ok := cache[*b] + if ok { + result[idx].series = append(result[idx].series, s) + } else { + cache[*b] = len(result) result = append(result, blockWithSeries{ - block: block, - series: dst, + block: *b, + series: []*logproto.GroupedChunkRefs{s}, }) } } diff --git a/pkg/bloomgateway/resolver_test.go b/pkg/bloomgateway/resolver_test.go index e6369cbeff9ea..217f07324da3b 100644 --- a/pkg/bloomgateway/resolver_test.go +++ b/pkg/bloomgateway/resolver_test.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" ) func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef { @@ -28,6 +29,9 @@ func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshi Blocks: []bloomshipper.BlockRef{ makeBlockRef(minFp, maxFp, from, through), }, + Sources: []tsdb.SingleTenantTSDBIdentifier{ + {TS: through.Time()}, + }, } } @@ -100,14 +104,21 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) { t.Run("multiple overlapping blocks within time range covering full keyspace", func(t *testing.T) { metas := []bloomshipper.Meta{ - makeMeta(0x00, 0xdf, 1000, 1999), - makeMeta(0xc0, 0xff, 1000, 1999), + // 2 series overlap + makeMeta(0x00, 0xdf, 1000, 1499), // "old" meta covers first 4 series + makeMeta(0xc0, 0xff, 1500, 1999), // "new" meta covers last 4 series } res := blocksMatchingSeries(metas, interval, series) + for i := range res { + t.Logf("%s", res[i].block) + for j := range res[i].series { + t.Logf(" %016x", res[i].series[j].Fingerprint) + } + } expected := []blockWithSeries{ { block: metas[0].Blocks[0], - series: series[0:4], + series: series[0:2], // series 0x00c0 and 0x00d0 are covered in the newer block }, { block: metas[1].Blocks[0],