Skip to content

Commit 1de99ee

Browse files
salvacortschaudum
authored andcommitted
fix(blooms): Exclude label filters where label name is part of the series labels. (#14661)
1 parent e2d6b70 commit 1de99ee

16 files changed

+277
-111
lines changed

pkg/bloomgateway/bloomgateway_test.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/pkg/errors"
1616
"github.com/prometheus/client_golang/prometheus"
1717
"github.com/prometheus/common/model"
18+
"github.com/prometheus/prometheus/model/labels"
1819
"github.com/stretchr/testify/require"
1920

2021
"github.com/grafana/loki/v3/pkg/logproto"
@@ -41,7 +42,15 @@ func stringSlice[T fmt.Stringer](s []T) []string {
4142

4243
func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
4344
t.Helper()
44-
return groupChunkRefs(chunkRefs, nil)
45+
grouped := groupChunkRefs(nil, chunkRefs, nil)
46+
// Put fake labels to the series
47+
for _, g := range grouped {
48+
g.Labels = &logproto.IndexSeries{
49+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", g.Fingerprint))),
50+
}
51+
}
52+
53+
return grouped
4554
}
4655

4756
func newLimits() *validation.Overrides {
@@ -295,12 +304,18 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
295304
{Fingerprint: 1000, Tenant: tenantID, Refs: []*logproto.ShortRef{
296305
{From: 1696248000000, Through: 1696251600000, Checksum: 2},
297306
{From: 1696244400000, Through: 1696248000000, Checksum: 4},
307+
}, Labels: &logproto.IndexSeries{
308+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1000")),
298309
}},
299310
{Fingerprint: 2000, Tenant: tenantID, Refs: []*logproto.ShortRef{
300311
{From: 1696255200000, Through: 1696258800000, Checksum: 3},
312+
}, Labels: &logproto.IndexSeries{
313+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2000")),
301314
}},
302315
{Fingerprint: 3000, Tenant: tenantID, Refs: []*logproto.ShortRef{
303316
{From: 1696240800000, Through: 1696244400000, Checksum: 1},
317+
}, Labels: &logproto.IndexSeries{
318+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3000")),
304319
}},
305320
},
306321
}, res)
@@ -405,6 +420,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
405420
// see MkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go
406421
rnd := rand.Intn(len(inputChunkRefs))
407422
fp := inputChunkRefs[rnd].Fingerprint
423+
lbs := inputChunkRefs[rnd].Labels
408424
chks := inputChunkRefs[rnd].Refs
409425
key := fmt.Sprintf("%s:%04x", model.Fingerprint(fp), 0)
410426

@@ -428,6 +444,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
428444
ChunkRefs: []*logproto.GroupedChunkRefs{
429445
{
430446
Fingerprint: fp,
447+
Labels: lbs,
431448
Refs: chks,
432449
Tenant: tenantID,
433450
},

pkg/bloomgateway/cache.go

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64
7272
if len(refs) > 0 {
7373
chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
7474
Fingerprint: chunkRef.Fingerprint,
75+
Labels: chunkRef.Labels,
7576
Tenant: chunkRef.Tenant,
7677
Refs: refs,
7778
})

pkg/bloomgateway/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
322322
}
323323
return &logproto.GroupedChunkRefs{
324324
Fingerprint: a.Fingerprint,
325+
Labels: a.Labels,
325326
Tenant: a.Tenant,
326327
Refs: mergeChunkSets(a.Refs, b.Refs),
327328
}

