diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 698b1ecf40e47..63657ce5b5e4d 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" @@ -41,7 +42,15 @@ func stringSlice[T fmt.Stringer](s []T) []string { func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs { t.Helper() - return groupChunkRefs(chunkRefs, nil) + grouped := groupChunkRefs(nil, chunkRefs, nil) + // Put fake labels to the series + for _, g := range grouped { + g.Labels = &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", g.Fingerprint))), + } + } + + return grouped } func newLimits() *validation.Overrides { @@ -295,12 +304,18 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { {Fingerprint: 1000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696248000000, Through: 1696251600000, Checksum: 2}, {From: 1696244400000, Through: 1696248000000, Checksum: 4}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1000")), }}, {Fingerprint: 2000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696255200000, Through: 1696258800000, Checksum: 3}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2000")), }}, {Fingerprint: 3000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696240800000, Through: 1696244400000, Checksum: 1}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3000")), }}, }, }, res) @@ -405,6 +420,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { // see MkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go rnd := rand.Intn(len(inputChunkRefs)) fp := inputChunkRefs[rnd].Fingerprint + lbs := inputChunkRefs[rnd].Labels chks := inputChunkRefs[rnd].Refs key := fmt.Sprintf("%s:%04x", model.Fingerprint(fp), 0) @@ -428,6 +444,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { ChunkRefs: []*logproto.GroupedChunkRefs{ { Fingerprint: fp, + Labels: lbs, Refs: chks, Tenant: tenantID, }, diff --git a/pkg/bloomgateway/cache.go b/pkg/bloomgateway/cache.go index cec615b393fd7..77d5168ca303a 100644 --- a/pkg/bloomgateway/cache.go +++ b/pkg/bloomgateway/cache.go @@ -72,6 +72,7 @@ func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64 if len(refs) > 0 { chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{ Fingerprint: chunkRef.Fingerprint, + Labels: chunkRef.Labels, Tenant: chunkRef.Tenant, Refs: refs, }) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 9163e91e8756e..23f7026d00706 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -322,6 +322,7 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh } return &logproto.GroupedChunkRefs{ Fingerprint: a.Fingerprint, + Labels: a.Labels, Tenant: a.Tenant, Refs: mergeChunkSets(a.Refs, b.Refs), } diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 7f929a1e19c6a..2394a72c36874 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -174,6 +174,7 @@ func (it *requestIterator) Next() bool { it.curr = v1.Request{ Recorder: it.recorder, Fp: model.Fingerprint(group.Fingerprint), + Labels: logproto.FromLabelAdaptersToLabels(group.Labels.Labels), Chks: convertToChunkRefs(group.Refs), Search: it.search, Response: it.channel, diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go index d290817ef4be7..bfaa168fa4f5d 100644 --- a/pkg/bloomgateway/multiplexing_test.go +++ b/pkg/bloomgateway/multiplexing_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" v2 "github.com/grafana/loki/v3/pkg/iter/v2" @@ -73,6 +74,8 @@ func TestTask_RequestIterator(t *testing.T) { Refs: []*logproto.GroupedChunkRefs{ {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")), }}, }, } @@ -83,9 +86,13 @@ func TestTask_RequestIterator(t *testing.T) { Refs: []*logproto.GroupedChunkRefs{ {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")), }}, {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")), }}, }, } @@ -96,6 +103,8 @@ func TestTask_RequestIterator(t *testing.T) { Refs: []*logproto.GroupedChunkRefs{ {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 400}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")), }}, }, } diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index 211372b84bcc5..3cf6ce0eaf925 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/querier/plan" @@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef { return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum} } -func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) { +func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) { // Shortcut that does not require any filtering if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 { return chunkRefs, nil @@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from grouped := groupedChunksRefPool.Get(len(chunkRefs)) defer groupedChunksRefPool.Put(grouped) - grouped = groupChunkRefs(chunkRefs, grouped) + grouped = groupChunkRefs(series, chunkRefs, grouped) preFilterChunks := len(chunkRefs) preFilterSeries := len(grouped) @@ -225,7 +226,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from // groups them by fingerprint. // The second argument `grouped` can be used to pass a buffer to avoid allocations. // If it's nil, the returned slice will be allocated. -func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { +func groupChunkRefs(series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { seen := make(map[uint64]int, len(grouped)) for _, chunkRef := range chunkRefs { if idx, found := seen[chunkRef.Fingerprint]; found { @@ -234,10 +235,14 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC seen[chunkRef.Fingerprint] = len(grouped) grouped = append(grouped, &logproto.GroupedChunkRefs{ Fingerprint: chunkRef.Fingerprint, - Tenant: chunkRef.UserID, - Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)}, + Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[chunkRef.Fingerprint]), + }, + Tenant: chunkRef.UserID, + Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)}, }) } } + return grouped } diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index ca4036d266edb..46875ceeaf126 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -2,6 +2,7 @@ package bloomgateway import ( "context" + "fmt" "math/rand" "sort" "testing" @@ -10,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" @@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"}`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) { chunkRefs := []*logproto.ChunkRef{} expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.Error(t, err) require.Nil(t, res) }) @@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 2, c.callCount) @@ -143,28 +145,44 @@ func TestBloomQuerier(t *testing.T) { } func TestGroupChunkRefs(t *testing.T) { + series := []labels.Labels{ + labels.FromStrings("app", "1"), + labels.FromStrings("app", "2"), + labels.FromStrings("app", "3"), + } + seriesMap := make(map[uint64]labels.Labels) + for _, s := range series { + seriesMap[s.Hash()] = s + } + chunkRefs := []*logproto.ChunkRef{ - {Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, - {Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, - {Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, - {Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, - {Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, - {Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + {Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, + {Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + {Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, + {Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + {Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, + {Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, } - result := groupChunkRefs(chunkRefs, nil) + result := groupChunkRefs(seriesMap, chunkRefs, nil) require.Equal(t, []*logproto.GroupedChunkRefs{ - {Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{ + {Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{ {From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, {From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[0]), }}, - {Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{ + {Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{ {From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, {From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[1]), }}, - {Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{ + {Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{ {From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, {From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[2]), }}, }, result) } @@ -175,11 +193,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) { n := 1000 // num series m := 10000 // num chunks per series chunkRefs := make([]*logproto.ChunkRef, 0, n*m) + series := make(map[uint64]labels.Labels, n) for i := 0; i < n; i++ { + s := labels.FromStrings("app", fmt.Sprintf("%d", i)) + sFP := s.Hash() + series[sFP] = s for j := 0; j < m; j++ { chunkRefs = append(chunkRefs, &logproto.ChunkRef{ - Fingerprint: uint64(n), + Fingerprint: sFP, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59"), @@ -196,5 +218,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) { b.StartTimer() groups := make([]*logproto.GroupedChunkRefs, 0, n) - groupChunkRefs(chunkRefs, groups) + groupChunkRefs(series, chunkRefs, groups) } diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index 868fdf2ac6ed5..21803d9c84dab 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -110,6 +110,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto res = append(res, &logproto.GroupedChunkRefs{ Fingerprint: series.Fingerprint, + Labels: series.Labels, Tenant: series.Tenant, Refs: relevantChunks, }) diff --git a/pkg/indexgateway/gateway.go b/pkg/indexgateway/gateway.go index a053c8154bde2..785d59b60fab1 100644 --- a/pkg/indexgateway/gateway.go +++ b/pkg/indexgateway/gateway.go @@ -58,7 +58,7 @@ type IndexClientWithRange struct { } type BloomQuerier interface { - FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error) + FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error) } type Gateway struct { @@ -225,12 +225,16 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ return nil, err } + series := make(map[uint64]labels.Labels) result = &logproto.GetChunkRefResponse{ Refs: make([]*logproto.ChunkRef, 0, len(chunks)), } for _, cs := range chunks { for i := range cs { result.Refs = append(result.Refs, &cs[i].ChunkRef) + if _, ok := series[cs[i].Fingerprint]; !ok { + series[cs[i].Fingerprint] = cs[i].Metric + } } } @@ -257,7 +261,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ return result, nil } - chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan) + chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan) if err != nil { return nil, err } diff --git a/pkg/logproto/bloomgateway.pb.go b/pkg/logproto/bloomgateway.pb.go index d1a2df6e63c40..e4841b8ae0bd9 100644 --- a/pkg/logproto/bloomgateway.pb.go +++ b/pkg/logproto/bloomgateway.pb.go @@ -178,6 +178,9 @@ type GroupedChunkRefs struct { Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` Refs []*ShortRef `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"` + // Labels are only populated on FilterChunkRefRequest. They are not returned on FilterChunkRefResponse + // TODO(salvacorts): Consider two different messages for FilterChunkRefRequest and FilterChunkRefResponse + Labels *IndexSeries `protobuf:"bytes,4,opt,name=labels,proto3" json:"labels,omitempty"` } func (m *GroupedChunkRefs) Reset() { *m = GroupedChunkRefs{} } @@ -233,6 +236,13 @@ func (m *GroupedChunkRefs) GetRefs() []*ShortRef { return nil } +func (m *GroupedChunkRefs) GetLabels() *IndexSeries { + if m != nil { + return m.Labels + } + return nil +} + func init() { proto.RegisterType((*FilterChunkRefRequest)(nil), "logproto.FilterChunkRefRequest") proto.RegisterType((*FilterChunkRefResponse)(nil), "logproto.FilterChunkRefResponse") @@ -243,39 +253,41 @@ func init() { func init() { proto.RegisterFile("pkg/logproto/bloomgateway.proto", fileDescriptor_a50b5dd1dbcd1415) } var fileDescriptor_a50b5dd1dbcd1415 = []byte{ - // 504 bytes of a gzipped FileDescriptorProto + // 532 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x53, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xf6, 0x36, 0x21, 0x24, 0x1b, 0xfe, 0xb4, 0x82, 0xca, 0x0a, 0xd2, 0xc6, 0xca, 0x81, 0xfa, - 0xe4, 0x95, 0x52, 0x21, 0x71, 0xe1, 0x92, 0x4a, 0x54, 0xdc, 0x60, 0x41, 0x1c, 0x90, 0x38, 0x38, - 0xee, 0xfa, 0x47, 0xb6, 0x77, 0xdc, 0xf5, 0x1a, 0xd4, 0x1b, 0x8f, 0xc0, 0x63, 0xf0, 0x04, 0x3c, - 0x43, 0x8f, 0x11, 0xa7, 0x8a, 0x43, 0x45, 0x9c, 0x0b, 0xc7, 0x3e, 0x02, 0xb2, 0x1d, 0x37, 0x4d, - 0x05, 0xaa, 0xc4, 0x89, 0x93, 0x77, 0x76, 0xbe, 0x19, 0x7f, 0xf3, 0x7d, 0x3b, 0x78, 0x9c, 0xc5, - 0x01, 0x4b, 0x20, 0xc8, 0x14, 0x68, 0x60, 0xf3, 0x04, 0x20, 0x0d, 0x5c, 0x2d, 0x3e, 0xb9, 0x27, - 0x4e, 0x7d, 0x45, 0xfa, 0x6d, 0x72, 0xf4, 0x30, 0x80, 0x00, 0x1a, 0x5c, 0x75, 0x6a, 0xf2, 0xa3, - 0xc7, 0x5b, 0x0d, 0xda, 0x43, 0x93, 0x9c, 0x7c, 0xdf, 0xc1, 0x8f, 0x5e, 0x44, 0x89, 0x16, 0xea, - 0x20, 0x2c, 0x64, 0xcc, 0x85, 0xcf, 0xc5, 0x71, 0x21, 0x72, 0x4d, 0x0e, 0x70, 0xd7, 0x57, 0x90, - 0x9a, 0xc8, 0x42, 0x76, 0x67, 0xc6, 0x4e, 0xcf, 0xc7, 0xc6, 0x8f, 0xf3, 0xf1, 0x5e, 0x10, 0xe9, - 0xb0, 0x98, 0x3b, 0x1e, 0xa4, 0x2c, 0x53, 0x90, 0x0a, 0x1d, 0x8a, 0x22, 0x67, 0x1e, 0xa4, 0x29, - 0x48, 0x96, 0xc2, 0x91, 0x48, 0x9c, 0xb7, 0x51, 0x2a, 0x78, 0x5d, 0x4c, 0x5e, 0xe2, 0xdb, 0x3a, - 0x54, 0x50, 0x04, 0xa1, 0xb9, 0xf3, 0x6f, 0x7d, 0xda, 0x7a, 0xe2, 0xe0, 0xae, 0x12, 0x7e, 0x6e, - 0x76, 0xac, 0x8e, 0x3d, 0x9c, 0x8e, 0x9c, 0xcb, 0x41, 0x0e, 0x15, 0x14, 0x99, 0x38, 0x6a, 0xf9, - 0xe7, 0xbc, 0xc6, 0x11, 0x17, 0x77, 0xb3, 0xc4, 0x95, 0xe6, 0x2d, 0x0b, 0xd9, 0xc3, 0xe9, 0xbd, - 0x0d, 0xfe, 0x55, 0xe2, 0xca, 0xd9, 0xf3, 0x35, 0x8f, 0xa7, 0x57, 0x78, 0x04, 0xca, 0xf5, 0x5d, - 0xe9, 0xb2, 0x04, 0xe2, 0x88, 0x7d, 0xdc, 0x67, 0x95, 0x6e, 0xc7, 0x85, 0x50, 0x91, 0x50, 0xac, - 0x6a, 0xe5, 0xbc, 0x2e, 0x84, 0x3a, 0xa9, 0xca, 0x79, 0xdd, 0x9a, 0xec, 0xe2, 0xde, 0x3c, 0x01, - 0x2f, 0xce, 0xcd, 0x9e, 0xd5, 0xb1, 0x07, 0x7c, 0x1d, 0x4d, 0x38, 0xde, 0xbd, 0xae, 0x69, 0x9e, - 0x81, 0xcc, 0x05, 0x79, 0x86, 0x07, 0x5e, 0xcb, 0xd3, 0x44, 0x37, 0x4e, 0xb2, 0x01, 0x4f, 0xbe, - 0x21, 0xdc, 0x7f, 0x13, 0x82, 0xd2, 0x5c, 0xf8, 0xff, 0x9d, 0x37, 0x23, 0xdc, 0xf7, 0x42, 0xe1, - 0xc5, 0x79, 0x91, 0x9a, 0x1d, 0x0b, 0xd9, 0x77, 0xf9, 0x65, 0x3c, 0xd1, 0xf8, 0xc1, 0xf5, 0xb9, - 0x88, 0x85, 0x87, 0x7e, 0x24, 0x03, 0xa1, 0x32, 0x15, 0x49, 0x5d, 0x8f, 0xd1, 0xe5, 0x57, 0xaf, - 0x2a, 0x69, 0xb5, 0x90, 0xae, 0xd4, 0x35, 0xb7, 0x01, 0x5f, 0x47, 0xe4, 0xc9, 0xd6, 0x2b, 0x20, - 0x1b, 0xed, 0x5a, 0x6d, 0x1a, 0xf7, 0xa7, 0x3e, 0xbe, 0x33, 0xab, 0x56, 0xe5, 0xb0, 0x59, 0x15, - 0xf2, 0x0e, 0xdf, 0xdf, 0xb6, 0x24, 0x27, 0xe3, 0x4d, 0xf1, 0x1f, 0x37, 0x60, 0x64, 0xfd, 0x1d, - 0xd0, 0xd8, 0x39, 0x31, 0x66, 0x1f, 0x16, 0x4b, 0x6a, 0x9c, 0x2d, 0xa9, 0x71, 0xb1, 0xa4, 0xe8, - 0x73, 0x49, 0xd1, 0xd7, 0x92, 0xa2, 0xd3, 0x92, 0xa2, 0x45, 0x49, 0xd1, 0xcf, 0x92, 0xa2, 0x5f, - 0x25, 0x35, 0x2e, 0x4a, 0x8a, 0xbe, 0xac, 0xa8, 0xb1, 0x58, 0x51, 0xe3, 0x6c, 0x45, 0x8d, 0xf7, - 0x7b, 0x37, 0xbc, 0xba, 0xf6, 0xbf, 0xf3, 0x5e, 0xfd, 0xd9, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, - 0x30, 0x14, 0xc6, 0xc9, 0x05, 0x04, 0x00, 0x00, + 0x10, 0xf6, 0x36, 0x21, 0x24, 0x1b, 0xfe, 0xb4, 0xa2, 0x95, 0x15, 0xa4, 0x8d, 0x95, 0x03, 0xcd, + 0x05, 0x5b, 0x4a, 0x85, 0xc4, 0x85, 0x4b, 0x2a, 0x51, 0xf5, 0x06, 0x5b, 0xc4, 0x01, 0x89, 0x83, + 0xe3, 0x8c, 0x7f, 0x14, 0x7b, 0xd7, 0xdd, 0x5d, 0x03, 0xbd, 0xf1, 0x08, 0xbc, 0x03, 0x17, 0x9e, + 0x80, 0x67, 0xe8, 0x31, 0xe2, 0x54, 0x71, 0xa8, 0x88, 0x73, 0xe1, 0xd8, 0x47, 0x40, 0x76, 0xe2, + 0xba, 0xa9, 0x40, 0x95, 0x38, 0x71, 0xf2, 0xee, 0xce, 0x37, 0xe3, 0x6f, 0xbe, 0x6f, 0x06, 0xf7, + 0xd3, 0x59, 0xe0, 0xc4, 0x22, 0x48, 0xa5, 0xd0, 0xc2, 0x99, 0xc4, 0x42, 0x24, 0x81, 0xab, 0xe1, + 0x83, 0x7b, 0x62, 0x97, 0x4f, 0xa4, 0x5d, 0x05, 0x7b, 0x0f, 0x03, 0x11, 0x88, 0x15, 0xae, 0x38, + 0xad, 0xe2, 0xbd, 0x47, 0x1b, 0x05, 0xaa, 0xc3, 0x2a, 0x38, 0xf8, 0xbe, 0x85, 0xb7, 0x5f, 0x44, + 0xb1, 0x06, 0xb9, 0x1f, 0x66, 0x7c, 0xc6, 0xc0, 0x67, 0x70, 0x9c, 0x81, 0xd2, 0x64, 0x1f, 0x37, + 0x7d, 0x29, 0x12, 0x13, 0x59, 0x68, 0xd8, 0x18, 0x3b, 0xa7, 0xe7, 0x7d, 0xe3, 0xc7, 0x79, 0x7f, + 0x37, 0x88, 0x74, 0x98, 0x4d, 0x6c, 0x4f, 0x24, 0x4e, 0x2a, 0x45, 0x02, 0x3a, 0x84, 0x4c, 0x39, + 0x9e, 0x48, 0x12, 0xc1, 0x9d, 0x44, 0x4c, 0x21, 0xb6, 0x5f, 0x47, 0x09, 0xb0, 0x32, 0x99, 0x1c, + 0xe2, 0xdb, 0x3a, 0x94, 0x22, 0x0b, 0x42, 0x73, 0xeb, 0xdf, 0xea, 0x54, 0xf9, 0xc4, 0xc6, 0x4d, + 0x09, 0xbe, 0x32, 0x1b, 0x56, 0x63, 0xd8, 0x1d, 0xf5, 0xec, 0xcb, 0x46, 0x0e, 0xa4, 0xc8, 0x52, + 0x98, 0x56, 0xfc, 0x15, 0x2b, 0x71, 0xc4, 0xc5, 0xcd, 0x34, 0x76, 0xb9, 0x79, 0xcb, 0x42, 0xc3, + 0xee, 0xe8, 0x5e, 0x8d, 0x7f, 0x19, 0xbb, 0x7c, 0xfc, 0x7c, 0xcd, 0xe3, 0xe9, 0x15, 0x1e, 0x81, + 0x74, 0x7d, 0x97, 0xbb, 0x4e, 0x2c, 0x66, 0x91, 0xf3, 0x7e, 0xcf, 0x29, 0x74, 0x3b, 0xce, 0x40, + 0x46, 0x20, 0x9d, 0xa2, 0x94, 0xfd, 0x2a, 0x03, 0x79, 0x52, 0xa4, 0xb3, 0xb2, 0x34, 0xd9, 0xc1, + 0xad, 0x49, 0x2c, 0xbc, 0x99, 0x32, 0x5b, 0x56, 0x63, 0xd8, 0x61, 0xeb, 0xdb, 0x80, 0xe1, 0x9d, + 0xeb, 0x9a, 0xaa, 0x54, 0x70, 0x05, 0xe4, 0x19, 0xee, 0x78, 0x15, 0x4f, 0x13, 0xdd, 0xd8, 0x49, + 0x0d, 0x1e, 0x7c, 0x43, 0xb8, 0x7d, 0x14, 0x0a, 0xa9, 0x19, 0xf8, 0xff, 0x9d, 0x37, 0x3d, 0xdc, + 0xf6, 0x42, 0xf0, 0x66, 0x2a, 0x4b, 0xcc, 0x86, 0x85, 0x86, 0x77, 0xd9, 0xe5, 0x7d, 0xf0, 0x05, + 0xe1, 0x07, 0xd7, 0x1b, 0x23, 0x16, 0xee, 0xfa, 0x11, 0x0f, 0x40, 0xa6, 0x32, 0xe2, 0xba, 0xec, + 0xa3, 0xc9, 0xae, 0x3e, 0x15, 0xda, 0x6a, 0xe0, 0x2e, 0xd7, 0x25, 0xb9, 0x0e, 0x5b, 0xdf, 0xc8, + 0xe3, 0x8d, 0x31, 0x20, 0xb5, 0x78, 0x95, 0x38, 0x6b, 0xfb, 0x9f, 0xe0, 0x56, 0xec, 0x4e, 0x20, + 0x56, 0x66, 0xb3, 0x1c, 0x80, 0xed, 0x1a, 0x79, 0xc8, 0xa7, 0xf0, 0xf1, 0xa8, 0xf0, 0x55, 0xb1, + 0x35, 0x68, 0xe4, 0xe3, 0x3b, 0xe3, 0x62, 0xb5, 0x0e, 0x56, 0xab, 0x45, 0xde, 0xe0, 0xfb, 0x9b, + 0x16, 0x2a, 0xd2, 0xaf, 0x2b, 0xfc, 0x71, 0x63, 0x7a, 0xd6, 0xdf, 0x01, 0x2b, 0xfb, 0x07, 0xc6, + 0xf8, 0xdd, 0x7c, 0x41, 0x8d, 0xb3, 0x05, 0x35, 0x2e, 0x16, 0x14, 0x7d, 0xca, 0x29, 0xfa, 0x9a, + 0x53, 0x74, 0x9a, 0x53, 0x34, 0xcf, 0x29, 0xfa, 0x99, 0x53, 0xf4, 0x2b, 0xa7, 0xc6, 0x45, 0x4e, + 0xd1, 0xe7, 0x25, 0x35, 0xe6, 0x4b, 0x6a, 0x9c, 0x2d, 0xa9, 0xf1, 0x76, 0xf7, 0x86, 0x29, 0xad, + 0xfe, 0x3b, 0x69, 0x95, 0x9f, 0xbd, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x05, 0x31, 0x08, + 0x35, 0x04, 0x00, 0x00, } func (this *FilterChunkRefRequest) Equal(that interface{}) bool { @@ -416,6 +428,9 @@ func (this *GroupedChunkRefs) Equal(that interface{}) bool { return false } } + if !this.Labels.Equal(that1.Labels) { + return false + } return true } func (this *FilterChunkRefRequest) GoString() string { @@ -462,13 +477,16 @@ func (this *GroupedChunkRefs) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&logproto.GroupedChunkRefs{") s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") s = append(s, "Tenant: "+fmt.Sprintf("%#v", this.Tenant)+",\n") if this.Refs != nil { s = append(s, "Refs: "+fmt.Sprintf("%#v", this.Refs)+",\n") } + if this.Labels != nil { + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -722,6 +740,18 @@ func (m *GroupedChunkRefs) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Labels != nil { + { + size, err := m.Labels.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBloomgateway(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } if len(m.Refs) > 0 { for iNdEx := len(m.Refs) - 1; iNdEx >= 0; iNdEx-- { { @@ -843,6 +873,10 @@ func (m *GroupedChunkRefs) Size() (n int) { n += 1 + l + sovBloomgateway(uint64(l)) } } + if m.Labels != nil { + l = m.Labels.Size() + n += 1 + l + sovBloomgateway(uint64(l)) + } return n } @@ -911,6 +945,7 @@ func (this *GroupedChunkRefs) String() string { `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, `Tenant:` + fmt.Sprintf("%v", this.Tenant) + `,`, `Refs:` + repeatedStringForRefs + `,`, + `Labels:` + strings.Replace(fmt.Sprintf("%v", this.Labels), "IndexSeries", "IndexSeries", 1) + `,`, `}`, }, "") return s @@ -1424,6 +1459,42 @@ func (m *GroupedChunkRefs) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBloomgateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBloomgateway + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBloomgateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Labels == nil { + m.Labels = &IndexSeries{} + } + if err := m.Labels.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipBloomgateway(dAtA[iNdEx:]) diff --git a/pkg/logproto/bloomgateway.proto b/pkg/logproto/bloomgateway.proto index 0d80d865b32f4..809afb90b7b5a 100644 --- a/pkg/logproto/bloomgateway.proto +++ b/pkg/logproto/bloomgateway.proto @@ -45,6 +45,9 @@ message GroupedChunkRefs { uint64 fingerprint = 1; string tenant = 2; repeated ShortRef refs = 3; + // Labels are only populated on FilterChunkRefRequest. They are not returned on FilterChunkRefResponse + // TODO(salvacorts): Consider two different messages for FilterChunkRefRequest and FilterChunkRefResponse + IndexSeries labels = 4; } service BloomGateway { diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index 405b29cc6148d..69ffe4dece3ca 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -391,6 +391,7 @@ func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resul if len(refs) > 0 { chunkRefs = append(chunkRefs, &GroupedChunkRefs{ Fingerprint: chunkRef.Fingerprint, + Labels: chunkRef.Labels, Tenant: chunkRef.Tenant, Refs: refs, }) diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go index 6fe00a4cd1730..b70c64804118e 100644 --- a/pkg/storage/bloom/v1/bloom_tester.go +++ b/pkg/storage/bloom/v1/bloom_tester.go @@ -4,28 +4,32 @@ import ( "fmt" "unsafe" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) type BloomTest interface { - Matches(bloom filter.Checker) bool - MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool + Matches(series labels.Labels, bloom filter.Checker) bool + MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool } type BloomTests []BloomTest -func (b BloomTests) Matches(bloom filter.Checker) bool { +func (b BloomTests) Matches(series labels.Labels, bloom filter.Checker) bool { for _, test := range b { - if !test.Matches(bloom) { + if !test.Matches(series, bloom) { return false } } return true } -func (b BloomTests) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { +func (b BloomTests) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { for _, test := range b { - if !test.MatchesWithPrefixBuf(bloom, buf, prefixLen) { + if !test.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) { return false } } @@ -37,12 +41,12 @@ type matchAllTest struct{} var MatchAll = matchAllTest{} // Matches implements BloomTest -func (n matchAllTest) Matches(_ filter.Checker) bool { +func (n matchAllTest) Matches(_ labels.Labels, _ filter.Checker) bool { return true } // MatchesWithPrefixBuf implements BloomTest -func (n matchAllTest) MatchesWithPrefixBuf(_ filter.Checker, _ []byte, _ int) bool { +func (n matchAllTest) MatchesWithPrefixBuf(_ labels.Labels, _ filter.Checker, _ []byte, _ int) bool { return true } @@ -72,13 +76,13 @@ func newOrTest(left, right BloomTest) orTest { } // Matches implements BloomTest -func (o orTest) Matches(bloom filter.Checker) bool { - return o.left.Matches(bloom) || o.right.Matches(bloom) +func (o orTest) Matches(series labels.Labels, bloom filter.Checker) bool { + return o.left.Matches(series, bloom) || o.right.Matches(series, bloom) } // MatchesWithPrefixBuf implements BloomTest -func (o orTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { - return o.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(bloom, buf, prefixLen) +func (o orTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { + return o.left.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) } type andTest struct { @@ -93,13 +97,13 @@ func newAndTest(left, right BloomTest) andTest { } // Matches implements BloomTest -func (a andTest) Matches(bloom filter.Checker) bool { - return a.left.Matches(bloom) && a.right.Matches(bloom) +func (a andTest) Matches(series labels.Labels, bloom filter.Checker) bool { + return a.left.Matches(series, bloom) && a.right.Matches(series, bloom) } // MatchesWithPrefixBuf implements BloomTest -func (a andTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { - return a.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(bloom, buf, prefixLen) +func (a andTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { + return a.left.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) } func LabelMatchersToBloomTest(matchers ...LabelMatcher) BloomTest { @@ -144,7 +148,7 @@ func newStringMatcherTest(matcher PlainLabelMatcher) stringMatcherTest { return stringMatcherTest{matcher: matcher} } -func (sm stringMatcherTest) Matches(bloom filter.Checker) bool { +func (sm stringMatcherTest) Matches(series labels.Labels, bloom filter.Checker) bool { // TODO(rfratto): reintroduce the use of a shared tokenizer here to avoid // desyncing between how tokens are passed during building vs passed during // querying. @@ -155,44 +159,38 @@ func (sm stringMatcherTest) Matches(bloom filter.Checker) bool { // 2. It should be possible to test for just the key var ( - combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) - - rawKey = unsafe.Slice(unsafe.StringData(sm.matcher.Key), len(sm.matcher.Key)) + combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) rawCombined = unsafe.Slice(unsafe.StringData(combined), len(combined)) ) - if !bloom.Test(rawKey) { - // The structured metadata key wasn't indexed. However, sm.matcher might be - // checking against a label which *does* exist, so we can't safely filter - // out this chunk. - // - // TODO(rfratto): The negative test here is a bit confusing, and the key - // presence test should likely be done higher up within FuseQuerier. - return true - } - - return bloom.Test(rawCombined) + return sm.match(series, bloom, rawCombined) } -func (sm stringMatcherTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { +func (sm stringMatcherTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { var ( - combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) - - prefixedKey = appendToBuf(buf, prefixLen, sm.matcher.Key) + combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) prefixedCombined = appendToBuf(buf, prefixLen, combined) ) - if !bloom.Test(prefixedKey) { - // The structured metadata key wasn't indexed for a prefix. However, - // sm.matcher might be checking against a label which *does* exist, so we - // can't safely filter out this chunk. - // - // TODO(rfratto): The negative test here is a bit confusing, and the key - // presence test should likely be done higher up within FuseQuerier. + return sm.match(series, bloom, prefixedCombined) +} + +// match returns true if the series matches the matcher or is in the bloom filter. +func (sm stringMatcherTest) match(series labels.Labels, bloom filter.Checker, combined []byte) bool { + // If we don't have the series labels, we cannot disambiguate which labels come from the series in which case + // we may filter out chunks for queries like `{env="prod"} | env="prod"` if env=prod is not structured metadata + if len(series) == 0 { + level.Warn(util_log.Logger).Log("msg", "series has no labels, cannot filter out chunks") return true } - return bloom.Test(prefixedCombined) + // It's in the series if the key is set and has the same value. + // By checking val != "" we handle `{env="prod"} | user=""`. + val := series.Get(sm.matcher.Key) + inSeries := val != "" && val == sm.matcher.Value + + inBloom := bloom.Test(combined) + return inSeries || inBloom } // appendToBuf is the equivalent of append(buf[:prefixLen], str). len(buf) must diff --git a/pkg/storage/bloom/v1/bloom_tester_test.go b/pkg/storage/bloom/v1/bloom_tester_test.go index 7a314872cc86e..935da19de255a 100644 --- a/pkg/storage/bloom/v1/bloom_tester_test.go +++ b/pkg/storage/bloom/v1/bloom_tester_test.go @@ -3,6 +3,7 @@ package v1 import ( "testing" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -20,9 +21,11 @@ func TestLabelMatchersToBloomTest(t *testing.T) { tokenizer, push.LabelAdapter{Name: "trace_id", Value: "exists_1"}, push.LabelAdapter{Name: "trace_id", Value: "exists_2"}, + push.LabelAdapter{Name: "app", Value: "other"}, ) ) + series := labels.FromStrings("env", "prod", "app", "fake") tt := []struct { name string query string @@ -54,15 +57,40 @@ func TestLabelMatchersToBloomTest(t *testing.T) { match: false, }, { - name: "ignore non-indexed key", + name: "filter non-indexed key", query: `{app="fake"} | noexist="noexist"`, - match: true, + match: false, }, { - name: "ignore non-indexed key with empty value", + name: "filter non-indexed key with empty value", query: `{app="fake"} | noexist=""`, + match: false, + }, + { + name: "ignore label from series", + query: `{app="fake"} | env="prod"`, match: true, }, + { + name: "filter label from series", + query: `{app="fake"} | env="dev"`, // env is set to prod in the series + match: false, + }, + { + name: "ignore label from series and structured metadata", + query: `{app="fake"} | app="other"`, + match: true, + }, + { + name: "filter series label with non-existing value", + query: `{app="fake"} | app="noexist"`, + match: false, + }, + { + name: "ignore label from series with empty value", + query: `{app="fake"} | app=""`, + match: false, + }, { name: "ignore unsupported operator", query: `{app="fake"} | trace_id=~".*noexist.*"`, @@ -99,8 +127,8 @@ func TestLabelMatchersToBloomTest(t *testing.T) { bloomTest := LabelMatchersToBloomTest(matchers...) // .Matches and .MatchesWithPrefixBuf should both have the same result. - require.Equal(t, tc.match, bloomTest.Matches(bloom)) - require.Equal(t, tc.match, bloomTest.MatchesWithPrefixBuf(bloom, []byte(prefix), len(prefix))) + require.Equal(t, tc.match, bloomTest.Matches(series, bloom)) + require.Equal(t, tc.match, bloomTest.MatchesWithPrefixBuf(series, bloom, []byte(prefix), len(prefix))) }) } } diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index b25743ea4c541..ea28a5534bc49 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" iter "github.com/grafana/loki/v3/pkg/iter/v2" @@ -15,6 +16,7 @@ import ( type Request struct { Fp model.Fingerprint + Labels labels.Labels Chks ChunkRefs Search BloomTest Response chan<- Output @@ -252,6 +254,7 @@ func (fq *FusedQuerier) Run() error { nextBatch := fq.inputs.At() fp := nextBatch[0].Fp + lbs := nextBatch[0].Labels // advance the series iterator to the next fingerprint if err := fq.bq.Seek(fp); err != nil { @@ -276,13 +279,13 @@ func (fq *FusedQuerier) Run() error { continue } - fq.runSeries(schema, series, nextBatch) + fq.runSeries(schema, lbs, series, nextBatch) } return nil } -func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Request) { +func (fq *FusedQuerier) runSeries(_ Schema, lbs labels.Labels, series *SeriesWithMeta, reqs []Request) { // For a given chunk|series to be removed, it must fail to match all blooms. // Because iterating/loading blooms can be expensive, we iterate blooms one at a time, collecting // the removals (failures) for each (bloom, chunk) pair. @@ -372,7 +375,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque // shortcut: series level removal // we can skip testing chunk keys individually if the bloom doesn't match // the query. - if !req.Search.Matches(bloom) { + if !req.Search.Matches(lbs, bloom) { // Nothing else needs to be done for this (bloom, request); // check the next input request continue @@ -387,7 +390,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque // TODO(rfratto): reuse buffer between multiple calls to // prefixForChunkRef and MatchesWithPrefixBuf to avoid allocations. tokenBuf := prefixForChunkRef(chk) - if matched := req.Search.MatchesWithPrefixBuf(bloom, tokenBuf, len(tokenBuf)); matched { + if matched := req.Search.MatchesWithPrefixBuf(lbs, bloom, tokenBuf, len(tokenBuf)); matched { inputs[j].found[k] = true } } diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 3283dc6ccdb59..9d3e56236f8b4 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/concurrency" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/compression" @@ -27,12 +28,12 @@ var BloomPagePool = mempool.New("test", []mempool.Bucket{ type singleKeyTest []byte // Matches implements BloomTest. -func (s singleKeyTest) Matches(bloom filter.Checker) bool { +func (s singleKeyTest) Matches(_ labels.Labels, bloom filter.Checker) bool { return bloom.Test(s) } // MatchesWithPrefixBuf implements BloomTest. -func (s singleKeyTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { +func (s singleKeyTest) MatchesWithPrefixBuf(_ labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { return bloom.Test(append(buf[:prefixLen], s...)) }