From 8c0116db04a28fce3ec4160fa7d67a2a2d58f2fc Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 30 Oct 2024 11:46:48 +0100 Subject: [PATCH 1/8] Filter out label filters that are part of the streams --- pkg/bloomgateway/bloomgateway.go | 20 ++- pkg/bloomgateway/bloomgateway_test.go | 2 +- pkg/bloomgateway/querier.go | 16 ++- pkg/bloomgateway/querier_test.go | 50 +++++--- pkg/indexgateway/gateway.go | 9 +- pkg/logproto/bloomgateway.pb.go | 141 +++++++++++++++------ pkg/logproto/bloomgateway.proto | 1 + pkg/storage/bloom/v1/ast_extractor.go | 49 ++++++- pkg/storage/bloom/v1/ast_extractor_test.go | 28 +++- 9 files changed, 249 insertions(+), 67 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index c221af9e6d34c..936208c5672d0 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -19,6 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" iter "github.com/grafana/loki/v3/pkg/iter/v2" @@ -160,6 +161,18 @@ func (g *Gateway) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr) } +func extractSeries(refs []*logproto.GroupedChunkRefs) []labels.Labels { + series := make([]labels.Labels, 0, len(refs)) + for _, ref := range refs { + if ref.Labels == nil { + continue + } + lbs := logproto.FromLabelAdaptersToLabels(ref.Labels.Labels) + series = append(series, lbs) + } + return series +} + // FilterChunkRefs implements BloomGatewayServer func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) { tenantID, err := tenant.TenantID(ctx) @@ -193,7 +206,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, errors.New("from time must not be after through time") } - matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST) + // This time, we do pass the series to the ExtractTestableLabelMatchers + // To this point, we have called ExtractTestableLabelMatchers multiple times + // without the series (faster) just to return early if there are no filters expressions. + // We now need to be more precise and only extract matchers that do not + // match the series labels. + matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST, extractSeries(req.Refs)...) stats.NumMatchers = len(matchers) g.metrics.receivedMatchers.Observe(float64(len(matchers))) diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 698b1ecf40e47..20f1c7eba162a 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -41,7 +41,7 @@ func stringSlice[T fmt.Stringer](s []T) []string { func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs { t.Helper() - return groupChunkRefs(chunkRefs, nil) + return groupChunkRefs(nil, chunkRefs, nil) } func newLimits() *validation.Overrides { diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index 211372b84bcc5..d1d64d9c34b48 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/querier/plan" @@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef { return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum} } -func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) { +func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) { // Shortcut that does not require any filtering if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 { return chunkRefs, nil @@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from grouped := groupedChunksRefPool.Get(len(chunkRefs)) defer groupedChunksRefPool.Put(grouped) - grouped = groupChunkRefs(chunkRefs, grouped) + grouped = groupChunkRefs(series, chunkRefs, grouped) preFilterChunks := len(chunkRefs) preFilterSeries := len(grouped) @@ -225,7 +226,14 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from // groups them by fingerprint. // The second argument `grouped` can be used to pass a buffer to avoid allocations. // If it's nil, the returned slice will be allocated. -func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { +func groupChunkRefs(series []labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { + seriesFPs := make(map[uint64]*logproto.IndexSeries, len(series)) + for _, s := range series { + seriesFPs[s.Hash()] = &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(s), + } + } + seen := make(map[uint64]int, len(grouped)) for _, chunkRef := range chunkRefs { if idx, found := seen[chunkRef.Fingerprint]; found { @@ -234,10 +242,12 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC seen[chunkRef.Fingerprint] = len(grouped) grouped = append(grouped, &logproto.GroupedChunkRefs{ Fingerprint: chunkRef.Fingerprint, + Labels: seriesFPs[chunkRef.Fingerprint], Tenant: chunkRef.UserID, Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)}, }) } } + return grouped } diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index ca4036d266edb..daad565c1d06f 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -2,6 +2,7 @@ package bloomgateway import ( "context" + "fmt" "math/rand" "sort" "testing" @@ -10,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" @@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"}`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) { chunkRefs := []*logproto.ChunkRef{} expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.Error(t, err) require.Nil(t, res) }) @@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 2, c.callCount) @@ -143,28 +145,40 @@ func TestBloomQuerier(t *testing.T) { } func TestGroupChunkRefs(t *testing.T) { + series := []labels.Labels{ + labels.FromStrings("app", "1"), + labels.FromStrings("app", "2"), + labels.FromStrings("app", "3"), + } + chunkRefs := []*logproto.ChunkRef{ - {Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, - {Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, - {Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, - {Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, - {Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, - {Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + {Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, + {Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + {Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, + {Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + {Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, + {Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, } - result := groupChunkRefs(chunkRefs, nil) + result := groupChunkRefs(series, chunkRefs, nil) require.Equal(t, []*logproto.GroupedChunkRefs{ - {Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{ + {Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{ {From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, {From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[0]), }}, - {Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{ + {Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{ {From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, {From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[1]), }}, - {Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{ + {Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{ {From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, {From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[2]), }}, }, result) } @@ -175,11 +189,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) { n := 1000 // num series m := 10000 // num chunks per series chunkRefs := make([]*logproto.ChunkRef, 0, n*m) + series := make([]labels.Labels, 0, n) for i := 0; i < n; i++ { + s := labels.FromStrings("app", fmt.Sprintf("%d", i)) + sFP := s.Hash() + series = append(series, s) for j := 0; j < m; j++ { chunkRefs = append(chunkRefs, &logproto.ChunkRef{ - Fingerprint: uint64(n), + Fingerprint: sFP, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59"), @@ -196,5 +214,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) { b.StartTimer() groups := make([]*logproto.GroupedChunkRefs, 0, n) - groupChunkRefs(chunkRefs, groups) + groupChunkRefs(series, chunkRefs, groups) } diff --git a/pkg/indexgateway/gateway.go b/pkg/indexgateway/gateway.go index a053c8154bde2..1329692cb1988 100644 --- a/pkg/indexgateway/gateway.go +++ b/pkg/indexgateway/gateway.go @@ -58,7 +58,7 @@ type IndexClientWithRange struct { } type BloomQuerier interface { - FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error) + FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error) } type Gateway struct { @@ -257,7 +257,12 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ return result, nil } - chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan) + series, err := g.indexQuerier.GetSeries(ctx, instanceID, req.From, req.Through, matchers...) + if err != nil { + return nil, err + } + + chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan) if err != nil { return nil, err } diff --git a/pkg/logproto/bloomgateway.pb.go b/pkg/logproto/bloomgateway.pb.go index d1a2df6e63c40..bc91671d6424f 100644 --- a/pkg/logproto/bloomgateway.pb.go +++ b/pkg/logproto/bloomgateway.pb.go @@ -175,9 +175,10 @@ func (m *ShortRef) GetChecksum() uint32 { } type GroupedChunkRefs struct { - Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` - Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` - Refs []*ShortRef `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"` + Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` + Refs []*ShortRef `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"` + Labels *IndexSeries `protobuf:"bytes,4,opt,name=labels,proto3" json:"labels,omitempty"` } func (m *GroupedChunkRefs) Reset() { *m = GroupedChunkRefs{} } @@ -233,6 +234,13 @@ func (m *GroupedChunkRefs) GetRefs() []*ShortRef { return nil } +func (m *GroupedChunkRefs) GetLabels() *IndexSeries { + if m != nil { + return m.Labels + } + return nil +} + func init() { proto.RegisterType((*FilterChunkRefRequest)(nil), "logproto.FilterChunkRefRequest") proto.RegisterType((*FilterChunkRefResponse)(nil), "logproto.FilterChunkRefResponse") @@ -243,39 +251,41 @@ func init() { func init() { proto.RegisterFile("pkg/logproto/bloomgateway.proto", fileDescriptor_a50b5dd1dbcd1415) } var fileDescriptor_a50b5dd1dbcd1415 = []byte{ - // 504 bytes of a gzipped FileDescriptorProto + // 532 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x53, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xf6, 0x36, 0x21, 0x24, 0x1b, 0xfe, 0xb4, 0x82, 0xca, 0x0a, 0xd2, 0xc6, 0xca, 0x81, 0xfa, - 0xe4, 0x95, 0x52, 0x21, 0x71, 0xe1, 0x92, 0x4a, 0x54, 0xdc, 0x60, 0x41, 0x1c, 0x90, 0x38, 0x38, - 0xee, 0xfa, 0x47, 0xb6, 0x77, 0xdc, 0xf5, 0x1a, 0xd4, 0x1b, 0x8f, 0xc0, 0x63, 0xf0, 0x04, 0x3c, - 0x43, 0x8f, 0x11, 0xa7, 0x8a, 0x43, 0x45, 0x9c, 0x0b, 0xc7, 0x3e, 0x02, 0xb2, 0x1d, 0x37, 0x4d, - 0x05, 0xaa, 0xc4, 0x89, 0x93, 0x77, 0x76, 0xbe, 0x19, 0x7f, 0xf3, 0x7d, 0x3b, 0x78, 0x9c, 0xc5, - 0x01, 0x4b, 0x20, 0xc8, 0x14, 0x68, 0x60, 0xf3, 0x04, 0x20, 0x0d, 0x5c, 0x2d, 0x3e, 0xb9, 0x27, - 0x4e, 0x7d, 0x45, 0xfa, 0x6d, 0x72, 0xf4, 0x30, 0x80, 0x00, 0x1a, 0x5c, 0x75, 0x6a, 0xf2, 0xa3, - 0xc7, 0x5b, 0x0d, 0xda, 0x43, 0x93, 0x9c, 0x7c, 0xdf, 0xc1, 0x8f, 0x5e, 0x44, 0x89, 0x16, 0xea, - 0x20, 0x2c, 0x64, 0xcc, 0x85, 0xcf, 0xc5, 0x71, 0x21, 0x72, 0x4d, 0x0e, 0x70, 0xd7, 0x57, 0x90, - 0x9a, 0xc8, 0x42, 0x76, 0x67, 0xc6, 0x4e, 0xcf, 0xc7, 0xc6, 0x8f, 0xf3, 0xf1, 0x5e, 0x10, 0xe9, - 0xb0, 0x98, 0x3b, 0x1e, 0xa4, 0x2c, 0x53, 0x90, 0x0a, 0x1d, 0x8a, 0x22, 0x67, 0x1e, 0xa4, 0x29, - 0x48, 0x96, 0xc2, 0x91, 0x48, 0x9c, 0xb7, 0x51, 0x2a, 0x78, 0x5d, 0x4c, 0x5e, 0xe2, 0xdb, 0x3a, - 0x54, 0x50, 0x04, 0xa1, 0xb9, 0xf3, 0x6f, 0x7d, 0xda, 0x7a, 0xe2, 0xe0, 0xae, 0x12, 0x7e, 0x6e, - 0x76, 0xac, 0x8e, 0x3d, 0x9c, 0x8e, 0x9c, 0xcb, 0x41, 0x0e, 0x15, 0x14, 0x99, 0x38, 0x6a, 0xf9, - 0xe7, 0xbc, 0xc6, 0x11, 0x17, 0x77, 0xb3, 0xc4, 0x95, 0xe6, 0x2d, 0x0b, 0xd9, 0xc3, 0xe9, 0xbd, - 0x0d, 0xfe, 0x55, 0xe2, 0xca, 0xd9, 0xf3, 0x35, 0x8f, 0xa7, 0x57, 0x78, 0x04, 0xca, 0xf5, 0x5d, - 0xe9, 0xb2, 0x04, 0xe2, 0x88, 0x7d, 0xdc, 0x67, 0x95, 0x6e, 0xc7, 0x85, 0x50, 0x91, 0x50, 0xac, - 0x6a, 0xe5, 0xbc, 0x2e, 0x84, 0x3a, 0xa9, 0xca, 0x79, 0xdd, 0x9a, 0xec, 0xe2, 0xde, 0x3c, 0x01, - 0x2f, 0xce, 0xcd, 0x9e, 0xd5, 0xb1, 0x07, 0x7c, 0x1d, 0x4d, 0x38, 0xde, 0xbd, 0xae, 0x69, 0x9e, - 0x81, 0xcc, 0x05, 0x79, 0x86, 0x07, 0x5e, 0xcb, 0xd3, 0x44, 0x37, 0x4e, 0xb2, 0x01, 0x4f, 0xbe, - 0x21, 0xdc, 0x7f, 0x13, 0x82, 0xd2, 0x5c, 0xf8, 0xff, 0x9d, 0x37, 0x23, 0xdc, 0xf7, 0x42, 0xe1, - 0xc5, 0x79, 0x91, 0x9a, 0x1d, 0x0b, 0xd9, 0x77, 0xf9, 0x65, 0x3c, 0xd1, 0xf8, 0xc1, 0xf5, 0xb9, - 0x88, 0x85, 0x87, 0x7e, 0x24, 0x03, 0xa1, 0x32, 0x15, 0x49, 0x5d, 0x8f, 0xd1, 0xe5, 0x57, 0xaf, - 0x2a, 0x69, 0xb5, 0x90, 0xae, 0xd4, 0x35, 0xb7, 0x01, 0x5f, 0x47, 0xe4, 0xc9, 0xd6, 0x2b, 0x20, - 0x1b, 0xed, 0x5a, 0x6d, 0x1a, 0xf7, 0xa7, 0x3e, 0xbe, 0x33, 0xab, 0x56, 0xe5, 0xb0, 0x59, 0x15, - 0xf2, 0x0e, 0xdf, 0xdf, 0xb6, 0x24, 0x27, 0xe3, 0x4d, 0xf1, 0x1f, 0x37, 0x60, 0x64, 0xfd, 0x1d, - 0xd0, 0xd8, 0x39, 0x31, 0x66, 0x1f, 0x16, 0x4b, 0x6a, 0x9c, 0x2d, 0xa9, 0x71, 0xb1, 0xa4, 0xe8, - 0x73, 0x49, 0xd1, 0xd7, 0x92, 0xa2, 0xd3, 0x92, 0xa2, 0x45, 0x49, 0xd1, 0xcf, 0x92, 0xa2, 0x5f, - 0x25, 0x35, 0x2e, 0x4a, 0x8a, 0xbe, 0xac, 0xa8, 0xb1, 0x58, 0x51, 0xe3, 0x6c, 0x45, 0x8d, 0xf7, - 0x7b, 0x37, 0xbc, 0xba, 0xf6, 0xbf, 0xf3, 0x5e, 0xfd, 0xd9, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, - 0x30, 0x14, 0xc6, 0xc9, 0x05, 0x04, 0x00, 0x00, + 0x10, 0xf6, 0x36, 0x21, 0x24, 0x1b, 0xfe, 0xb4, 0xa2, 0x95, 0x15, 0xa4, 0x8d, 0x95, 0x03, 0xcd, + 0x05, 0x5b, 0x4a, 0x85, 0xc4, 0x85, 0x4b, 0x2a, 0x51, 0xf5, 0x06, 0x5b, 0xc4, 0x01, 0x89, 0x83, + 0xe3, 0x8c, 0x7f, 0x14, 0x7b, 0xd7, 0xdd, 0x5d, 0x03, 0xbd, 0xf1, 0x08, 0xbc, 0x03, 0x17, 0x9e, + 0x80, 0x67, 0xe8, 0x31, 0xe2, 0x54, 0x71, 0xa8, 0x88, 0x73, 0xe1, 0xd8, 0x47, 0x40, 0x76, 0xe2, + 0xba, 0xa9, 0x40, 0x95, 0x38, 0x71, 0xf2, 0xee, 0xce, 0x37, 0xe3, 0x6f, 0xbe, 0x6f, 0x06, 0xf7, + 0xd3, 0x59, 0xe0, 0xc4, 0x22, 0x48, 0xa5, 0xd0, 0xc2, 0x99, 0xc4, 0x42, 0x24, 0x81, 0xab, 0xe1, + 0x83, 0x7b, 0x62, 0x97, 0x4f, 0xa4, 0x5d, 0x05, 0x7b, 0x0f, 0x03, 0x11, 0x88, 0x15, 0xae, 0x38, + 0xad, 0xe2, 0xbd, 0x47, 0x1b, 0x05, 0xaa, 0xc3, 0x2a, 0x38, 0xf8, 0xbe, 0x85, 0xb7, 0x5f, 0x44, + 0xb1, 0x06, 0xb9, 0x1f, 0x66, 0x7c, 0xc6, 0xc0, 0x67, 0x70, 0x9c, 0x81, 0xd2, 0x64, 0x1f, 0x37, + 0x7d, 0x29, 0x12, 0x13, 0x59, 0x68, 0xd8, 0x18, 0x3b, 0xa7, 0xe7, 0x7d, 0xe3, 0xc7, 0x79, 0x7f, + 0x37, 0x88, 0x74, 0x98, 0x4d, 0x6c, 0x4f, 0x24, 0x4e, 0x2a, 0x45, 0x02, 0x3a, 0x84, 0x4c, 0x39, + 0x9e, 0x48, 0x12, 0xc1, 0x9d, 0x44, 0x4c, 0x21, 0xb6, 0x5f, 0x47, 0x09, 0xb0, 0x32, 0x99, 0x1c, + 0xe2, 0xdb, 0x3a, 0x94, 0x22, 0x0b, 0x42, 0x73, 0xeb, 0xdf, 0xea, 0x54, 0xf9, 0xc4, 0xc6, 0x4d, + 0x09, 0xbe, 0x32, 0x1b, 0x56, 0x63, 0xd8, 0x1d, 0xf5, 0xec, 0xcb, 0x46, 0x0e, 0xa4, 0xc8, 0x52, + 0x98, 0x56, 0xfc, 0x15, 0x2b, 0x71, 0xc4, 0xc5, 0xcd, 0x34, 0x76, 0xb9, 0x79, 0xcb, 0x42, 0xc3, + 0xee, 0xe8, 0x5e, 0x8d, 0x7f, 0x19, 0xbb, 0x7c, 0xfc, 0x7c, 0xcd, 0xe3, 0xe9, 0x15, 0x1e, 0x81, + 0x74, 0x7d, 0x97, 0xbb, 0x4e, 0x2c, 0x66, 0x91, 0xf3, 0x7e, 0xcf, 0x29, 0x74, 0x3b, 0xce, 0x40, + 0x46, 0x20, 0x9d, 0xa2, 0x94, 0xfd, 0x2a, 0x03, 0x79, 0x52, 0xa4, 0xb3, 0xb2, 0x34, 0xd9, 0xc1, + 0xad, 0x49, 0x2c, 0xbc, 0x99, 0x32, 0x5b, 0x56, 0x63, 0xd8, 0x61, 0xeb, 0xdb, 0x80, 0xe1, 0x9d, + 0xeb, 0x9a, 0xaa, 0x54, 0x70, 0x05, 0xe4, 0x19, 0xee, 0x78, 0x15, 0x4f, 0x13, 0xdd, 0xd8, 0x49, + 0x0d, 0x1e, 0x7c, 0x43, 0xb8, 0x7d, 0x14, 0x0a, 0xa9, 0x19, 0xf8, 0xff, 0x9d, 0x37, 0x3d, 0xdc, + 0xf6, 0x42, 0xf0, 0x66, 0x2a, 0x4b, 0xcc, 0x86, 0x85, 0x86, 0x77, 0xd9, 0xe5, 0x7d, 0xf0, 0x05, + 0xe1, 0x07, 0xd7, 0x1b, 0x23, 0x16, 0xee, 0xfa, 0x11, 0x0f, 0x40, 0xa6, 0x32, 0xe2, 0xba, 0xec, + 0xa3, 0xc9, 0xae, 0x3e, 0x15, 0xda, 0x6a, 0xe0, 0x2e, 0xd7, 0x25, 0xb9, 0x0e, 0x5b, 0xdf, 0xc8, + 0xe3, 0x8d, 0x31, 0x20, 0xb5, 0x78, 0x95, 0x38, 0x6b, 0xfb, 0x9f, 0xe0, 0x56, 0xec, 0x4e, 0x20, + 0x56, 0x66, 0xb3, 0x1c, 0x80, 0xed, 0x1a, 0x79, 0xc8, 0xa7, 0xf0, 0xf1, 0xa8, 0xf0, 0x55, 0xb1, + 0x35, 0x68, 0xe4, 0xe3, 0x3b, 0xe3, 0x62, 0xb5, 0x0e, 0x56, 0xab, 0x45, 0xde, 0xe0, 0xfb, 0x9b, + 0x16, 0x2a, 0xd2, 0xaf, 0x2b, 0xfc, 0x71, 0x63, 0x7a, 0xd6, 0xdf, 0x01, 0x2b, 0xfb, 0x07, 0xc6, + 0xf8, 0xdd, 0x7c, 0x41, 0x8d, 0xb3, 0x05, 0x35, 0x2e, 0x16, 0x14, 0x7d, 0xca, 0x29, 0xfa, 0x9a, + 0x53, 0x74, 0x9a, 0x53, 0x34, 0xcf, 0x29, 0xfa, 0x99, 0x53, 0xf4, 0x2b, 0xa7, 0xc6, 0x45, 0x4e, + 0xd1, 0xe7, 0x25, 0x35, 0xe6, 0x4b, 0x6a, 0x9c, 0x2d, 0xa9, 0xf1, 0x76, 0xf7, 0x86, 0x29, 0xad, + 0xfe, 0x3b, 0x69, 0x95, 0x9f, 0xbd, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x05, 0x31, 0x08, + 0x35, 0x04, 0x00, 0x00, } func (this *FilterChunkRefRequest) Equal(that interface{}) bool { @@ -416,6 +426,9 @@ func (this *GroupedChunkRefs) Equal(that interface{}) bool { return false } } + if !this.Labels.Equal(that1.Labels) { + return false + } return true } func (this *FilterChunkRefRequest) GoString() string { @@ -462,13 +475,16 @@ func (this *GroupedChunkRefs) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&logproto.GroupedChunkRefs{") s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") s = append(s, "Tenant: "+fmt.Sprintf("%#v", this.Tenant)+",\n") if this.Refs != nil { s = append(s, "Refs: "+fmt.Sprintf("%#v", this.Refs)+",\n") } + if this.Labels != nil { + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -722,6 +738,18 @@ func (m *GroupedChunkRefs) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Labels != nil { + { + size, err := m.Labels.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBloomgateway(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } if len(m.Refs) > 0 { for iNdEx := len(m.Refs) - 1; iNdEx >= 0; iNdEx-- { { @@ -843,6 +871,10 @@ func (m *GroupedChunkRefs) Size() (n int) { n += 1 + l + sovBloomgateway(uint64(l)) } } + if m.Labels != nil { + l = m.Labels.Size() + n += 1 + l + sovBloomgateway(uint64(l)) + } return n } @@ -911,6 +943,7 @@ func (this *GroupedChunkRefs) String() string { `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, `Tenant:` + fmt.Sprintf("%v", this.Tenant) + `,`, `Refs:` + repeatedStringForRefs + `,`, + `Labels:` + strings.Replace(fmt.Sprintf("%v", this.Labels), "IndexSeries", "IndexSeries", 1) + `,`, `}`, }, "") return s @@ -1424,6 +1457,42 @@ func (m *GroupedChunkRefs) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBloomgateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBloomgateway + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBloomgateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Labels == nil { + m.Labels = &IndexSeries{} + } + if err := m.Labels.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipBloomgateway(dAtA[iNdEx:]) diff --git a/pkg/logproto/bloomgateway.proto b/pkg/logproto/bloomgateway.proto index 0d80d865b32f4..9a8b935dc31c6 100644 --- a/pkg/logproto/bloomgateway.proto +++ b/pkg/logproto/bloomgateway.proto @@ -45,6 +45,7 @@ message GroupedChunkRefs { uint64 fingerprint = 1; string tenant = 2; repeated ShortRef refs = 3; + IndexSeries labels = 4; } service BloomGateway { diff --git a/pkg/storage/bloom/v1/ast_extractor.go b/pkg/storage/bloom/v1/ast_extractor.go index 4c59c93e937fb..98f8d1e6b3c81 100644 --- a/pkg/storage/bloom/v1/ast_extractor.go +++ b/pkg/storage/bloom/v1/ast_extractor.go @@ -27,30 +27,55 @@ type OrLabelMatcher struct{ Left, Right LabelMatcher } // if both of the Left and Right label matcher bloom tests pass. type AndLabelMatcher struct{ Left, Right LabelMatcher } +type seriesLabelNames map[string]struct{} + +func newSeriesLabelNames(series []labels.Labels) seriesLabelNames { + lblNames := make(seriesLabelNames, len(series)) + for _, lbls := range series { + for _, lbl := range lbls { + lblNames[lbl.Name] = struct{}{} + } + } + return lblNames +} + +func (s seriesLabelNames) contains(name string) bool { + _, ok := s[name] + return ok +} + // ExtractTestableLabelMatchers extracts label matchers from the label filters // in an expression. The resulting label matchers can then be used for testing // against bloom filters. Only label matchers before the first parse stage are // included. +// If series are provided, labels matchers that are part of the series are not included // // Unsupported LabelFilterExprs map to an UnsupportedLabelMatcher, for which // bloom tests should always pass. -func ExtractTestableLabelMatchers(expr syntax.Expr) []LabelMatcher { +func ExtractTestableLabelMatchers(expr syntax.Expr, series ...labels.Labels) []LabelMatcher { if expr == nil { return nil } filters := syntax.ExtractLabelFiltersBeforeParser(expr) - return buildLabelMatchers(filters) + + // Only check series labels if we pass in series. + var seriesLbs seriesLabelNames + if len(series) > 0 { + seriesLbs = newSeriesLabelNames(series) + } + + return buildLabelMatchers(seriesLbs, filters) } -func buildLabelMatchers(exprs []*syntax.LabelFilterExpr) []LabelMatcher { +func buildLabelMatchers(series seriesLabelNames, exprs []*syntax.LabelFilterExpr) []LabelMatcher { matchers := make([]LabelMatcher, 0, len(exprs)) for _, expr := range exprs { - matchers = append(matchers, buildLabelMatcher(expr.LabelFilterer)) + matchers = append(matchers, buildLabelMatcher(series, expr.LabelFilterer)) } return matchers } -func buildLabelMatcher(filter log.LabelFilterer) LabelMatcher { +func buildLabelMatcher(series seriesLabelNames, filter log.LabelFilterer) LabelMatcher { switch filter := filter.(type) { case *log.LineFilterLabelFilter: @@ -58,6 +83,11 @@ func buildLabelMatcher(filter log.LabelFilterer) LabelMatcher { return UnsupportedLabelMatcher{} } + // Do not filter by labels that are part of the series. + if series.contains(filter.Name) { + return UnsupportedLabelMatcher{} + } + return PlainLabelMatcher{ Key: filter.Name, Value: filter.Value, @@ -68,6 +98,11 @@ func buildLabelMatcher(filter log.LabelFilterer) LabelMatcher { return UnsupportedLabelMatcher{} } + // Do not filter by labels that are part of the series. + if series.contains(filter.Name) { + return UnsupportedLabelMatcher{} + } + return PlainLabelMatcher{ Key: filter.Name, Value: filter.Value, @@ -75,8 +110,8 @@ func buildLabelMatcher(filter log.LabelFilterer) LabelMatcher { case *log.BinaryLabelFilter: var ( - left = buildLabelMatcher(filter.Left) - right = buildLabelMatcher(filter.Right) + left = buildLabelMatcher(series, filter.Left) + right = buildLabelMatcher(series, filter.Right) ) if filter.And { diff --git a/pkg/storage/bloom/v1/ast_extractor_test.go b/pkg/storage/bloom/v1/ast_extractor_test.go index 856f0412c8a99..5cd1ca8e957ff 100644 --- a/pkg/storage/bloom/v1/ast_extractor_test.go +++ b/pkg/storage/bloom/v1/ast_extractor_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -14,6 +15,7 @@ func TestExtractLabelMatchers(t *testing.T) { tt := []struct { name string input string + series []labels.Labels expect []v1.LabelMatcher }{ { @@ -24,6 +26,30 @@ func TestExtractLabelMatchers(t *testing.T) { }, }, + { + name: "basic label matcher in series", + input: `{app="foo"} | key="value"`, + series: []labels.Labels{ + labels.FromStrings("app", "foo", "bar", "baz"), + labels.FromStrings("app", "foo", "key", "other"), + }, + expect: []v1.LabelMatcher{ + v1.UnsupportedLabelMatcher{}, + }, + }, + + { + name: "basic label matcher not in series", + input: `{app="foo"} | key="value"`, + series: []labels.Labels{ + labels.FromStrings("app", "foo", "bar", "baz"), + labels.FromStrings("app", "foo", "env", "prod"), + }, + expect: []v1.LabelMatcher{ + v1.PlainLabelMatcher{Key: "key", Value: "value"}, + }, + }, + { name: "or label matcher", input: `{app="foo"} | key1="value1" or key2="value2"`, @@ -68,7 +94,7 @@ func TestExtractLabelMatchers(t *testing.T) { t.Run(tc.name, func(t *testing.T) { expr, err := syntax.ParseExpr(tc.input) require.NoError(t, err) - require.Equal(t, tc.expect, v1.ExtractTestableLabelMatchers(expr)) + require.Equal(t, tc.expect, v1.ExtractTestableLabelMatchers(expr, tc.series...)) }) } } From 20ab9b3f28a244a54ced250cacc527549a4354bb Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 30 Oct 2024 17:08:08 +0100 Subject: [PATCH 2/8] CR feedback --- pkg/bloomgateway/bloomgateway.go | 20 +----- pkg/bloomgateway/multiplexing.go | 1 + pkg/bloomgateway/querier.go | 19 ++---- pkg/bloomgateway/querier_test.go | 18 +++-- pkg/indexgateway/gateway.go | 11 ++- pkg/logproto/bloomgateway.pb.go | 10 +-- pkg/logproto/bloomgateway.proto | 2 + pkg/storage/bloom/v1/ast_extractor.go | 49 ++------------ pkg/storage/bloom/v1/ast_extractor_test.go | 28 +------- pkg/storage/bloom/v1/bloom_tester.go | 79 +++++++++++----------- pkg/storage/bloom/v1/bloom_tester_test.go | 27 ++++++-- pkg/storage/bloom/v1/fuse.go | 11 +-- pkg/storage/bloom/v1/fuse_test.go | 5 +- 13 files changed, 114 insertions(+), 166 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 936208c5672d0..c221af9e6d34c 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -19,7 +19,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" iter "github.com/grafana/loki/v3/pkg/iter/v2" @@ -161,18 +160,6 @@ func (g *Gateway) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr) } -func extractSeries(refs []*logproto.GroupedChunkRefs) []labels.Labels { - series := make([]labels.Labels, 0, len(refs)) - for _, ref := range refs { - if ref.Labels == nil { - continue - } - lbs := logproto.FromLabelAdaptersToLabels(ref.Labels.Labels) - series = append(series, lbs) - } - return series -} - // FilterChunkRefs implements BloomGatewayServer func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) { tenantID, err := tenant.TenantID(ctx) @@ -206,12 +193,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, errors.New("from time must not be after through time") } - // This time, we do pass the series to the ExtractTestableLabelMatchers - // To this point, we have called ExtractTestableLabelMatchers multiple times - // without the series (faster) just to return early if there are no filters expressions. - // We now need to be more precise and only extract matchers that do not - // match the series labels. - matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST, extractSeries(req.Refs)...) + matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST) stats.NumMatchers = len(matchers) g.metrics.receivedMatchers.Observe(float64(len(matchers))) diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 7f929a1e19c6a..2394a72c36874 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -174,6 +174,7 @@ func (it *requestIterator) Next() bool { it.curr = v1.Request{ Recorder: it.recorder, Fp: model.Fingerprint(group.Fingerprint), + Labels: logproto.FromLabelAdaptersToLabels(group.Labels.Labels), Chks: convertToChunkRefs(group.Refs), Search: it.search, Response: it.channel, diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index d1d64d9c34b48..3cf6ce0eaf925 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -102,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef { return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum} } -func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) { +func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) { // Shortcut that does not require any filtering if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 { return chunkRefs, nil @@ -226,14 +226,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from // groups them by fingerprint. // The second argument `grouped` can be used to pass a buffer to avoid allocations. // If it's nil, the returned slice will be allocated. -func groupChunkRefs(series []labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { - seriesFPs := make(map[uint64]*logproto.IndexSeries, len(series)) - for _, s := range series { - seriesFPs[s.Hash()] = &logproto.IndexSeries{ - Labels: logproto.FromLabelsToLabelAdapters(s), - } - } - +func groupChunkRefs(series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { seen := make(map[uint64]int, len(grouped)) for _, chunkRef := range chunkRefs { if idx, found := seen[chunkRef.Fingerprint]; found { @@ -242,9 +235,11 @@ func groupChunkRefs(series []labels.Labels, chunkRefs []*logproto.ChunkRef, grou seen[chunkRef.Fingerprint] = len(grouped) grouped = append(grouped, &logproto.GroupedChunkRefs{ Fingerprint: chunkRef.Fingerprint, - Labels: seriesFPs[chunkRef.Fingerprint], - Tenant: chunkRef.UserID, - Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)}, + Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(series[chunkRef.Fingerprint]), + }, + Tenant: chunkRef.UserID, + Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)}, }) } } diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index daad565c1d06f..bb67c9f1bd178 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -81,7 +81,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"}`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -97,7 +97,7 @@ func TestBloomQuerier(t *testing.T) { chunkRefs := []*logproto.ChunkRef{} expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -117,7 +117,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) require.Error(t, err) require.Nil(t, res) }) @@ -136,7 +136,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 2, c.callCount) @@ -150,6 +150,10 @@ func TestGroupChunkRefs(t *testing.T) { labels.FromStrings("app", "2"), labels.FromStrings("app", "3"), } + seriesMap := make(map[uint64]labels.Labels) + for _, s := range series { + seriesMap[s.Hash()] = s + } chunkRefs := []*logproto.ChunkRef{ {Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, @@ -160,7 +164,7 @@ func TestGroupChunkRefs(t *testing.T) { {Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")}, } - result := groupChunkRefs(series, chunkRefs, nil) + result := groupChunkRefs(seriesMap, chunkRefs, nil) require.Equal(t, []*logproto.GroupedChunkRefs{ {Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{ {From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")}, @@ -189,12 +193,12 @@ func BenchmarkGroupChunkRefs(b *testing.B) { n := 1000 // num series m := 10000 // num chunks per series chunkRefs := make([]*logproto.ChunkRef, 0, n*m) - series := make([]labels.Labels, 0, n) + series := make(map[uint64]labels.Labels, n) for i := 0; i < n; i++ { s := labels.FromStrings("app", fmt.Sprintf("%d", i)) sFP := s.Hash() - series = append(series, s) + series[sFP] = s for j := 0; j < m; j++ { chunkRefs = append(chunkRefs, &logproto.ChunkRef{ Fingerprint: sFP, diff --git a/pkg/indexgateway/gateway.go b/pkg/indexgateway/gateway.go index 1329692cb1988..785d59b60fab1 100644 --- a/pkg/indexgateway/gateway.go +++ b/pkg/indexgateway/gateway.go @@ -58,7 +58,7 @@ type IndexClientWithRange struct { } type BloomQuerier interface { - FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error) + FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error) } type Gateway struct { @@ -225,12 +225,16 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ return nil, err } + series := make(map[uint64]labels.Labels) result = &logproto.GetChunkRefResponse{ Refs: make([]*logproto.ChunkRef, 0, len(chunks)), } for _, cs := range chunks { for i := range cs { result.Refs = append(result.Refs, &cs[i].ChunkRef) + if _, ok := series[cs[i].Fingerprint]; !ok { + series[cs[i].Fingerprint] = cs[i].Metric + } } } @@ -257,11 +261,6 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ return result, nil } - series, err := g.indexQuerier.GetSeries(ctx, instanceID, req.From, req.Through, matchers...) - if err != nil { - return nil, err - } - chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan) if err != nil { return nil, err diff --git a/pkg/logproto/bloomgateway.pb.go b/pkg/logproto/bloomgateway.pb.go index bc91671d6424f..e4841b8ae0bd9 100644 --- a/pkg/logproto/bloomgateway.pb.go +++ b/pkg/logproto/bloomgateway.pb.go @@ -175,10 +175,12 @@ func (m *ShortRef) GetChecksum() uint32 { } type GroupedChunkRefs struct { - Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` - Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` - Refs []*ShortRef `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"` - Labels *IndexSeries `protobuf:"bytes,4,opt,name=labels,proto3" json:"labels,omitempty"` + Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` + Refs []*ShortRef `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"` + // Labels are only populated on FilterChunkRefRequest. They are not returned on FilterChunkRefResponse + // TODO(salvacorts): Consider two different messages for FilterChunkRefRequest and FilterChunkRefResponse + Labels *IndexSeries `protobuf:"bytes,4,opt,name=labels,proto3" json:"labels,omitempty"` } func (m *GroupedChunkRefs) Reset() { *m = GroupedChunkRefs{} } diff --git a/pkg/logproto/bloomgateway.proto b/pkg/logproto/bloomgateway.proto index 9a8b935dc31c6..809afb90b7b5a 100644 --- a/pkg/logproto/bloomgateway.proto +++ b/pkg/logproto/bloomgateway.proto @@ -45,6 +45,8 @@ message GroupedChunkRefs { uint64 fingerprint = 1; string tenant = 2; repeated ShortRef refs = 3; + // Labels are only populated on FilterChunkRefRequest. They are not returned on FilterChunkRefResponse + // TODO(salvacorts): Consider two different messages for FilterChunkRefRequest and FilterChunkRefResponse IndexSeries labels = 4; } diff --git a/pkg/storage/bloom/v1/ast_extractor.go b/pkg/storage/bloom/v1/ast_extractor.go index 98f8d1e6b3c81..4c59c93e937fb 100644 --- a/pkg/storage/bloom/v1/ast_extractor.go +++ b/pkg/storage/bloom/v1/ast_extractor.go @@ -27,55 +27,30 @@ type OrLabelMatcher struct{ Left, Right LabelMatcher } // if both of the Left and Right label matcher bloom tests pass. type AndLabelMatcher struct{ Left, Right LabelMatcher } -type seriesLabelNames map[string]struct{} - -func newSeriesLabelNames(series []labels.Labels) seriesLabelNames { - lblNames := make(seriesLabelNames, len(series)) - for _, lbls := range series { - for _, lbl := range lbls { - lblNames[lbl.Name] = struct{}{} - } - } - return lblNames -} - -func (s seriesLabelNames) contains(name string) bool { - _, ok := s[name] - return ok -} - // ExtractTestableLabelMatchers extracts label matchers from the label filters // in an expression. The resulting label matchers can then be used for testing // against bloom filters. Only label matchers before the first parse stage are // included. -// If series are provided, labels matchers that are part of the series are not included // // Unsupported LabelFilterExprs map to an UnsupportedLabelMatcher, for which // bloom tests should always pass. -func ExtractTestableLabelMatchers(expr syntax.Expr, series ...labels.Labels) []LabelMatcher { +func ExtractTestableLabelMatchers(expr syntax.Expr) []LabelMatcher { if expr == nil { return nil } filters := syntax.ExtractLabelFiltersBeforeParser(expr) - - // Only check series labels if we pass in series. - var seriesLbs seriesLabelNames - if len(series) > 0 { - seriesLbs = newSeriesLabelNames(series) - } - - return buildLabelMatchers(seriesLbs, filters) + return buildLabelMatchers(filters) } -func buildLabelMatchers(series seriesLabelNames, exprs []*syntax.LabelFilterExpr) []LabelMatcher { +func buildLabelMatchers(exprs []*syntax.LabelFilterExpr) []LabelMatcher { matchers := make([]LabelMatcher, 0, len(exprs)) for _, expr := range exprs { - matchers = append(matchers, buildLabelMatcher(series, expr.LabelFilterer)) + matchers = append(matchers, buildLabelMatcher(expr.LabelFilterer)) } return matchers } -func buildLabelMatcher(series seriesLabelNames, filter log.LabelFilterer) LabelMatcher { +func buildLabelMatcher(filter log.LabelFilterer) LabelMatcher { switch filter := filter.(type) { case *log.LineFilterLabelFilter: @@ -83,11 +58,6 @@ func buildLabelMatcher(series seriesLabelNames, filter log.LabelFilterer) LabelM return UnsupportedLabelMatcher{} } - // Do not filter by labels that are part of the series. - if series.contains(filter.Name) { - return UnsupportedLabelMatcher{} - } - return PlainLabelMatcher{ Key: filter.Name, Value: filter.Value, @@ -98,11 +68,6 @@ func buildLabelMatcher(series seriesLabelNames, filter log.LabelFilterer) LabelM return UnsupportedLabelMatcher{} } - // Do not filter by labels that are part of the series. - if series.contains(filter.Name) { - return UnsupportedLabelMatcher{} - } - return PlainLabelMatcher{ Key: filter.Name, Value: filter.Value, @@ -110,8 +75,8 @@ func buildLabelMatcher(series seriesLabelNames, filter log.LabelFilterer) LabelM case *log.BinaryLabelFilter: var ( - left = buildLabelMatcher(series, filter.Left) - right = buildLabelMatcher(series, filter.Right) + left = buildLabelMatcher(filter.Left) + right = buildLabelMatcher(filter.Right) ) if filter.And { diff --git a/pkg/storage/bloom/v1/ast_extractor_test.go b/pkg/storage/bloom/v1/ast_extractor_test.go index 5cd1ca8e957ff..856f0412c8a99 100644 --- a/pkg/storage/bloom/v1/ast_extractor_test.go +++ b/pkg/storage/bloom/v1/ast_extractor_test.go @@ -4,7 +4,6 @@ import ( "fmt" "testing" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -15,7 +14,6 @@ func TestExtractLabelMatchers(t *testing.T) { tt := []struct { name string input string - series []labels.Labels expect []v1.LabelMatcher }{ { @@ -26,30 +24,6 @@ func TestExtractLabelMatchers(t *testing.T) { }, }, - { - name: "basic label matcher in series", - input: `{app="foo"} | key="value"`, - series: []labels.Labels{ - labels.FromStrings("app", "foo", "bar", "baz"), - labels.FromStrings("app", "foo", "key", "other"), - }, - expect: []v1.LabelMatcher{ - v1.UnsupportedLabelMatcher{}, - }, - }, - - { - name: "basic label matcher not in series", - input: `{app="foo"} | key="value"`, - series: []labels.Labels{ - labels.FromStrings("app", "foo", "bar", "baz"), - labels.FromStrings("app", "foo", "env", "prod"), - }, - expect: []v1.LabelMatcher{ - v1.PlainLabelMatcher{Key: "key", Value: "value"}, - }, - }, - { name: "or label matcher", input: `{app="foo"} | key1="value1" or key2="value2"`, @@ -94,7 +68,7 @@ func TestExtractLabelMatchers(t *testing.T) { t.Run(tc.name, func(t *testing.T) { expr, err := syntax.ParseExpr(tc.input) require.NoError(t, err) - require.Equal(t, tc.expect, v1.ExtractTestableLabelMatchers(expr, tc.series...)) + require.Equal(t, tc.expect, v1.ExtractTestableLabelMatchers(expr)) }) } } diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go index 6fe00a4cd1730..74607b0ff70b6 100644 --- a/pkg/storage/bloom/v1/bloom_tester.go +++ b/pkg/storage/bloom/v1/bloom_tester.go @@ -4,28 +4,30 @@ import ( "fmt" "unsafe" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" ) type BloomTest interface { - Matches(bloom filter.Checker) bool - MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool + Matches(series labels.Labels, bloom filter.Checker) bool + MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool } type BloomTests []BloomTest -func (b BloomTests) Matches(bloom filter.Checker) bool { +func (b BloomTests) Matches(series labels.Labels, bloom filter.Checker) bool { for _, test := range b { - if !test.Matches(bloom) { + if !test.Matches(series, bloom) { return false } } return true } -func (b BloomTests) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { +func (b BloomTests) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { for _, test := range b { - if !test.MatchesWithPrefixBuf(bloom, buf, prefixLen) { + if !test.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) { return false } } @@ -37,12 +39,12 @@ type matchAllTest struct{} var MatchAll = matchAllTest{} // Matches implements BloomTest -func (n matchAllTest) Matches(_ filter.Checker) bool { +func (n matchAllTest) Matches(_ labels.Labels, _ filter.Checker) bool { return true } // MatchesWithPrefixBuf implements BloomTest -func (n matchAllTest) MatchesWithPrefixBuf(_ filter.Checker, _ []byte, _ int) bool { +func (n matchAllTest) MatchesWithPrefixBuf(_ labels.Labels, _ filter.Checker, _ []byte, _ int) bool { return true } @@ -72,13 +74,13 @@ func newOrTest(left, right BloomTest) orTest { } // Matches implements BloomTest -func (o orTest) Matches(bloom filter.Checker) bool { - return o.left.Matches(bloom) || o.right.Matches(bloom) +func (o orTest) Matches(series labels.Labels, bloom filter.Checker) bool { + return o.left.Matches(series, bloom) || o.right.Matches(series, bloom) } // MatchesWithPrefixBuf implements BloomTest -func (o orTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { - return o.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(bloom, buf, prefixLen) +func (o orTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { + return o.left.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) } type andTest struct { @@ -93,13 +95,13 @@ func newAndTest(left, right BloomTest) andTest { } // Matches implements BloomTest -func (a andTest) Matches(bloom filter.Checker) bool { - return a.left.Matches(bloom) && a.right.Matches(bloom) +func (a andTest) Matches(series labels.Labels, bloom filter.Checker) bool { + return a.left.Matches(series, bloom) && a.right.Matches(series, bloom) } // MatchesWithPrefixBuf implements BloomTest -func (a andTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { - return a.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(bloom, buf, prefixLen) +func (a andTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { + return a.left.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) } func LabelMatchersToBloomTest(matchers ...LabelMatcher) BloomTest { @@ -144,7 +146,7 @@ func newStringMatcherTest(matcher PlainLabelMatcher) stringMatcherTest { return stringMatcherTest{matcher: matcher} } -func (sm stringMatcherTest) Matches(bloom filter.Checker) bool { +func (sm stringMatcherTest) Matches(series labels.Labels, bloom filter.Checker) bool { // TODO(rfratto): reintroduce the use of a shared tokenizer here to avoid // desyncing between how tokens are passed during building vs passed during // querying. @@ -161,20 +163,10 @@ func (sm stringMatcherTest) Matches(bloom filter.Checker) bool { rawCombined = unsafe.Slice(unsafe.StringData(combined), len(combined)) ) - if !bloom.Test(rawKey) { - // The structured metadata key wasn't indexed. However, sm.matcher might be - // checking against a label which *does* exist, so we can't safely filter - // out this chunk. - // - // TODO(rfratto): The negative test here is a bit confusing, and the key - // presence test should likely be done higher up within FuseQuerier. - return true - } - - return bloom.Test(rawCombined) + return sm.match(series, bloom, rawKey, rawCombined) } -func (sm stringMatcherTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { +func (sm stringMatcherTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { var ( combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) @@ -182,17 +174,28 @@ func (sm stringMatcherTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byt prefixedCombined = appendToBuf(buf, prefixLen, combined) ) - if !bloom.Test(prefixedKey) { - // The structured metadata key wasn't indexed for a prefix. However, - // sm.matcher might be checking against a label which *does* exist, so we - // can't safely filter out this chunk. - // - // TODO(rfratto): The negative test here is a bit confusing, and the key - // presence test should likely be done higher up within FuseQuerier. - return true + return sm.match(series, bloom, prefixedKey, prefixedCombined) +} + +func (sm stringMatcherTest) match(series labels.Labels, bloom filter.Checker, key []byte, combined []byte) bool { + // If the label is part of the series, we cannot check the bloom since + // the label is not structured metadata + if value := series.Get(sm.matcher.Key); value != "" { + // If the series label value is the same as the matcher value, we cannot filter out this chunk. + // Otherwise, we can filter out this chunk. + // E.g. `{env="prod"} | env="prod"` should not filter out the chunk. + // E.g. `{env="prod"} | env="dev"` should filter out the chunk. + // E.g. `{env="prod"} | env=""` should filter out the chunk. + return value == sm.matcher.Value + } + + // To this point we know the label is structured metadata so if the label name is not + // in the bloom, we can filter out the chunk. + if !bloom.Test(key) { + return false } - return bloom.Test(prefixedCombined) + return bloom.Test(combined) } // appendToBuf is the equivalent of append(buf[:prefixLen], str). len(buf) must diff --git a/pkg/storage/bloom/v1/bloom_tester_test.go b/pkg/storage/bloom/v1/bloom_tester_test.go index 7a314872cc86e..3afa59109c5f7 100644 --- a/pkg/storage/bloom/v1/bloom_tester_test.go +++ b/pkg/storage/bloom/v1/bloom_tester_test.go @@ -3,6 +3,7 @@ package v1 import ( "testing" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -23,6 +24,7 @@ func TestLabelMatchersToBloomTest(t *testing.T) { ) ) + series := labels.FromStrings("app", "fake") tt := []struct { name string query string @@ -54,15 +56,30 @@ func TestLabelMatchersToBloomTest(t *testing.T) { match: false, }, { - name: "ignore non-indexed key", + name: "filter non-indexed key", query: `{app="fake"} | noexist="noexist"`, - match: true, + match: false, }, { - name: "ignore non-indexed key with empty value", + name: "filter non-indexed key with empty value", query: `{app="fake"} | noexist=""`, + match: false, + }, + { + name: "filter series label with different value", + query: `{app="fake"} | app="noexist"`, + match: false, + }, + { + name: "ignore label from series", + query: `{app="fake"} | app="fake"`, match: true, }, + { + name: "ignore label from series with empty value", + query: `{app="fake"} | app=""`, + match: false, + }, { name: "ignore unsupported operator", query: `{app="fake"} | trace_id=~".*noexist.*"`, @@ -99,8 +116,8 @@ func TestLabelMatchersToBloomTest(t *testing.T) { bloomTest := LabelMatchersToBloomTest(matchers...) // .Matches and .MatchesWithPrefixBuf should both have the same result. - require.Equal(t, tc.match, bloomTest.Matches(bloom)) - require.Equal(t, tc.match, bloomTest.MatchesWithPrefixBuf(bloom, []byte(prefix), len(prefix))) + require.Equal(t, tc.match, bloomTest.Matches(series, bloom)) + require.Equal(t, tc.match, bloomTest.MatchesWithPrefixBuf(series, bloom, []byte(prefix), len(prefix))) }) } } diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index b25743ea4c541..ea28a5534bc49 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" iter "github.com/grafana/loki/v3/pkg/iter/v2" @@ -15,6 +16,7 @@ import ( type Request struct { Fp model.Fingerprint + Labels labels.Labels Chks ChunkRefs Search BloomTest Response chan<- Output @@ -252,6 +254,7 @@ func (fq *FusedQuerier) Run() error { nextBatch := fq.inputs.At() fp := nextBatch[0].Fp + lbs := nextBatch[0].Labels // advance the series iterator to the next fingerprint if err := fq.bq.Seek(fp); err != nil { @@ -276,13 +279,13 @@ func (fq *FusedQuerier) Run() error { continue } - fq.runSeries(schema, series, nextBatch) + fq.runSeries(schema, lbs, series, nextBatch) } return nil } -func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Request) { +func (fq *FusedQuerier) runSeries(_ Schema, lbs labels.Labels, series *SeriesWithMeta, reqs []Request) { // For a given chunk|series to be removed, it must fail to match all blooms. // Because iterating/loading blooms can be expensive, we iterate blooms one at a time, collecting // the removals (failures) for each (bloom, chunk) pair. @@ -372,7 +375,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque // shortcut: series level removal // we can skip testing chunk keys individually if the bloom doesn't match // the query. - if !req.Search.Matches(bloom) { + if !req.Search.Matches(lbs, bloom) { // Nothing else needs to be done for this (bloom, request); // check the next input request continue @@ -387,7 +390,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque // TODO(rfratto): reuse buffer between multiple calls to // prefixForChunkRef and MatchesWithPrefixBuf to avoid allocations. tokenBuf := prefixForChunkRef(chk) - if matched := req.Search.MatchesWithPrefixBuf(bloom, tokenBuf, len(tokenBuf)); matched { + if matched := req.Search.MatchesWithPrefixBuf(lbs, bloom, tokenBuf, len(tokenBuf)); matched { inputs[j].found[k] = true } } diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 3283dc6ccdb59..9d3e56236f8b4 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/concurrency" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/compression" @@ -27,12 +28,12 @@ var BloomPagePool = mempool.New("test", []mempool.Bucket{ type singleKeyTest []byte // Matches implements BloomTest. -func (s singleKeyTest) Matches(bloom filter.Checker) bool { +func (s singleKeyTest) Matches(_ labels.Labels, bloom filter.Checker) bool { return bloom.Test(s) } // MatchesWithPrefixBuf implements BloomTest. -func (s singleKeyTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { +func (s singleKeyTest) MatchesWithPrefixBuf(_ labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { return bloom.Test(append(buf[:prefixLen], s...)) } From 45a81567cd9a3298923a4919267d89aa909902c8 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 30 Oct 2024 17:31:59 +0100 Subject: [PATCH 3/8] fix tests --- pkg/bloomgateway/bloomgateway_test.go | 19 ++++++++++++++++++- pkg/bloomgateway/multiplexing_test.go | 9 +++++++++ pkg/bloomgateway/querier_test.go | 8 ++++---- pkg/bloomgateway/util.go | 1 + 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 20f1c7eba162a..344657c9946b2 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" @@ -41,7 +42,15 @@ func stringSlice[T fmt.Stringer](s []T) []string { func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs { t.Helper() - return groupChunkRefs(nil, chunkRefs, nil) + grouped := groupChunkRefs(nil, chunkRefs, nil) + // Put fake labels to the series + for i, g := range grouped { + g.Labels = &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", i))), + } + } + + return grouped } func newLimits() *validation.Overrides { @@ -295,12 +304,18 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { {Fingerprint: 1000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696248000000, Through: 1696251600000, Checksum: 2}, {From: 1696244400000, Through: 1696248000000, Checksum: 4}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "0")), }}, {Fingerprint: 2000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696255200000, Through: 1696258800000, Checksum: 3}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")), }}, {Fingerprint: 3000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696240800000, Through: 1696244400000, Checksum: 1}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2")), }}, }, }, res) @@ -405,6 +420,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { // see MkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go rnd := rand.Intn(len(inputChunkRefs)) fp := inputChunkRefs[rnd].Fingerprint + lbs := inputChunkRefs[rnd].Labels chks := inputChunkRefs[rnd].Refs key := fmt.Sprintf("%s:%04x", model.Fingerprint(fp), 0) @@ -428,6 +444,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { ChunkRefs: []*logproto.GroupedChunkRefs{ { Fingerprint: fp, + Labels: lbs, Refs: chks, Tenant: tenantID, }, diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go index d290817ef4be7..bfaa168fa4f5d 100644 --- a/pkg/bloomgateway/multiplexing_test.go +++ b/pkg/bloomgateway/multiplexing_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" v2 "github.com/grafana/loki/v3/pkg/iter/v2" @@ -73,6 +74,8 @@ func TestTask_RequestIterator(t *testing.T) { Refs: []*logproto.GroupedChunkRefs{ {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")), }}, }, } @@ -83,9 +86,13 @@ func TestTask_RequestIterator(t *testing.T) { Refs: []*logproto.GroupedChunkRefs{ {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")), }}, {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")), }}, }, } @@ -96,6 +103,8 @@ func TestTask_RequestIterator(t *testing.T) { Refs: []*logproto.GroupedChunkRefs{ {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 400}, + }, Labels: &logproto.IndexSeries{ + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")), }}, }, } diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index bb67c9f1bd178..46875ceeaf126 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -81,7 +81,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"}`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -97,7 +97,7 @@ func TestBloomQuerier(t *testing.T) { chunkRefs := []*logproto.ChunkRef{} expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 0, c.callCount) @@ -117,7 +117,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.Error(t, err) require.Nil(t, res) }) @@ -136,7 +136,7 @@ func TestBloomQuerier(t *testing.T) { } expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`) require.NoError(t, err) - res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, plan.QueryPlan{AST: expr}) + res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr}) require.NoError(t, err) require.Equal(t, chunkRefs, res) require.Equal(t, 2, c.callCount) diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index 868fdf2ac6ed5..21803d9c84dab 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -110,6 +110,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto res = append(res, &logproto.GroupedChunkRefs{ Fingerprint: series.Fingerprint, + Labels: series.Labels, Tenant: series.Tenant, Refs: relevantChunks, }) From 8e04c4ca38f72e7ac8fad230bd5e19a9d0a43aef Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 31 Oct 2024 09:21:35 +0100 Subject: [PATCH 4/8] fix tests --- pkg/bloomgateway/bloomgateway_test.go | 10 +++++----- pkg/bloomgateway/cache.go | 1 + pkg/bloomgateway/client.go | 1 + pkg/logproto/compat.go | 1 + 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 344657c9946b2..63657ce5b5e4d 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -44,9 +44,9 @@ func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.Grouped t.Helper() grouped := groupChunkRefs(nil, chunkRefs, nil) // Put fake labels to the series - for i, g := range grouped { + for _, g := range grouped { g.Labels = &logproto.IndexSeries{ - Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", i))), + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", g.Fingerprint))), } } @@ -305,17 +305,17 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { {From: 1696248000000, Through: 1696251600000, Checksum: 2}, {From: 1696244400000, Through: 1696248000000, Checksum: 4}, }, Labels: &logproto.IndexSeries{ - Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "0")), + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1000")), }}, {Fingerprint: 2000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696255200000, Through: 1696258800000, Checksum: 3}, }, Labels: &logproto.IndexSeries{ - Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")), + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2000")), }}, {Fingerprint: 3000, Tenant: tenantID, Refs: []*logproto.ShortRef{ {From: 1696240800000, Through: 1696244400000, Checksum: 1}, }, Labels: &logproto.IndexSeries{ - Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2")), + Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3000")), }}, }, }, res) diff --git a/pkg/bloomgateway/cache.go b/pkg/bloomgateway/cache.go index cec615b393fd7..77d5168ca303a 100644 --- a/pkg/bloomgateway/cache.go +++ b/pkg/bloomgateway/cache.go @@ -72,6 +72,7 @@ func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64 if len(refs) > 0 { chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{ Fingerprint: chunkRef.Fingerprint, + Labels: chunkRef.Labels, Tenant: chunkRef.Tenant, Refs: refs, }) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 9163e91e8756e..23f7026d00706 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -322,6 +322,7 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh } return &logproto.GroupedChunkRefs{ Fingerprint: a.Fingerprint, + Labels: a.Labels, Tenant: a.Tenant, Refs: mergeChunkSets(a.Refs, b.Refs), } diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index 405b29cc6148d..69ffe4dece3ca 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -391,6 +391,7 @@ func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resul if len(refs) > 0 { chunkRefs = append(chunkRefs, &GroupedChunkRefs{ Fingerprint: chunkRef.Fingerprint, + Labels: chunkRef.Labels, Tenant: chunkRef.Tenant, Refs: refs, }) From f0209fd57c43317fc76c1ad1be1328bd2927718a Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 31 Oct 2024 11:44:06 +0100 Subject: [PATCH 5/8] Fix bloom checker --- pkg/storage/bloom/v1/bloom_tester.go | 40 +++++++++-------------- pkg/storage/bloom/v1/bloom_tester_test.go | 29 +++++++++++++--- 2 files changed, 40 insertions(+), 29 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go index 74607b0ff70b6..44c9bfefb882c 100644 --- a/pkg/storage/bloom/v1/bloom_tester.go +++ b/pkg/storage/bloom/v1/bloom_tester.go @@ -157,45 +157,37 @@ func (sm stringMatcherTest) Matches(series labels.Labels, bloom filter.Checker) // 2. It should be possible to test for just the key var ( - combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) - - rawKey = unsafe.Slice(unsafe.StringData(sm.matcher.Key), len(sm.matcher.Key)) + combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) rawCombined = unsafe.Slice(unsafe.StringData(combined), len(combined)) ) - return sm.match(series, bloom, rawKey, rawCombined) + return sm.match(series, bloom, rawCombined) } func (sm stringMatcherTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool { var ( - combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) - - prefixedKey = appendToBuf(buf, prefixLen, sm.matcher.Key) + combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value) prefixedCombined = appendToBuf(buf, prefixLen, combined) ) - return sm.match(series, bloom, prefixedKey, prefixedCombined) + return sm.match(series, bloom, prefixedCombined) } -func (sm stringMatcherTest) match(series labels.Labels, bloom filter.Checker, key []byte, combined []byte) bool { - // If the label is part of the series, we cannot check the bloom since - // the label is not structured metadata - if value := series.Get(sm.matcher.Key); value != "" { - // If the series label value is the same as the matcher value, we cannot filter out this chunk. - // Otherwise, we can filter out this chunk. - // E.g. `{env="prod"} | env="prod"` should not filter out the chunk. - // E.g. `{env="prod"} | env="dev"` should filter out the chunk. - // E.g. `{env="prod"} | env=""` should filter out the chunk. - return value == sm.matcher.Value +// match returns true if the series matches the matcher or is in the bloom filter. +func (sm stringMatcherTest) match(series labels.Labels, bloom filter.Checker, combined []byte) bool { + // If we don't have the series labels, we cannot disambiguate which labels come from the series in which case + // we may filter out chunks for queries like `{env="prod"} | env="prod"` if env=prod is not structured metadata + if len(series) == 0 { + return true } - // To this point we know the label is structured metadata so if the label name is not - // in the bloom, we can filter out the chunk. - if !bloom.Test(key) { - return false - } + // It's in the series if the key is set and has the same value. + // By checking val != "" we handle `{env="prod"} | user=""`. + val := series.Get(sm.matcher.Key) + inSeries := val != "" && val == sm.matcher.Value - return bloom.Test(combined) + inBloom := bloom.Test(combined) + return inSeries || inBloom } // appendToBuf is the equivalent of append(buf[:prefixLen], str). len(buf) must diff --git a/pkg/storage/bloom/v1/bloom_tester_test.go b/pkg/storage/bloom/v1/bloom_tester_test.go index 3afa59109c5f7..4d98c4e83fc1c 100644 --- a/pkg/storage/bloom/v1/bloom_tester_test.go +++ b/pkg/storage/bloom/v1/bloom_tester_test.go @@ -21,10 +21,11 @@ func TestLabelMatchersToBloomTest(t *testing.T) { tokenizer, push.LabelAdapter{Name: "trace_id", Value: "exists_1"}, push.LabelAdapter{Name: "trace_id", Value: "exists_2"}, + push.LabelAdapter{Name: "app", Value: "other"}, ) ) - series := labels.FromStrings("app", "fake") + series := labels.FromStrings("env", "prod", "app", "fake") tt := []struct { name string query string @@ -66,15 +67,33 @@ func TestLabelMatchersToBloomTest(t *testing.T) { match: false, }, { - name: "filter series label with different value", - query: `{app="fake"} | app="noexist"`, + name: "ignore label from series", + query: `{app="fake"} | env="prod"`, + match: true, + }, + { + name: "filter label from series", + query: `{app="fake"} | env="dev"`, // env is set to prod in the series match: false, }, + // We cannot support this test case until we can forward a list of structured metadata fields. + // We cannot check if the key is structured metadata using the bloom because these are probabilistic + // E.g. bloom.Test("env") may return true even if env is not structured metadata. + //{ + // name: "filter label from series overridden by structured metadata", + // query: `{app="fake"} | app="fake"`, // app is set to other in the structured metadata + // match: false, + //}, { - name: "ignore label from series", - query: `{app="fake"} | app="fake"`, + name: "ignore label from series and structured metadata", + query: `{app="fake"} | app="other"`, match: true, }, + { + name: "filter series label with non-existing value", + query: `{app="fake"} | app="noexist"`, + match: false, + }, { name: "ignore label from series with empty value", query: `{app="fake"} | app=""`, From 23c1cc24595ce6224cccd49677c7d7e374110b66 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 31 Oct 2024 11:55:09 +0100 Subject: [PATCH 6/8] Add todo --- pkg/storage/bloom/v1/bloom_tester.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go index 44c9bfefb882c..3847f26e51b9b 100644 --- a/pkg/storage/bloom/v1/bloom_tester.go +++ b/pkg/storage/bloom/v1/bloom_tester.go @@ -174,6 +174,9 @@ func (sm stringMatcherTest) MatchesWithPrefixBuf(series labels.Labels, bloom fil } // match returns true if the series matches the matcher or is in the bloom filter. +// TODO(salvacorts): support filtering out chunks for labels overriden by structurdd metadata. +// We'd need passing a list of structured metadata fields similarly to how we pass the series. +// SEE: https://github.com/grafana/loki/pull/14661#discussion_r1824228343 func (sm stringMatcherTest) match(series labels.Labels, bloom filter.Checker, combined []byte) bool { // If we don't have the series labels, we cannot disambiguate which labels come from the series in which case // we may filter out chunks for queries like `{env="prod"} | env="prod"` if env=prod is not structured metadata From 23694151fb4bb367572064309c11f61c448053b8 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 31 Oct 2024 13:49:24 +0100 Subject: [PATCH 7/8] Add log line and remove comment --- pkg/storage/bloom/v1/bloom_tester.go | 6 +++--- pkg/storage/bloom/v1/bloom_tester_test.go | 8 -------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go index 3847f26e51b9b..f27dfe86c3213 100644 --- a/pkg/storage/bloom/v1/bloom_tester.go +++ b/pkg/storage/bloom/v1/bloom_tester.go @@ -4,6 +4,8 @@ import ( "fmt" "unsafe" + "github.com/go-kit/log/level" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" @@ -174,13 +176,11 @@ func (sm stringMatcherTest) MatchesWithPrefixBuf(series labels.Labels, bloom fil } // match returns true if the series matches the matcher or is in the bloom filter. -// TODO(salvacorts): support filtering out chunks for labels overriden by structurdd metadata. -// We'd need passing a list of structured metadata fields similarly to how we pass the series. -// SEE: https://github.com/grafana/loki/pull/14661#discussion_r1824228343 func (sm stringMatcherTest) match(series labels.Labels, bloom filter.Checker, combined []byte) bool { // If we don't have the series labels, we cannot disambiguate which labels come from the series in which case // we may filter out chunks for queries like `{env="prod"} | env="prod"` if env=prod is not structured metadata if len(series) == 0 { + level.Warn(util_log.Logger).Log("msg", "series has no labels, cannot filter out chunks") return true } diff --git a/pkg/storage/bloom/v1/bloom_tester_test.go b/pkg/storage/bloom/v1/bloom_tester_test.go index 4d98c4e83fc1c..935da19de255a 100644 --- a/pkg/storage/bloom/v1/bloom_tester_test.go +++ b/pkg/storage/bloom/v1/bloom_tester_test.go @@ -76,14 +76,6 @@ func TestLabelMatchersToBloomTest(t *testing.T) { query: `{app="fake"} | env="dev"`, // env is set to prod in the series match: false, }, - // We cannot support this test case until we can forward a list of structured metadata fields. - // We cannot check if the key is structured metadata using the bloom because these are probabilistic - // E.g. bloom.Test("env") may return true even if env is not structured metadata. - //{ - // name: "filter label from series overridden by structured metadata", - // query: `{app="fake"} | app="fake"`, // app is set to other in the structured metadata - // match: false, - //}, { name: "ignore label from series and structured metadata", query: `{app="fake"} | app="other"`, From 054806241c2678a7e869b74d2a3f9d358af4c215 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 31 Oct 2024 14:29:23 +0100 Subject: [PATCH 8/8] lint --- pkg/storage/bloom/v1/bloom_tester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go index f27dfe86c3213..b70c64804118e 100644 --- a/pkg/storage/bloom/v1/bloom_tester.go +++ b/pkg/storage/bloom/v1/bloom_tester.go @@ -5,10 +5,10 @@ import ( "unsafe" "github.com/go-kit/log/level" - util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) type BloomTest interface {