pkg/bloomgateway/multiplexing.go

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (it *requestIterator) Next() bool {
174174
it.curr = v1.Request{
175175
Recorder: it.recorder,
176176
Fp: model.Fingerprint(group.Fingerprint),
177+
Labels: logproto.FromLabelAdaptersToLabels(group.Labels.Labels),
177178
Chks: convertToChunkRefs(group.Refs),
178179
Search: it.search,
179180
Response: it.channel,

pkg/bloomgateway/multiplexing_test.go

+9
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/prometheus/common/model"
10+
"github.com/prometheus/prometheus/model/labels"
1011
"github.com/stretchr/testify/require"
1112

1213
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
@@ -73,6 +74,8 @@ func TestTask_RequestIterator(t *testing.T) {
7374
Refs: []*logproto.GroupedChunkRefs{
7475
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
7576
{From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100},
77+
}, Labels: &logproto.IndexSeries{
78+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
7679
}},
7780
},
7881
}
@@ -83,9 +86,13 @@ func TestTask_RequestIterator(t *testing.T) {
8386
Refs: []*logproto.GroupedChunkRefs{
8487
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
8588
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200},
89+
}, Labels: &logproto.IndexSeries{
90+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
8691
}},
8792
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
8893
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300},
94+
}, Labels: &logproto.IndexSeries{
95+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
8996
}},
9097
},
9198
}
@@ -96,6 +103,8 @@ func TestTask_RequestIterator(t *testing.T) {
96103
Refs: []*logproto.GroupedChunkRefs{
97104
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
98105
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 400},
106+
}, Labels: &logproto.IndexSeries{
107+
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
99108
}},
100109
},
101110
}

pkg/bloomgateway/querier.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/prometheus/client_golang/prometheus"
1111
"github.com/prometheus/client_golang/prometheus/promauto"
1212
"github.com/prometheus/common/model"
13+
"github.com/prometheus/prometheus/model/labels"
1314

1415
"github.com/grafana/loki/v3/pkg/logproto"
1516
"github.com/grafana/loki/v3/pkg/querier/plan"
@@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
101102
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
102103
}
103104

104-
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
105+
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
105106
// Shortcut that does not require any filtering
106107
if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 {
107108
return chunkRefs, nil
@@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
112113

113114
grouped := groupedChunksRefPool.Get(len(chunkRefs))
114115
defer groupedChunksRefPool.Put(grouped)
115-
grouped = groupChunkRefs(chunkRefs, grouped)
116+
grouped = groupChunkRefs(series, chunkRefs, grouped)
116117

117118
preFilterChunks := len(chunkRefs)
118119
preFilterSeries := len(grouped)
@@ -225,7 +226,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
225226
// groups them by fingerprint.
226227
// The second argument `grouped` can be used to pass a buffer to avoid allocations.
227228
// If it's nil, the returned slice will be allocated.
228-
func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
229+
func groupChunkRefs(series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
229230
seen := make(map[uint64]int, len(grouped))
230231
for _, chunkRef := range chunkRefs {
231232
if idx, found := seen[chunkRef.Fingerprint]; found {
@@ -234,10 +235,14 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC
234235
seen[chunkRef.Fingerprint] = len(grouped)
235236
grouped = append(grouped, &logproto.GroupedChunkRefs{
236237
Fingerprint: chunkRef.Fingerprint,
237-
Tenant: chunkRef.UserID,
238-
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
238+
Labels: &logproto.IndexSeries{
239+
Labels: logproto.FromLabelsToLabelAdapters(series[chunkRef.Fingerprint]),
240+
},
241+
Tenant: chunkRef.UserID,
242+
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
239243
})
240244
}
241245
}
246+
242247
return grouped
243248
}

pkg/bloomgateway/querier_test.go

+38-16
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bloomgateway
22

33
import (
44
"context"
5+
"fmt"
56
"math/rand"
67
"sort"
78
"testing"
@@ -10,6 +11,7 @@ import (
1011
"github.com/go-kit/log"
1112
"github.com/pkg/errors"
1213
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/model/labels"
1315
"github.com/stretchr/testify/require"
1416

1517
"github.com/grafana/loki/v3/pkg/logproto"
@@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) {
7981
}
8082
expr, err := syntax.ParseExpr(`{foo="bar"}`)
8183
require.NoError(t, err)
82-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
84+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
8385
require.NoError(t, err)
8486
require.Equal(t, chunkRefs, res)
8587
require.Equal(t, 0, c.callCount)
@@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) {
9597
chunkRefs := []*logproto.ChunkRef{}
9698
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
9799
require.NoError(t, err)
98-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
100+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
99101
require.NoError(t, err)
100102
require.Equal(t, chunkRefs, res)
101103
require.Equal(t, 0, c.callCount)
@@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) {
115117
}
116118
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
117119
require.NoError(t, err)
118-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
120+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
119121
require.Error(t, err)
120122
require.Nil(t, res)
121123
})
@@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) {
134136
}
135137
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
136138
require.NoError(t, err)
137-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
139+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
138140
require.NoError(t, err)
139141
require.Equal(t, chunkRefs, res)
140142
require.Equal(t, 2, c.callCount)
@@ -143,28 +145,44 @@ func TestBloomQuerier(t *testing.T) {
143145
}
144146

