Skip to content

Commit 6efff01

Browse files
committed
Filter out label filters that are part of the streams
1 parent 21e79c4 commit 6efff01

9 files changed

+246
-67
lines changed

pkg/bloomgateway/bloomgateway.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/opentracing/opentracing-go"
2020
"github.com/pkg/errors"
2121
"github.com/prometheus/client_golang/prometheus"
22+
"github.com/prometheus/prometheus/model/labels"
2223
"go.uber.org/atomic"
2324

2425
iter "github.com/grafana/loki/v3/pkg/iter/v2"
@@ -160,6 +161,15 @@ func (g *Gateway) stopping(_ error) error {
160161
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
161162
}
162163

164+
func extractSeries(refs []*logproto.GroupedChunkRefs) []labels.Labels {
165+
series := make([]labels.Labels, 0, len(refs))
166+
for _, ref := range refs {
167+
lbs := logproto.FromLabelAdaptersToLabels(ref.Labels.Labels)
168+
series = append(series, lbs)
169+
}
170+
return series
171+
}
172+
163173
// FilterChunkRefs implements BloomGatewayServer
164174
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
165175
tenantID, err := tenant.TenantID(ctx)
@@ -193,7 +203,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
193203
return nil, errors.New("from time must not be after through time")
194204
}
195205

196-
matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST)
206+
// This time, we do pass the series to the ExtractTestableLabelMatchers
207+
// Up to this point, we have called ExtractTestableLabelMatchers multiple times
208+
// without the series (faster) just to return early if there are no filter expressions.
209+
// We now need to be more precise and only extract matchers that do not
210+
// match the series labels.
211+
matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST, extractSeries(req.Refs)...)
197212
stats.NumMatchers = len(matchers)
198213
g.metrics.receivedMatchers.Observe(float64(len(matchers)))
199214

pkg/bloomgateway/bloomgateway_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func stringSlice[T fmt.Stringer](s []T) []string {
4141

4242
func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
4343
t.Helper()
44-
return groupChunkRefs(chunkRefs, nil)
44+
return groupChunkRefs(nil, chunkRefs, nil)
4545
}
4646

4747
func newLimits() *validation.Overrides {

pkg/bloomgateway/querier.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/prometheus/client_golang/prometheus"
1111
"github.com/prometheus/client_golang/prometheus/promauto"
1212
"github.com/prometheus/common/model"
13+
"github.com/prometheus/prometheus/model/labels"
1314

1415
"github.com/grafana/loki/v3/pkg/logproto"
1516
"github.com/grafana/loki/v3/pkg/querier/plan"
@@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
101102
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
102103
}
103104

104-
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
105+
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
105106
// Shortcut that does not require any filtering
106107
if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 {
107108
return chunkRefs, nil
@@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
112113

113114
grouped := groupedChunksRefPool.Get(len(chunkRefs))
114115
defer groupedChunksRefPool.Put(grouped)
115-
grouped = groupChunkRefs(chunkRefs, grouped)
116+
grouped = groupChunkRefs(series, chunkRefs, grouped)
116117

117118
preFilterChunks := len(chunkRefs)
118119
preFilterSeries := len(grouped)
@@ -225,7 +226,14 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
225226
// groups them by fingerprint.
226227
// The `grouped` argument can be used to pass a buffer to avoid allocations.
227228
// If it's nil, the returned slice will be allocated.
228-
func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
229+
func groupChunkRefs(series []labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
230+
seriesFPs := make(map[uint64]*logproto.IndexSeries, len(series))
231+
for _, s := range series {
232+
seriesFPs[s.Hash()] = &logproto.IndexSeries{
233+
Labels: logproto.FromLabelsToLabelAdapters(s),
234+
}
235+
}
236+
229237
seen := make(map[uint64]int, len(grouped))
230238
for _, chunkRef := range chunkRefs {
231239
if idx, found := seen[chunkRef.Fingerprint]; found {
@@ -234,10 +242,12 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC
234242
seen[chunkRef.Fingerprint] = len(grouped)
235243
grouped = append(grouped, &logproto.GroupedChunkRefs{
236244
Fingerprint: chunkRef.Fingerprint,
245+
Labels: seriesFPs[chunkRef.Fingerprint],
237246
Tenant: chunkRef.UserID,
238247
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
239248
})
240249
}
241250
}
251+
242252
return grouped
243253
}

pkg/bloomgateway/querier_test.go

+34-16
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bloomgateway
22

33
import (
44
"context"
5+
"fmt"
56
"math/rand"
67
"sort"
78
"testing"
@@ -10,6 +11,7 @@ import (
1011
"github.com/go-kit/log"
1112
"github.com/pkg/errors"
1213
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/model/labels"
1315
"github.com/stretchr/testify/require"
1416

1517
"github.com/grafana/loki/v3/pkg/logproto"
@@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) {
7981
}
8082
expr, err := syntax.ParseExpr(`{foo="bar"}`)
8183
require.NoError(t, err)
82-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
84+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
8385
require.NoError(t, err)
8486
require.Equal(t, chunkRefs, res)
8587
require.Equal(t, 0, c.callCount)
@@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) {
9597
chunkRefs := []*logproto.ChunkRef{}
9698
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
9799
require.NoError(t, err)
98-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
100+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
99101
require.NoError(t, err)
100102
require.Equal(t, chunkRefs, res)
101103
require.Equal(t, 0, c.callCount)
@@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) {
115117
}
116118
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
117119
require.NoError(t, err)
118-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
120+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
119121
require.Error(t, err)
120122
require.Nil(t, res)
121123
})
@@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) {
134136
}
135137
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
136138
require.NoError(t, err)
137-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
139+
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
138140
require.NoError(t, err)
139141
require.Equal(t, chunkRefs, res)
140142
require.Equal(t, 2, c.callCount)
@@ -143,28 +145,40 @@ func TestBloomQuerier(t *testing.T) {
143145
}
144146

