Skip to content

Commit 955d4f8

Browse files
chaudumgrafana-delivery-bot[bot]
authored andcommitted
fix(blooms): Deduplicate filtered series and chunks (#12791)
Signed-off-by: Christian Haudum <[email protected]> (cherry picked from commit 3bf2d1f)
1 parent 27d6d17 commit 955d4f8

File tree

5 files changed

+139
-71
lines changed

5 files changed

+139
-71
lines changed

pkg/bloomgateway/cache.go

+8-47
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@ package bloomgateway
33
import (
44
"context"
55
"flag"
6-
"sort"
76
"time"
87

98
"github.com/go-kit/log"
109
"github.com/prometheus/common/model"
11-
"golang.org/x/exp/slices"
1210
"google.golang.org/grpc"
1311

1412
"github.com/grafana/loki/v3/pkg/logproto"
@@ -95,58 +93,21 @@ func newMerger() merger {
9593
// We merge all chunks grouped by their fingerprint.
9694
func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
9795
var size int
98-
for _, r := range responses {
99-
res := r.(*logproto.FilterChunkRefResponse)
100-
size += len(res.ChunkRefs)
101-
}
10296

103-
chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size)
97+
unmerged := make([][]*logproto.GroupedChunkRefs, 0, len(responses))
10498
for _, r := range responses {
10599
res := r.(*logproto.FilterChunkRefResponse)
106-
chunkRefs = append(chunkRefs, res.ChunkRefs...)
107-
}
108-
109-
return &logproto.FilterChunkRefResponse{
110-
ChunkRefs: mergeGroupedChunkRefs(chunkRefs),
111-
}, nil
112-
}
113-
114-
// Merge duplicated fingerprints by:
115-
// 1. Sort the chunkRefs by their stream fingerprint
116-
// 2. Remove duplicated FPs appending all chunks into the first fingerprint's chunk list.
117-
func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
118-
if len(chunkRefs) <= 1 {
119-
return chunkRefs
120-
}
121-
122-
sort.Slice(chunkRefs, func(i, j int) bool {
123-
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
124-
})
125-
126-
var lastDiffFP int
127-
for i := 1; i < len(chunkRefs); i++ {
128-
if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint {
129-
chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...))
130-
} else {
131-
lastDiffFP++
132-
chunkRefs[lastDiffFP] = chunkRefs[i]
133-
}
100+
unmerged = append(unmerged, res.ChunkRefs)
101+
size += len(res.ChunkRefs)
134102
}
135-
return chunkRefs[:lastDiffFP+1]
136-
}
137103

138-
// mergeShortRefs merges short-refs by removing duplicated checksums.
139-
func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
140-
if len(refs) <= 1 {
141-
return refs
104+
buf := make([]*logproto.GroupedChunkRefs, 0, size)
105+
deduped, err := mergeSeries(unmerged, buf)
106+
if err != nil {
107+
return nil, err
142108
}
143109

144-
sort.Slice(refs, func(i, j int) bool {
145-
return refs[i].Checksum < refs[j].Checksum
146-
})
147-
return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool {
148-
return a.Checksum == b.Checksum
149-
})
110+
return &logproto.FilterChunkRefResponse{ChunkRefs: deduped}, nil
150111
}
151112