145147
func TestGroupChunkRefs(t *testing.T) {
148+
series := []labels.Labels{
149+
labels.FromStrings("app", "1"),
150+
labels.FromStrings("app", "2"),
151+
labels.FromStrings("app", "3"),
152+
}
153+
seriesMap := make(map[uint64]labels.Labels)
154+
for _, s := range series {
155+
seriesMap[s.Hash()] = s
156+
}
157+
146158
chunkRefs := []*logproto.ChunkRef{
147-
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
148-
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
149-
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
150-
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
151-
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
152-
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
159+
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
160+
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
161+
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
162+
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
163+
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
164+
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
153165
}
154166

155-
result := groupChunkRefs(chunkRefs, nil)
167+
result := groupChunkRefs(seriesMap, chunkRefs, nil)
156168
require.Equal(t, []*logproto.GroupedChunkRefs{
157-
{Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{
169+
{Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
158170
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
159171
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
172+
}, Labels: &logproto.IndexSeries{
173+
Labels: logproto.FromLabelsToLabelAdapters(series[0]),
160174
}},
161-
{Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{
175+
{Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
162176
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
163177
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
178+
}, Labels: &logproto.IndexSeries{
179+
Labels: logproto.FromLabelsToLabelAdapters(series[1]),
164180
}},
165-
{Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{
181+
{Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
166182
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
167183
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
184+
}, Labels: &logproto.IndexSeries{
185+
Labels: logproto.FromLabelsToLabelAdapters(series[2]),
168186
}},
169187
}, result)
170188
}
@@ -175,11 +193,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
175193
n := 1000 // num series
176194
m := 10000 // num chunks per series
177195
chunkRefs := make([]*logproto.ChunkRef, 0, n*m)
196+
series := make(map[uint64]labels.Labels, n)
178197

179198
for i := 0; i < n; i++ {
199+
s := labels.FromStrings("app", fmt.Sprintf("%d", i))
200+
sFP := s.Hash()
201+
series[sFP] = s
180202
for j := 0; j < m; j++ {
181203
chunkRefs = append(chunkRefs, &logproto.ChunkRef{
182-
Fingerprint: uint64(n),
204+
Fingerprint: sFP,
183205
UserID: "tenant",
184206
From: mktime("2024-04-20 00:00"),
185207
Through: mktime("2024-04-20 00:59"),
@@ -196,5 +218,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
196218
b.StartTimer()
197219

198220
groups := make([]*logproto.GroupedChunkRefs, 0, n)
199-
groupChunkRefs(chunkRefs, groups)
221+
groupChunkRefs(series, chunkRefs, groups)
200222
}

pkg/bloomgateway/util.go

+1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto
110110

111111
res = append(res, &logproto.GroupedChunkRefs{
112112
Fingerprint: series.Fingerprint,
113+
Labels: series.Labels,
113114
Tenant: series.Tenant,
114115
Refs: relevantChunks,
115116
})

pkg/indexgateway/gateway.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type IndexClientWithRange struct {
5858
}
5959

6060
type BloomQuerier interface {
61-
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
61+
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
6262
}
6363

6464
type Gateway struct {
@@ -225,12 +225,16 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
225225
return nil, err
226226
}
227227

228+
series := make(map[uint64]labels.Labels)
228229
result = &logproto.GetChunkRefResponse{
229230
Refs: make([]*logproto.ChunkRef, 0, len(chunks)),
230231
}
231232
for _, cs := range chunks {
232233
for i := range cs {
233234
result.Refs = append(result.Refs, &cs[i].ChunkRef)
235+
if _, ok := series[cs[i].Fingerprint]; !ok {
236+
series[cs[i].Fingerprint] = cs[i].Metric
237+
}
234238
}
235239
}
236240

@@ -257,7 +261,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
257261
return result, nil
258262
}
259263

260-
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
264+
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan)
261265
if err != nil {
262266
return nil, err
263267
}

0 commit comments

Comments
 (0)