145147
func TestGroupChunkRefs(t *testing.T) {
148+
series := []labels.Labels{
149+
labels.FromStrings("app", "1"),
150+
labels.FromStrings("app", "2"),
151+
labels.FromStrings("app", "3"),
152+
}
153+
146154
chunkRefs := []*logproto.ChunkRef{
147-
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
148-
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
149-
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
150-
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
151-
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
152-
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
155+
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
156+
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
157+
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
158+
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
159+
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
160+
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
153161
}
154162

155-
result := groupChunkRefs(chunkRefs, nil)
163+
result := groupChunkRefs(series, chunkRefs, nil)
156164
require.Equal(t, []*logproto.GroupedChunkRefs{
157-
{Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{
165+
{Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
158166
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
159167
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
168+
}, Labels: &logproto.IndexSeries{
169+
Labels: logproto.FromLabelsToLabelAdapters(series[0]),
160170
}},
161-
{Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{
171+
{Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
162172
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
163173
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
174+
}, Labels: &logproto.IndexSeries{
175+
Labels: logproto.FromLabelsToLabelAdapters(series[1]),
164176
}},
165-
{Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{
177+
{Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
166178
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
167179
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
180+
}, Labels: &logproto.IndexSeries{
181+
Labels: logproto.FromLabelsToLabelAdapters(series[2]),
168182
}},
169183
}, result)
170184
}
@@ -175,11 +189,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
175189
n := 1000 // num series
176190
m := 10000 // num chunks per series
177191
chunkRefs := make([]*logproto.ChunkRef, 0, n*m)
192+
series := make([]labels.Labels, 0, n)
178193

179194
for i := 0; i < n; i++ {
195+
s := labels.FromStrings("app", fmt.Sprintf("%d", i))
196+
sFP := s.Hash()
197+
series = append(series, s)
180198
for j := 0; j < m; j++ {
181199
chunkRefs = append(chunkRefs, &logproto.ChunkRef{
182-
Fingerprint: uint64(n),
200+
Fingerprint: sFP,
183201
UserID: "tenant",
184202
From: mktime("2024-04-20 00:00"),
185203
Through: mktime("2024-04-20 00:59"),
@@ -196,5 +214,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
196214
b.StartTimer()
197215

198216
groups := make([]*logproto.GroupedChunkRefs, 0, n)
199-
groupChunkRefs(chunkRefs, groups)
217+
groupChunkRefs(series, chunkRefs, groups)
200218
}

pkg/indexgateway/gateway.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type IndexClientWithRange struct {
5858
}
5959

6060
type BloomQuerier interface {
61-
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
61+
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
6262
}
6363

6464
type Gateway struct {
@@ -257,7 +257,12 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
257257
return result, nil
258258
}
259259

260-
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
260+
series, err := g.indexQuerier.GetSeries(ctx, instanceID, req.From, req.Through, matchers...)
261+
if err != nil {
262+
return nil, err
263+
}
264+
265+
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan)
261266
if err != nil {
262267
return nil, err
263268
}

0 commit comments

Comments
 (0)