152113
type ClientCache struct {

pkg/bloomgateway/cache_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,11 @@ func TestMerge(t *testing.T) {
288288
Fingerprint: 2,
289289
Tenant: "fake",
290290
Refs: []*logproto.ShortRef{
291+
{
292+
From: 700,
293+
Through: 1000,
294+
Checksum: 40,
295+
},
291296
{
292297
From: 1000,
293298
Through: 1500,
@@ -303,11 +308,6 @@ func TestMerge(t *testing.T) {
303308
Through: 2500,
304309
Checksum: 30,
305310
},
306-
{
307-
From: 700,
308-
Through: 1000,
309-
Checksum: 40,
310-
},
311311
{
312312
From: 2000,
313313
Through: 2700,

pkg/bloomgateway/client.go

+88-18
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package bloomgateway
33
import (
44
"context"
55
"flag"
6-
"fmt"
76
"io"
87
"math"
98
"sort"
@@ -15,13 +14,15 @@ import (
1514
ringclient "github.com/grafana/dskit/ring/client"
1615
"github.com/pkg/errors"
1716
"github.com/prometheus/client_golang/prometheus"
17+
"golang.org/x/exp/slices"
1818
"google.golang.org/grpc"
1919
"google.golang.org/grpc/health/grpc_health_v1"
2020

2121
"github.com/grafana/loki/v3/pkg/logproto"
2222
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
2323
"github.com/grafana/loki/v3/pkg/querier/plan"
2424
"github.com/grafana/loki/v3/pkg/queue"
25+
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
2526
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
2627
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
2728
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -258,17 +259,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva
258259
return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint
259260
})
260261

261-
level.Info(c.logger).Log(
262-
"msg", "do FilterChunkRefs for addresses",
263-
"part", fmt.Sprintf("%d/%d", i+1, len(servers)),
264-
"addr", rs.addr,
265-
"from", interval.Start.Time(),
266-
"through", interval.End.Time(),
267-
"series", len(rs.groups),
268-
"blocks", len(rs.blocks),
269-
"tenant", tenant,
270-
)
271-
272262
return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
273263
req := &logproto.FilterChunkRefRequest{
274264
From: interval.Start,
@@ -290,15 +280,95 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva
290280
if err != nil {
291281
return nil, err
292282
}
293-
return flatten(results, count), nil
283+
284+
buf := make([]*logproto.GroupedChunkRefs, 0, count)
285+
return mergeSeries(results, buf)
294286
}
295287

296-
func flatten(input [][]*logproto.GroupedChunkRefs, n int) []*logproto.GroupedChunkRefs {
297-
result := make([]*logproto.GroupedChunkRefs, 0, n)
298-
for _, res := range input {
299-
result = append(result, res...)
288+
// mergeSeries combines respones from multiple FilterChunkRefs calls and deduplicates
289+
// chunks from series that appear in multiple responses.
290+
// To avoid allocations, an optional slice can be passed as second argument.
291+
func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) {
292+
// clear provided buffer
293+
buf = buf[:0]
294+
295+
iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input))
296+
for _, inp := range input {
297+
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
300298
}
301-
return result
299+
300+
heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
301+
func(a, b *logproto.GroupedChunkRefs) bool {
302+
return a.Fingerprint < b.Fingerprint
303+
},
304+
iters...,
305+
)
306+
307+
dedupeIter := v1.NewDedupingIter[*logproto.GroupedChunkRefs, *logproto.GroupedChunkRefs](
308+
// eq
309+
func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint == b.Fingerprint },
310+
// from
311+
v1.Identity[*logproto.GroupedChunkRefs],
312+
// merge
313+
func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
314+
return &logproto.GroupedChunkRefs{
315+
Fingerprint: a.Fingerprint,
316+
Tenant: a.Tenant,
317+
Refs: mergeChunks(a.Refs, b.Refs),
318+
}
319+
},
320+
// iterator
321+
v1.NewPeekingIter(heapIter),
322+
)
323+
324+
return v1.CollectInto(dedupeIter, buf)
325+
}
326+
327+
func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
328+
if len(inputs) == 0 {
329+
return nil
330+
}
331+
332+
if len(inputs) == 1 {
333+
slices.SortFunc(
334+
inputs[0],
335+
func(a, b *logproto.ShortRef) int {
336+
if a.Equal(b) {
337+
return 0
338+
}
339+
if a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) {
340+
return -1
341+
}
342+
return 1
343+
},
344+
)
345+
return inputs[0]
346+
}
347+
348+
iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs))
349+
for _, inp := range inputs {
350+
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
351+
}
352+
353+
chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef](
354+
// eq
355+
func(a, b *logproto.ShortRef) bool { return a.Equal(b) },
356+
// from
357+
v1.Identity[*logproto.ShortRef],
358+
// merge
359+
func(a, b *logproto.ShortRef) *logproto.ShortRef { return a },
360+
// iterator
361+
v1.NewPeekingIter[*logproto.ShortRef](
362+
v1.NewHeapIterator[*logproto.ShortRef](
363+
func(a, b *logproto.ShortRef) bool {
364+
return a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through))
365+
},
366+
iters...,
367+
),
368+
),
369+
)
370+
merged, _ := v1.Collect(chunkDedupe)
371+
return merged
302372
}
303373

304374
// doForAddrs sequetially calls the provided callback function fn for each

pkg/bloomgateway/client_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import (
77
"github.com/go-kit/log"
88
"github.com/grafana/dskit/flagext"
99
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/common/model"
1011
"github.com/stretchr/testify/require"
1112

13+
"github.com/grafana/loki/v3/pkg/logproto"
1214
"github.com/grafana/loki/v3/pkg/logql/syntax"
1315
"github.com/grafana/loki/v3/pkg/querier/plan"
1416
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -33,3 +35,38 @@ func TestBloomGatewayClient(t *testing.T) {
3335
require.Equal(t, 0, len(res))
3436
})
3537
}
38+
39+
func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {
40+
return &logproto.ShortRef{
41+
From: f,
42+
Through: t,
43+
Checksum: c,
44+
}
45+
}
46+
47+
func TestGatewayClient_MergeSeries(t *testing.T) {
48+
inputs := [][]*logproto.GroupedChunkRefs{
49+
// response 1
50+
{
51+
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping
52+
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
53+
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks
54+
},
55+
// response 2
56+
{
57+
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
58+
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
59+
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
60+
},
61+
}
62+
63+
expected := []*logproto.GroupedChunkRefs{
64+
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping
65+
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
66+
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
67+
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
68+
}
69+
70+
result, _ := mergeSeries(inputs, nil)
71+
require.Equal(t, expected, result)
72+
}

pkg/bloomgateway/resolver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
type BlockResolver interface {
18-
Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error)
18+
Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error)
1919
}
2020

2121
type blockWithSeries struct {

0 commit comments

Comments
 (0)