From 1cd8d90d4f231630e96b93c92dda6e2795d0ebd0 Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Fri, 27 Sep 2024 15:50:39 +0300 Subject: [PATCH 1/8] receive/multitsdb: add cuckoo filter on metric names Signed-off-by: Mindaugas Niaura --- cmd/thanos/receive.go | 18 +++++++- cmd/thanos/rule.go | 2 +- go.mod | 3 ++ go.sum | 5 ++ pkg/filter/cuckoo.go | 36 +++++++++++++++ pkg/filter/filter.go | 17 +++++++ pkg/query/endpointset.go | 6 ++- pkg/receive/handler_test.go | 1 + pkg/receive/multitsdb.go | 13 +++++- pkg/receive/multitsdb_test.go | 10 ++++ pkg/receive/receive_test.go | 1 + pkg/receive/writer_test.go | 2 + pkg/store/acceptance_test.go | 4 +- pkg/store/proxy.go | 13 ++++++ pkg/store/storepb/testutil/client.go | 1 + pkg/store/tsdb.go | 69 ++++++++++++++++++++++++++-- pkg/store/tsdb_test.go | 10 ++-- 17 files changed, 196 insertions(+), 15 deletions(-) create mode 100644 pkg/filter/cuckoo.go create mode 100644 pkg/filter/filter.go diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 47d63802a0..96ef917dc5 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -53,7 +53,10 @@ import ( "github.com/thanos-io/thanos/pkg/tls" ) -const compressionNone = "none" +const ( + compressionNone = "none" + metricNamesFilter = "metric-names-filter" +) func registerReceive(app *extkingpin.App) { cmd := app.Command(component.Receive.String(), "Accept Prometheus remote write API requests and write to local tsdb.") @@ -136,6 +139,14 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") + var metricNameFilterEnabled bool + for _, feature := range *conf.featureList { + if feature == metricNamesFilter { + metricNameFilterEnabled = true + level.Info(logger).Log("msg", "metric name filter feature enabled") + } + } + rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) if err != nil { return err @@ -215,6 +226,7 @@ func runReceive( bkt, conf.allowOutOfOrderUpload, hashFunc, + metricNameFilterEnabled, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{ Intern: conf.writerInterning, @@ -845,6 +857,8 @@ type receiveConfig struct { limitsConfigReloadTimer time.Duration asyncForwardWorkerCount uint + + featureList *[]string } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -985,6 +999,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) cmd.Flag("receive.limits-config-reload-timer", "Minimum amount of time to pass for the limit configuration to be reloaded. Helps to avoid excessive reloads."). Default("1s").Hidden().DurationVar(&rc.limitsConfigReloadTimer) + + rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 3fcc452ac6..351664c2ab 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -736,7 +736,7 @@ func runRule( } infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()} if tsdbDB != nil { - tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset) + tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset, false) infoOptions = append( infoOptions, info.WithLabelSetFunc(func() []labelpb.ZLabelSet { diff --git a/go.mod b/go.mod index c3b72969a2..3e66d11b07 100644 --- a/go.mod +++ b/go.mod @@ -120,11 +120,14 @@ require ( github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.34.2 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 + github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 go.opentelemetry.io/contrib/propagators/autoprop v0.54.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 ) +require github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect + require ( cloud.google.com/go/auth v0.5.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect diff --git a/go.sum b/go.sum index d3506e381a..ce27da9bb1 100644 --- a/go.sum +++ b/go.sum @@ -1508,6 +1508,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.117.0 h1:WVlTe09melDYTd7VCVyvHcNWbgB+uI1O115+5LOtdSw= github.com/digitalocean/godo v1.117.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= @@ -2175,6 +2177,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27 h1:yGAraK1uUjlhSXgNMIy8o/J4LFNcy7yeipBqt9N9mVg= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY= github.com/sercand/kuberesolver/v5 v5.1.1/go.mod h1:Fs1KbKhVRnB2aDWN12NjKCB+RgYMWZJ294T3BtmVCpQ= github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= @@ -3206,6 +3210,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/pkg/filter/cuckoo.go b/pkg/filter/cuckoo.go new file mode 100644 index 0000000000..5440a680ad --- /dev/null +++ b/pkg/filter/cuckoo.go @@ -0,0 +1,36 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package filter + +import ( + "sync" + + cuckoo "github.com/seiflotfy/cuckoofilter" +) + +type CuckooFilterMetricNameFilter struct { + filter *cuckoo.Filter + mtx sync.RWMutex +} + +func NewCuckooFilterMetricNameFilter(capacity uint) *CuckooFilterMetricNameFilter { + return &CuckooFilterMetricNameFilter{ + filter: cuckoo.NewFilter(capacity), + } +} + +func (f *CuckooFilterMetricNameFilter) MatchesMetricName(metricName string) bool { + f.mtx.RLock() + defer f.mtx.RUnlock() + return f.filter.Lookup([]byte(metricName)) +} + +func (f *CuckooFilterMetricNameFilter) ResetAddMetricName(metricNames ...string) { + f.mtx.Lock() + defer f.mtx.Unlock() + f.filter.Reset() + for _, metricName := range metricNames { + f.filter.Insert([]byte(metricName)) + } +} diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go new file mode 100644 index 0000000000..c22809da53 --- /dev/null +++ b/pkg/filter/filter.go @@ -0,0 +1,17 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package filter + +type MetricNameFilter interface { + MatchesMetricName(metricName string) bool + ResetAddMetricName(metricNames ...string) +} + +type AllowAllMetricNameFilter struct{} + +func (f AllowAllMetricNameFilter) MatchesMetricName(metricName string) bool { + return true +} + +func (f AllowAllMetricNameFilter) ResetAddMetricName(metricNames ...string) {} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 226bbb4730..b9f9b13508 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -568,7 +568,7 @@ type endpointRef struct { logger log.Logger } -// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. +// newndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { var dialOpts []grpc.DialOption @@ -815,6 +815,10 @@ func (er *endpointRef) apisPresent() []string { return apisPresent } +func (er *endpointRef) MatchesMetricName(name string) bool { + return true +} + type endpointMetadata struct { *infopb.InfoResponse } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 4f81a3d1ca..b5f78a3e00 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -999,6 +999,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(b, m.Close()) }() handler.writer = NewWriter(logger, m, &WriterOptions{}) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 62e39ab500..de16e4a596 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -70,6 +70,8 @@ type MultiTSDB struct { exemplarClients map[string]*exemplars.TSDB exemplarClientsNeedUpdate bool + + metricNameFilterEnabled bool } // NewMultiTSDB creates new MultiTSDB. @@ -84,6 +86,7 @@ func NewMultiTSDB( bucket objstore.Bucket, allowOutOfOrderUpload bool, hashFunc metadata.HashFunc, + metricNameFilterEnabled bool, ) *MultiTSDB { if l == nil { l = log.NewNopLogger() @@ -97,6 +100,7 @@ func NewMultiTSDB( mtx: &sync.RWMutex{}, tenants: map[string]*tenant{}, labels: labels, + metricNameFilterEnabled: metricNameFilterEnabled, tsdbClientsNeedUpdate: true, exemplarClientsNeedUpdate: true, tenantLabelName: tenantLabelName, @@ -179,6 +183,10 @@ func newLocalClient(store *store.TSDBStore) *localClient { } } +func (l *localClient) MatchesMetricName(metricName string) bool { + return l.store.MatchesMetricName(metricName) +} + func (l *localClient) LabelSets() []labels.Labels { return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...) } @@ -302,6 +310,9 @@ func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *ship } func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { + if storeTSDB == nil && t.storeTSDB != nil { + t.storeTSDB.Close() + } t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB @@ -751,7 +762,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset)) + tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, t.metricNameFilterEnabled), s, ship, exemplars.NewTSDB(s, lset)) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index eb82c22281..1ede2d8ac1 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -57,6 +57,7 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -141,6 +142,7 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -184,6 +186,7 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -451,6 +454,7 @@ func TestMultiTSDBPrune(t *testing.T) { test.bucket, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -524,6 +528,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { objstore.NewInMemBucket(), false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -585,6 +590,7 @@ func TestAlignedHeadFlush(t *testing.T) { test.bucket, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -659,6 +665,7 @@ func TestMultiTSDBStats(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -688,6 +695,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -729,6 +737,7 @@ func TestProxyLabelValues(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -819,6 +828,7 @@ func BenchmarkMultiTSDB(b *testing.B) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(b, m.Close()) }() diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index ea1b6d81fd..e4fb686554 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -810,6 +810,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB { bucket, false, metadata.NoneFunc, + false, ) return m diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 34613794b8..468bac516c 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -344,6 +344,7 @@ func TestWriter(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) t.Cleanup(func() { testutil.Ok(t, m.Close()) }) @@ -436,6 +437,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr nil, false, metadata.NoneFunc, + false, ) b.Cleanup(func() { testutil.Ok(b, m.Close()) }) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index 793f16eb60..6e61231772 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -1019,7 +1019,7 @@ func TestTSDBStore_Acceptance(t *testing.T) { tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) appendFn(db.Appender(context.Background())) - return NewTSDBStore(nil, db, component.Rule, extLset) + return NewTSDBStore(nil, db, component.Rule, extLset, false) } testStoreAPIsAcceptance(t, startStore) @@ -1173,7 +1173,7 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) { tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) appendFn(db.Appender(context.Background())) - return NewTSDBStore(nil, db, component.Rule, extLset) + return NewTSDBStore(nil, db, component.Rule, extLset, false) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 08cd36e485..28580ab60e 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -71,6 +71,9 @@ type Client interface { // Addr returns address of the store client. If second parameter is true, the client // represents a local client (server-as-client) and has no remote address. Addr() (addr string, isLocalClient bool) + + // MatchesMetricName returns true if the metric name is allowed in the store. + MatchesMetricName(metricName string) bool } // ProxyStore implements the store API that proxies request to all given underlying stores. @@ -590,6 +593,16 @@ func storeMatches(ctx context.Context, s Client, mint, maxt int64, matchers ...* if !LabelSetsMatch(matchers, extLset...) { return false, fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers) } + + for _, m := range matchers { + if m.Type == labels.MatchEqual && m.Name == labels.MetricName { + if !s.MatchesMetricName(m.Value) { + return false, fmt.Sprintf("metric name %v does not match filter", m.Value) + } + break + } + } + return true, "" } diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index 90874842d6..e3bbbaa917 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -30,3 +30,4 @@ func (c TestClient) SupportsSharding() bool { return c.Shardable } func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } func (c TestClient) String() string { return c.Name } func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } +func (c TestClient) MatchesMetricName(_ string) bool { return true } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index b4cea0022b..1217b94a8e 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -11,9 +11,12 @@ import ( "sort" "strings" "sync" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "google.golang.org/grpc" @@ -21,6 +24,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -44,8 +48,15 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - extLset labels.Labels - mtx sync.RWMutex + extLset labels.Labels + mtx sync.RWMutex + metricNameFilter filter.MetricNameFilter + close func() + storepb.UnimplementedStoreServer +} + +func (s *TSDBStore) Close() { + s.close() } func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) { @@ -62,21 +73,67 @@ type ReadWriteTSDBStore struct { // NewTSDBStore creates a new TSDBStore. // NOTE: Given lset has to be sorted. -func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore { +func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, metricNameFilterEnabled bool) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } - return &TSDBStore{ + + var ( + metricNameFilter filter.MetricNameFilter + startMetricNamesUpdate bool + ) + + metricNameFilter = filter.AllowAllMetricNameFilter{} + if metricNameFilterEnabled { + startMetricNamesUpdate = true + metricNameFilter = filter.NewCuckooFilterMetricNameFilter(1000000) // about 1MB on 64bit machines. + } + + st := &TSDBStore{ logger: logger, db: db, component: component, extLset: extLset, + metricNameFilter: metricNameFilter, maxBytesPerFrame: RemoteReadFrameLimit, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b }}, } + + if startMetricNamesUpdate { + t := time.NewTicker(15 * time.Second) + ctx, cancel := context.WithCancel(context.Background()) + updateMetricNames := func() { + vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{ + Label: model.MetricNameLabel, + Start: 0, + End: math.MaxInt64, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to update metric names", "err", err) + return + } + + st.metricNameFilter.ResetAddMetricName(vals.Values...) + } + st.close = cancel + updateMetricNames() + + go func() { + for { + select { + case <-t.C: + updateMetricNames() + case <-ctx.Done(): + return + } + } + }() + } + + return st } func (s *TSDBStore) SetExtLset(extLset labels.Labels) { @@ -133,6 +190,10 @@ func (s *TSDBStore) TimeRange() (int64, int64) { return minTime, math.MaxInt64 } +func (s *TSDBStore) MatchesMetricName(metricName string) bool { + return s.metricNameFilter.MatchesMetricName(metricName) +} + // CloseDelegator allows to delegate close (releasing resources used by request to the server). // This is useful when we invoke StoreAPI within another StoreAPI and results are ephemeral until copied. type CloseDelegator interface { diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 5a6457a6fe..5c723a7cbd 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -39,7 +39,7 @@ func TestTSDBStore_Series_ChunkChecksum(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"), false) appender := db.Appender(context.Background()) @@ -79,7 +79,7 @@ func TestTSDBStore_Series(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"), false) appender := db.Appender(context.Background()) @@ -251,7 +251,7 @@ func TestTSDBStore_SeriesAccessWithDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) srv := storetestutil.NewSeriesServer(context.Background()) csrv := &delegatorServer{SeriesServer: srv} @@ -414,7 +414,7 @@ func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) srv := storetestutil.NewSeriesServer(context.Background()) t.Run("call series and access results", func(t *testing.T) { @@ -555,7 +555,7 @@ func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) { defer func() { testutil.Ok(t, db.Close()) }() extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) var expected []*storepb.Series for _, resp := range resps { From bc3a1828fa64d61ad0c3a8f58e5da95f1e336a6f Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Mon, 30 Sep 2024 11:44:14 +0300 Subject: [PATCH 2/8] add enable-feature flag to Receiver docs, fix newEndpointRef typo Signed-off-by: Mindaugas Niaura --- docs/components/receive.md | 3 +++ pkg/query/endpointset.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/components/receive.md b/docs/components/receive.md index aedfb3f5af..31091e5314 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -339,6 +339,9 @@ Flags: detected maximum container or system memory. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. + --enable-feature= ... Comma separated experimental feature names + to enable. The current list of features is + metric-names-filter. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index b9f9b13508..76e41c79fd 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -568,7 +568,7 @@ type endpointRef struct { logger log.Logger } -// newndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. +// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { var dialOpts []grpc.DialOption From a4298bb85036580b3b88f8e0d40356417eee4e0a Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Mon, 30 Sep 2024 14:58:00 +0300 Subject: [PATCH 3/8] add test cases for testFilter Signed-off-by: Mindaugas Niaura --- pkg/store/proxy_test.go | 55 ++++++++++++++++++++++++++++ pkg/store/storepb/testutil/client.go | 6 ++- 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 1ac6346acd..0d7e42dea6 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -31,6 +31,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -1804,6 +1805,10 @@ func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) { } func TestStoreMatches(t *testing.T) { + testMetricName := "test_metric" + testFilter := filter.NewCuckooFilterMetricNameFilter(10) + testFilter.ResetAddMetricName(testMetricName) + for _, c := range []struct { s Client mint, maxt int64 @@ -1925,6 +1930,35 @@ func TestStoreMatches(t *testing.T) { maxt: 1, expectedMatch: true, }, + { + s: &storetestutil.TestClient{ + ExtLset: []labels.Labels{ + labels.FromStrings("a", "b"), + }, + MetricNameFilter: testFilter, + }, + ms: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testMetricName), + }, + maxt: 1, + expectedMatch: true, + }, + { + s: &storetestutil.TestClient{ + ExtLset: []labels.Labels{ + labels.FromStrings("a", "b"), + }, + MetricNameFilter: testFilter, + }, + ms: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "some_other_metric"), + }, + maxt: 1, + expectedMatch: false, + expectedReason: "metric name some_other_metric does not match filter", + }, } { t.Run("", func(t *testing.T) { ok, reason := storeMatches(context.TODO(), c.s, c.mint, c.maxt, c.ms...) @@ -2273,6 +2307,27 @@ func TestProxyStore_storeMatchMetadata(t *testing.T) { testutil.Equals(t, "", reason) } +func TestProxyStore_MatchesMetricName(t *testing.T) { + c := storetestutil.TestClient{Name: "matchesmetricname"} + c.MetricNameFilter = filter.AllowAllMetricNameFilter{} + c.IsLocalStore = true + + ok, reason := storeMatchDebugMetadata(c, [][]*labels.Matcher{{}}) + testutil.Assert(t, !ok) + testutil.Equals(t, "the store is not remote, cannot match __address__", reason) + + // Change client to remote. + c.IsLocalStore = false + + ok, reason = storeMatchDebugMetadata(c, [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "__address__", "wrong")}}) + testutil.Assert(t, !ok) + testutil.Equals(t, "__address__ testaddr does not match debug store metadata matchers: [[__address__=\"wrong\"]]", reason) + + ok, reason = storeMatchDebugMetadata(c, [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "__address__", "testaddr")}}) + testutil.Assert(t, ok) + testutil.Equals(t, "", reason) +} + func TestDedupRespHeap_Deduplication(t *testing.T) { t.Parallel() diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index e3bbbaa917..055dc7fd76 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -6,6 +6,7 @@ package storetestutil import ( "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -21,6 +22,7 @@ type TestClient struct { WithoutReplicaLabelsEnabled bool IsLocalStore bool StoreTSDBInfos []infopb.TSDBInfo + MetricNameFilter filter.MetricNameFilter } func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } @@ -30,4 +32,6 @@ func (c TestClient) SupportsSharding() bool { return c.Shardable } func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } func (c TestClient) String() string { return c.Name } func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } -func (c TestClient) MatchesMetricName(_ string) bool { return true } +func (c TestClient) MatchesMetricName(metricName string) bool { + return c.MetricNameFilter.MatchesMetricName(metricName) +} From 068c92cf3bda929177b9f0e0d49517b47bc79edb Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Mon, 30 Sep 2024 15:02:21 +0300 Subject: [PATCH 4/8] avoid copy in CuckooFilterMetricNameFilter Signed-off-by: Mindaugas Niaura --- pkg/filter/cuckoo.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/filter/cuckoo.go b/pkg/filter/cuckoo.go index 5440a680ad..7ad415641a 100644 --- a/pkg/filter/cuckoo.go +++ b/pkg/filter/cuckoo.go @@ -5,6 +5,7 @@ package filter import ( "sync" + "unsafe" cuckoo "github.com/seiflotfy/cuckoofilter" ) @@ -31,6 +32,6 @@ func (f *CuckooFilterMetricNameFilter) ResetAddMetricName(metricNames ...string) defer f.mtx.Unlock() f.filter.Reset() for _, metricName := range metricNames { - f.filter.Insert([]byte(metricName)) + f.filter.Insert(unsafe.Slice(unsafe.StringData(metricName), len(metricName))) } } From f3493a1f92d11b913be21045f4c64735a0102bf8 Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Mon, 30 Sep 2024 17:14:40 +0300 Subject: [PATCH 5/8] use matchers in store filter Signed-off-by: Mindaugas Niaura --- pkg/filter/cuckoo.go | 24 ++++++++++------ pkg/filter/filter.go | 17 ++++++++---- pkg/query/endpointset.go | 2 +- pkg/receive/multitsdb.go | 4 +-- pkg/store/proxy.go | 13 +++------ pkg/store/proxy_test.go | 41 ++++------------------------ pkg/store/storepb/testutil/client.go | 23 +++++++--------- pkg/store/tsdb.go | 34 +++++++++++------------ 8 files changed, 66 insertions(+), 92 deletions(-) diff --git a/pkg/filter/cuckoo.go b/pkg/filter/cuckoo.go index 7ad415641a..0cdce6dc94 100644 --- a/pkg/filter/cuckoo.go +++ b/pkg/filter/cuckoo.go @@ -7,31 +7,39 @@ import ( "sync" "unsafe" + "github.com/prometheus/prometheus/model/labels" cuckoo "github.com/seiflotfy/cuckoofilter" ) -type CuckooFilterMetricNameFilter struct { +type CuckooMetricNameStoreFilter struct { filter *cuckoo.Filter mtx sync.RWMutex } -func NewCuckooFilterMetricNameFilter(capacity uint) *CuckooFilterMetricNameFilter { - return &CuckooFilterMetricNameFilter{ +func NewCuckooMetricNameStoreFilter(capacity uint) *CuckooMetricNameStoreFilter { + return &CuckooMetricNameStoreFilter{ filter: cuckoo.NewFilter(capacity), } } -func (f *CuckooFilterMetricNameFilter) MatchesMetricName(metricName string) bool { +func (f *CuckooMetricNameStoreFilter) Matches(matchers []*labels.Matcher) bool { f.mtx.RLock() defer f.mtx.RUnlock() - return f.filter.Lookup([]byte(metricName)) + + for _, m := range matchers { + if m.Type == labels.MatchEqual && m.Name == labels.MetricName { + return f.filter.Lookup([]byte(m.Value)) + } + } + + return true } -func (f *CuckooFilterMetricNameFilter) ResetAddMetricName(metricNames ...string) { +func (f *CuckooMetricNameStoreFilter) ResetAndSet(values ...string) { f.mtx.Lock() defer f.mtx.Unlock() f.filter.Reset() - for _, metricName := range metricNames { - f.filter.Insert(unsafe.Slice(unsafe.StringData(metricName), len(metricName))) + for _, value := range values { + f.filter.Insert(unsafe.Slice(unsafe.StringData(value), len(value))) } } diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index c22809da53..f5cc068cf5 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -3,15 +3,20 @@ package filter -type MetricNameFilter interface { - MatchesMetricName(metricName string) bool - ResetAddMetricName(metricNames ...string) +import "github.com/prometheus/prometheus/model/labels" + +type StoreFilter interface { + // Matches returns true if the filter matches the given matchers. + Matches(matchers []*labels.Matcher) bool + + // ResetAndSet resets the filter and sets it to the given values. + ResetAndSet(values ...string) } -type AllowAllMetricNameFilter struct{} +type AllowAllStoreFilter struct{} -func (f AllowAllMetricNameFilter) MatchesMetricName(metricName string) bool { +func (f AllowAllStoreFilter) Matches(matchers []*labels.Matcher) bool { return true } -func (f AllowAllMetricNameFilter) ResetAddMetricName(metricNames ...string) {} +func (f AllowAllStoreFilter) ResetAndSet(values ...string) {} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 76e41c79fd..59234c7afe 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -815,7 +815,7 @@ func (er *endpointRef) apisPresent() []string { return apisPresent } -func (er *endpointRef) MatchesMetricName(name string) bool { +func (er *endpointRef) Matches(matchers []*labels.Matcher) bool { return true } diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index de16e4a596..61b2b5f1ba 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -183,8 +183,8 @@ func newLocalClient(store *store.TSDBStore) *localClient { } } -func (l *localClient) MatchesMetricName(metricName string) bool { - return l.store.MatchesMetricName(metricName) +func (l *localClient) Matches(matchers []*labels.Matcher) bool { + return l.store.Matches(matchers) } func (l *localClient) LabelSets() []labels.Labels { diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 28580ab60e..26cc160c35 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -72,8 +72,8 @@ type Client interface { // represents a local client (server-as-client) and has no remote address. Addr() (addr string, isLocalClient bool) - // MatchesMetricName returns true if the metric name is allowed in the store. - MatchesMetricName(metricName string) bool + // Matches returns true if provided label matchers are allowed in the store. + Matches(matches []*labels.Matcher) bool } // ProxyStore implements the store API that proxies request to all given underlying stores. @@ -594,13 +594,8 @@ func storeMatches(ctx context.Context, s Client, mint, maxt int64, matchers ...* return false, fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers) } - for _, m := range matchers { - if m.Type == labels.MatchEqual && m.Name == labels.MetricName { - if !s.MatchesMetricName(m.Value) { - return false, fmt.Sprintf("metric name %v does not match filter", m.Value) - } - break - } + if !s.Matches(matchers) { + return false, fmt.Sprintf("store does not match filter for matchers: %v", matchers) } return true, "" diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 0d7e42dea6..05f13d4bac 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -31,7 +31,6 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -1805,10 +1804,6 @@ func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) { } func TestStoreMatches(t *testing.T) { - testMetricName := "test_metric" - testFilter := filter.NewCuckooFilterMetricNameFilter(10) - testFilter.ResetAddMetricName(testMetricName) - for _, c := range []struct { s Client mint, maxt int64 @@ -1931,15 +1926,10 @@ func TestStoreMatches(t *testing.T) { expectedMatch: true, }, { - s: &storetestutil.TestClient{ - ExtLset: []labels.Labels{ - labels.FromStrings("a", "b"), - }, - MetricNameFilter: testFilter, - }, + s: &storetestutil.TestClient{ExtLset: []labels.Labels{labels.FromStrings("a", "b")}}, ms: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "a", "b"), - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, testMetricName), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric_name"), }, maxt: 1, expectedMatch: true, @@ -1949,15 +1939,15 @@ func TestStoreMatches(t *testing.T) { ExtLset: []labels.Labels{ labels.FromStrings("a", "b"), }, - MetricNameFilter: testFilter, + StoreFilterNotMatches: true, }, ms: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "a", "b"), - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "some_other_metric"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric_name"), }, maxt: 1, expectedMatch: false, - expectedReason: "metric name some_other_metric does not match filter", + expectedReason: "store does not match filter for matchers: [a=\"b\" __name__=\"test_metric_name\"]", }, } { t.Run("", func(t *testing.T) { @@ -2307,27 +2297,6 @@ func TestProxyStore_storeMatchMetadata(t *testing.T) { testutil.Equals(t, "", reason) } -func TestProxyStore_MatchesMetricName(t *testing.T) { - c := storetestutil.TestClient{Name: "matchesmetricname"} - c.MetricNameFilter = filter.AllowAllMetricNameFilter{} - c.IsLocalStore = true - - ok, reason := storeMatchDebugMetadata(c, [][]*labels.Matcher{{}}) - testutil.Assert(t, !ok) - testutil.Equals(t, "the store is not remote, cannot match __address__", reason) - - // Change client to remote. - c.IsLocalStore = false - - ok, reason = storeMatchDebugMetadata(c, [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "__address__", "wrong")}}) - testutil.Assert(t, !ok) - testutil.Equals(t, "__address__ testaddr does not match debug store metadata matchers: [[__address__=\"wrong\"]]", reason) - - ok, reason = storeMatchDebugMetadata(c, [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "__address__", "testaddr")}}) - testutil.Assert(t, ok) - testutil.Equals(t, "", reason) -} - func TestDedupRespHeap_Deduplication(t *testing.T) { t.Parallel() diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index 055dc7fd76..419c3e0a0e 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -6,7 +6,6 @@ package storetestutil import ( "github.com/prometheus/prometheus/model/labels" - "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -21,17 +20,15 @@ type TestClient struct { Shardable bool WithoutReplicaLabelsEnabled bool IsLocalStore bool - StoreTSDBInfos []infopb.TSDBInfo - MetricNameFilter filter.MetricNameFilter + StoreTSDBInfos []*infopb.TSDBInfo + StoreFilterNotMatches bool } -func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } -func (c TestClient) TimeRange() (mint, maxt int64) { return c.MinTime, c.MaxTime } -func (c TestClient) TSDBInfos() []infopb.TSDBInfo { return c.StoreTSDBInfos } -func (c TestClient) SupportsSharding() bool { return c.Shardable } -func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } -func (c TestClient) String() string { return c.Name } -func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } -func (c TestClient) MatchesMetricName(metricName string) bool { - return c.MetricNameFilter.MatchesMetricName(metricName) -} +func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } +func (c TestClient) TimeRange() (mint, maxt int64) { return c.MinTime, c.MaxTime } +func (c TestClient) TSDBInfos() []*infopb.TSDBInfo { return c.StoreTSDBInfos } +func (c TestClient) SupportsSharding() bool { return c.Shardable } +func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } +func (c TestClient) String() string { return c.Name } +func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } +func (c TestClient) Matches(matches []*labels.Matcher) bool { return !c.StoreFilterNotMatches } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 1217b94a8e..462b144097 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -48,10 +48,10 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - extLset labels.Labels - mtx sync.RWMutex - metricNameFilter filter.MetricNameFilter - close func() + extLset labels.Labels + storeFilter filter.StoreFilter + mtx sync.RWMutex + close func() storepb.UnimplementedStoreServer } @@ -79,14 +79,14 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI } var ( - metricNameFilter filter.MetricNameFilter - startMetricNamesUpdate bool + storeFilter filter.StoreFilter + startFilterUpdate bool ) - metricNameFilter = filter.AllowAllMetricNameFilter{} + storeFilter = filter.AllowAllStoreFilter{} if metricNameFilterEnabled { - startMetricNamesUpdate = true - metricNameFilter = filter.NewCuckooFilterMetricNameFilter(1000000) // about 1MB on 64bit machines. + startFilterUpdate = true + storeFilter = filter.NewCuckooMetricNameStoreFilter(1000000) // about 1MB on 64bit machines. } st := &TSDBStore{ @@ -94,7 +94,7 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI db: db, component: component, extLset: extLset, - metricNameFilter: metricNameFilter, + storeFilter: storeFilter, maxBytesPerFrame: RemoteReadFrameLimit, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) @@ -102,10 +102,10 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI }}, } - if startMetricNamesUpdate { + if startFilterUpdate { t := time.NewTicker(15 * time.Second) ctx, cancel := context.WithCancel(context.Background()) - updateMetricNames := func() { + updateFilter := func() { vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{ Label: model.MetricNameLabel, Start: 0, @@ -116,16 +116,16 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI return } - st.metricNameFilter.ResetAddMetricName(vals.Values...) + st.storeFilter.ResetAndSet(vals.Values...) } st.close = cancel - updateMetricNames() + updateFilter() go func() { for { select { case <-t.C: - updateMetricNames() + updateFilter() case <-ctx.Done(): return } @@ -190,8 +190,8 @@ func (s *TSDBStore) TimeRange() (int64, int64) { return minTime, math.MaxInt64 } -func (s *TSDBStore) MatchesMetricName(metricName string) bool { - return s.metricNameFilter.MatchesMetricName(metricName) +func (s *TSDBStore) Matches(matchers []*labels.Matcher) bool { + return s.storeFilter.Matches(matchers) } // CloseDelegator allows to delegate close (releasing resources used by request to the server). From d2b6089d583e2e10ff3bc7ce3259946a03d35829 Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Tue, 1 Oct 2024 14:10:11 +0300 Subject: [PATCH 6/8] fix TSDB pruning Signed-off-by: Mindaugas Niaura --- pkg/store/tsdb.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 462b144097..7eeddadf4e 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -96,6 +96,7 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI extLset: extLset, storeFilter: storeFilter, maxBytesPerFrame: RemoteReadFrameLimit, + close: func() {}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b From a659d10d3cf1bbc47ace5812536a00f657b07975 Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Tue, 1 Oct 2024 16:24:36 +0300 Subject: [PATCH 7/8] address PR comments, use options in tsbd initializations Signed-off-by: Mindaugas Niaura --- cmd/thanos/receive.go | 6 ++-- cmd/thanos/rule.go | 2 +- pkg/api/query/v1_test.go | 2 +- pkg/receive/handler_test.go | 1 - pkg/receive/multitsdb.go | 27 +++++++++++--- pkg/receive/multitsdb_test.go | 10 ------ pkg/receive/receive_test.go | 1 - pkg/receive/writer_test.go | 2 -- pkg/store/acceptance_test.go | 4 +-- pkg/store/tsdb.go | 66 ++++++++++++++++++++++------------- pkg/store/tsdb_test.go | 10 +++--- 11 files changed, 76 insertions(+), 55 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 96ef917dc5..30fbf362c2 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -139,10 +139,10 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") - var metricNameFilterEnabled bool + multiTSDBOptions := []receive.MultiTSDBOption{} for _, feature := range *conf.featureList { if feature == metricNamesFilter { - metricNameFilterEnabled = true + multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) level.Info(logger).Log("msg", "metric name filter feature enabled") } } @@ -226,7 +226,7 @@ func runReceive( bkt, conf.allowOutOfOrderUpload, hashFunc, - metricNameFilterEnabled, + multiTSDBOptions..., ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{ Intern: conf.writerInterning, diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 351664c2ab..3fcc452ac6 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -736,7 +736,7 @@ func runRule( } infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()} if tsdbDB != nil { - tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset, false) + tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset) infoOptions = append( infoOptions, info.WithLabelSetFunc(func() []labelpb.ZLabelSet { diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 923a9cf411..e36bea8ea4 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -785,7 +785,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) { func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore { c := &storetestutil.TestClient{ Name: "1", - StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil)), + StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels())), MinTime: math.MinInt64, MaxTime: math.MaxInt64, } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index b5f78a3e00..4f81a3d1ca 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -999,7 +999,6 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(b, m.Close()) }() handler.writer = NewWriter(logger, m, &WriterOptions{}) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 61b2b5f1ba..a5765604f3 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -74,6 +74,16 @@ type MultiTSDB struct { metricNameFilterEnabled bool } +// MultiTSDBOption is a functional option for MultiTSDB. +type MultiTSDBOption func(mt *MultiTSDB) + +// WithMetricNameFilterEnabled enables metric name filtering on TSDB clients. +func WithMetricNameFilterEnabled() MultiTSDBOption { + return func(s *MultiTSDB) { + s.metricNameFilterEnabled = true + } +} + // NewMultiTSDB creates new MultiTSDB. // NOTE: Passed labels must be sorted lexicographically (alphabetically). func NewMultiTSDB( @@ -86,13 +96,13 @@ func NewMultiTSDB( bucket objstore.Bucket, allowOutOfOrderUpload bool, hashFunc metadata.HashFunc, - metricNameFilterEnabled bool, + options ...MultiTSDBOption, ) *MultiTSDB { if l == nil { l = log.NewNopLogger() } - return &MultiTSDB{ + mt := &MultiTSDB{ dataDir: dataDir, logger: log.With(l, "component", "multi-tsdb"), reg: reg, @@ -100,7 +110,6 @@ func NewMultiTSDB( mtx: &sync.RWMutex{}, tenants: map[string]*tenant{}, labels: labels, - metricNameFilterEnabled: metricNameFilterEnabled, tsdbClientsNeedUpdate: true, exemplarClientsNeedUpdate: true, tenantLabelName: tenantLabelName, @@ -108,6 +117,12 @@ func NewMultiTSDB( allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, } + + for _, option := range options { + option(mt) + } + + return mt } type localClient struct { @@ -762,7 +777,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, t.metricNameFilterEnabled), s, ship, exemplars.NewTSDB(s, lset)) + options := []store.TSDBStoreOption{} + if t.metricNameFilterEnabled { + options = append(options, store.WithCuckooMetricNameStoreFilter()) + } + tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 1ede2d8ac1..eb82c22281 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -57,7 +57,6 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -142,7 +141,6 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -186,7 +184,6 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -454,7 +451,6 @@ func TestMultiTSDBPrune(t *testing.T) { test.bucket, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -528,7 +524,6 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { objstore.NewInMemBucket(), false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -590,7 +585,6 @@ func TestAlignedHeadFlush(t *testing.T) { test.bucket, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -665,7 +659,6 @@ func TestMultiTSDBStats(t *testing.T) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -695,7 +688,6 @@ func TestMultiTSDBWithNilStore(t *testing.T) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -737,7 +729,6 @@ func TestProxyLabelValues(t *testing.T) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -828,7 +819,6 @@ func BenchmarkMultiTSDB(b *testing.B) { nil, false, metadata.NoneFunc, - false, ) defer func() { testutil.Ok(b, m.Close()) }() diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index e4fb686554..ea1b6d81fd 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -810,7 +810,6 @@ func initializeMultiTSDB(dir string) *MultiTSDB { bucket, false, metadata.NoneFunc, - false, ) return m diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 468bac516c..34613794b8 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -344,7 +344,6 @@ func TestWriter(t *testing.T) { nil, false, metadata.NoneFunc, - false, ) t.Cleanup(func() { testutil.Ok(t, m.Close()) }) @@ -437,7 +436,6 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr nil, false, metadata.NoneFunc, - false, ) b.Cleanup(func() { testutil.Ok(b, m.Close()) }) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index 6e61231772..793f16eb60 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -1019,7 +1019,7 @@ func TestTSDBStore_Acceptance(t *testing.T) { tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) appendFn(db.Appender(context.Background())) - return NewTSDBStore(nil, db, component.Rule, extLset, false) + return NewTSDBStore(nil, db, component.Rule, extLset) } testStoreAPIsAcceptance(t, startStore) @@ -1173,7 +1173,7 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) { tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) appendFn(db.Appender(context.Background())) - return NewTSDBStore(nil, db, component.Rule, extLset, false) + return NewTSDBStore(nil, db, component.Rule, extLset) } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 7eeddadf4e..08c3122e32 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -31,13 +31,28 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" ) -const RemoteReadFrameLimit = 1048576 +const ( + RemoteReadFrameLimit = 1048576 + cuckooStoreFilterCapacity = 1000000 + storeFilterUpdateInterval = 15 * time.Second +) type TSDBReader interface { storage.ChunkQueryable StartTime() (int64, error) } +// TSDBStoreOption is a functional option for TSDBStore. +type TSDBStoreOption func(s *TSDBStore) + +// WithCuckooMetricNameStoreFilter returns a TSDBStoreOption that enables the Cuckoo filter for metric names. +func WithCuckooMetricNameStoreFilter() TSDBStoreOption { + return func(s *TSDBStore) { + s.storeFilter = filter.NewCuckooMetricNameStoreFilter(cuckooStoreFilterCapacity) + s.startStoreFilterUpdate = true + } +} + // TSDBStore implements the store API against a local TSDB instance. // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. @@ -48,10 +63,11 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - extLset labels.Labels - storeFilter filter.StoreFilter - mtx sync.RWMutex - close func() + extLset labels.Labels + startStoreFilterUpdate bool + storeFilter filter.StoreFilter + mtx sync.RWMutex + close func() storepb.UnimplementedStoreServer } @@ -73,29 +89,24 @@ type ReadWriteTSDBStore struct { // NewTSDBStore creates a new TSDBStore. // NOTE: Given lset has to be sorted. -func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, metricNameFilterEnabled bool) *TSDBStore { +func NewTSDBStore( + logger log.Logger, + db TSDBReader, + component component.StoreAPI, + extLset labels.Labels, + options ...TSDBStoreOption, +) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } - var ( - storeFilter filter.StoreFilter - startFilterUpdate bool - ) - - storeFilter = filter.AllowAllStoreFilter{} - if metricNameFilterEnabled { - startFilterUpdate = true - storeFilter = filter.NewCuckooMetricNameStoreFilter(1000000) // about 1MB on 64bit machines. - } - st := &TSDBStore{ logger: logger, db: db, component: component, extLset: extLset, - storeFilter: storeFilter, maxBytesPerFrame: RemoteReadFrameLimit, + storeFilter: filter.AllowAllStoreFilter{}, close: func() {}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) @@ -103,13 +114,16 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI }}, } - if startFilterUpdate { - t := time.NewTicker(15 * time.Second) + for _, option := range options { + option(st) + } + + if st.startStoreFilterUpdate { ctx, cancel := context.WithCancel(context.Background()) - updateFilter := func() { - vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{ + + updateFilter := func(ctx context.Context) { + vals, err := st.LabelValues(ctx, &storepb.LabelValuesRequest{ Label: model.MetricNameLabel, - Start: 0, End: math.MaxInt64, }) if err != nil { @@ -120,13 +134,15 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI st.storeFilter.ResetAndSet(vals.Values...) } st.close = cancel - updateFilter() + updateFilter(ctx) + + t := time.NewTicker(storeFilterUpdateInterval) go func() { for { select { case <-t.C: - updateFilter() + updateFilter(ctx) case <-ctx.Done(): return } diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 5c723a7cbd..5a6457a6fe 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -39,7 +39,7 @@ func TestTSDBStore_Series_ChunkChecksum(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"), false) + tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) appender := db.Appender(context.Background()) @@ -79,7 +79,7 @@ func TestTSDBStore_Series(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"), false) + tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) appender := db.Appender(context.Background()) @@ -251,7 +251,7 @@ func TestTSDBStore_SeriesAccessWithDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) srv := storetestutil.NewSeriesServer(context.Background()) csrv := &delegatorServer{SeriesServer: srv} @@ -414,7 +414,7 @@ func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) srv := storetestutil.NewSeriesServer(context.Background()) t.Run("call series and access results", func(t *testing.T) { @@ -555,7 +555,7 @@ func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) { defer func() { testutil.Ok(t, db.Close()) }() extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) var expected []*storepb.Series for _, resp := range resps { From 701d312a1b0979cd352953775f6b94bf9196ae89 Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Tue, 1 Oct 2024 16:42:58 +0300 Subject: [PATCH 8/8] rebase on main Signed-off-by: Mindaugas Niaura --- pkg/api/query/v1_test.go | 2 +- pkg/store/storepb/testutil/client.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index e36bea8ea4..923a9cf411 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -785,7 +785,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) { func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore { c := &storetestutil.TestClient{ Name: "1", - StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels())), + StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil)), MinTime: math.MinInt64, MaxTime: math.MaxInt64, } diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index 419c3e0a0e..664819a1a2 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -20,13 +20,13 @@ type TestClient struct { Shardable bool WithoutReplicaLabelsEnabled bool IsLocalStore bool - StoreTSDBInfos []*infopb.TSDBInfo + StoreTSDBInfos []infopb.TSDBInfo StoreFilterNotMatches bool } func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } func (c TestClient) TimeRange() (mint, maxt int64) { return c.MinTime, c.MaxTime } -func (c TestClient) TSDBInfos() []*infopb.TSDBInfo { return c.StoreTSDBInfos } +func (c TestClient) TSDBInfos() []infopb.TSDBInfo { return c.StoreTSDBInfos } func (c TestClient) SupportsSharding() bool { return c.Shardable } func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } func (c TestClient) String() string { return c.Name }