diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 47d63802a0..30fbf362c2 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -53,7 +53,10 @@ import ( "github.com/thanos-io/thanos/pkg/tls" ) -const compressionNone = "none" +const ( + compressionNone = "none" + metricNamesFilter = "metric-names-filter" +) func registerReceive(app *extkingpin.App) { cmd := app.Command(component.Receive.String(), "Accept Prometheus remote write API requests and write to local tsdb.") @@ -136,6 +139,14 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") + multiTSDBOptions := []receive.MultiTSDBOption{} + for _, feature := range *conf.featureList { + if feature == metricNamesFilter { + multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) + level.Info(logger).Log("msg", "metric name filter feature enabled") + } + } + rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) if err != nil { return err @@ -215,6 +226,7 @@ func runReceive( bkt, conf.allowOutOfOrderUpload, hashFunc, + multiTSDBOptions..., ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{ Intern: conf.writerInterning, @@ -845,6 +857,8 @@ type receiveConfig struct { limitsConfigReloadTimer time.Duration asyncForwardWorkerCount uint + + featureList *[]string } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -985,6 +999,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) cmd.Flag("receive.limits-config-reload-timer", "Minimum amount of time to pass for the limit configuration to be reloaded. Helps to avoid excessive reloads."). Default("1s").Hidden().DurationVar(&rc.limitsConfigReloadTimer) + + rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/docs/components/receive.md b/docs/components/receive.md index aedfb3f5af..31091e5314 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -339,6 +339,9 @@ Flags: detected maximum container or system memory. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. + --enable-feature= ... Comma separated experimental feature names + to enable. The current list of features is + metric-names-filter. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/go.mod b/go.mod index c3b72969a2..3e66d11b07 100644 --- a/go.mod +++ b/go.mod @@ -120,11 +120,14 @@ require ( github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.34.2 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 + github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 go.opentelemetry.io/contrib/propagators/autoprop v0.54.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 ) +require github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect + require ( cloud.google.com/go/auth v0.5.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect diff --git a/go.sum b/go.sum index d3506e381a..ce27da9bb1 100644 --- a/go.sum +++ b/go.sum @@ -1508,6 +1508,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.117.0 h1:WVlTe09melDYTd7VCVyvHcNWbgB+uI1O115+5LOtdSw= github.com/digitalocean/godo v1.117.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= @@ -2175,6 +2177,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27 h1:yGAraK1uUjlhSXgNMIy8o/J4LFNcy7yeipBqt9N9mVg= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY= github.com/sercand/kuberesolver/v5 v5.1.1/go.mod h1:Fs1KbKhVRnB2aDWN12NjKCB+RgYMWZJ294T3BtmVCpQ= github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= @@ -3206,6 +3210,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/pkg/filter/cuckoo.go b/pkg/filter/cuckoo.go new file mode 100644 index 0000000000..0cdce6dc94 --- /dev/null +++ b/pkg/filter/cuckoo.go @@ -0,0 +1,45 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package filter + +import ( + "sync" + "unsafe" + + "github.com/prometheus/prometheus/model/labels" + cuckoo "github.com/seiflotfy/cuckoofilter" +) + +type CuckooMetricNameStoreFilter struct { + filter *cuckoo.Filter + mtx sync.RWMutex +} + +func NewCuckooMetricNameStoreFilter(capacity uint) *CuckooMetricNameStoreFilter { + return &CuckooMetricNameStoreFilter{ + filter: cuckoo.NewFilter(capacity), + } +} + +func (f *CuckooMetricNameStoreFilter) Matches(matchers []*labels.Matcher) bool { + f.mtx.RLock() + defer f.mtx.RUnlock() + + for _, m := range matchers { + if m.Type == labels.MatchEqual && m.Name == labels.MetricName { + return f.filter.Lookup([]byte(m.Value)) + } + } + + return true +} + +func (f *CuckooMetricNameStoreFilter) ResetAndSet(values ...string) { + f.mtx.Lock() + defer f.mtx.Unlock() + f.filter.Reset() + for _, value := range values { + f.filter.Insert(unsafe.Slice(unsafe.StringData(value), len(value))) + } +} diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go new file mode 100644 index 0000000000..f5cc068cf5 --- /dev/null +++ b/pkg/filter/filter.go @@ -0,0 +1,22 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package filter + +import "github.com/prometheus/prometheus/model/labels" + +type StoreFilter interface { + // Matches returns true if the filter matches the given matchers. + Matches(matchers []*labels.Matcher) bool + + // ResetAndSet resets the filter and sets it to the given values. + ResetAndSet(values ...string) +} + +type AllowAllStoreFilter struct{} + +func (f AllowAllStoreFilter) Matches(matchers []*labels.Matcher) bool { + return true +} + +func (f AllowAllStoreFilter) ResetAndSet(values ...string) {} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 226bbb4730..59234c7afe 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -815,6 +815,10 @@ func (er *endpointRef) apisPresent() []string { return apisPresent } +func (er *endpointRef) Matches(matchers []*labels.Matcher) bool { + return true +} + type endpointMetadata struct { *infopb.InfoResponse } diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 62e39ab500..a5765604f3 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -70,6 +70,18 @@ type MultiTSDB struct { exemplarClients map[string]*exemplars.TSDB exemplarClientsNeedUpdate bool + + metricNameFilterEnabled bool +} + +// MultiTSDBOption is a functional option for MultiTSDB. +type MultiTSDBOption func(mt *MultiTSDB) + +// WithMetricNameFilterEnabled enables metric name filtering on TSDB clients. +func WithMetricNameFilterEnabled() MultiTSDBOption { + return func(s *MultiTSDB) { + s.metricNameFilterEnabled = true + } } // NewMultiTSDB creates new MultiTSDB. @@ -84,12 +96,13 @@ func NewMultiTSDB( bucket objstore.Bucket, allowOutOfOrderUpload bool, hashFunc metadata.HashFunc, + options ...MultiTSDBOption, ) *MultiTSDB { if l == nil { l = log.NewNopLogger() } - return &MultiTSDB{ + mt := &MultiTSDB{ dataDir: dataDir, logger: log.With(l, "component", "multi-tsdb"), reg: reg, @@ -104,6 +117,12 @@ func NewMultiTSDB( allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, } + + for _, option := range options { + option(mt) + } + + return mt } type localClient struct { @@ -179,6 +198,10 @@ func newLocalClient(store *store.TSDBStore) *localClient { } } +func (l *localClient) Matches(matchers []*labels.Matcher) bool { + return l.store.Matches(matchers) +} + func (l *localClient) LabelSets() []labels.Labels { return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...) } @@ -302,6 +325,9 @@ func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *ship } func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { + if storeTSDB == nil && t.storeTSDB != nil { + t.storeTSDB.Close() + } t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB @@ -751,7 +777,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset)) + options := []store.TSDBStoreOption{} + if t.metricNameFilterEnabled { + options = append(options, store.WithCuckooMetricNameStoreFilter()) + } + tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 08cd36e485..26cc160c35 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -71,6 +71,9 @@ type Client interface { // Addr returns address of the store client. If second parameter is true, the client // represents a local client (server-as-client) and has no remote address. Addr() (addr string, isLocalClient bool) + + // Matches returns true if provided label matchers are allowed in the store. + Matches(matches []*labels.Matcher) bool } // ProxyStore implements the store API that proxies request to all given underlying stores. @@ -590,6 +593,11 @@ func storeMatches(ctx context.Context, s Client, mint, maxt int64, matchers ...* if !LabelSetsMatch(matchers, extLset...) { return false, fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers) } + + if !s.Matches(matchers) { + return false, fmt.Sprintf("store does not match filter for matchers: %v", matchers) + } + return true, "" } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 1ac6346acd..05f13d4bac 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -1925,6 +1925,30 @@ func TestStoreMatches(t *testing.T) { maxt: 1, expectedMatch: true, }, + { + s: &storetestutil.TestClient{ExtLset: []labels.Labels{labels.FromStrings("a", "b")}}, + ms: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric_name"), + }, + maxt: 1, + expectedMatch: true, + }, + { + s: &storetestutil.TestClient{ + ExtLset: []labels.Labels{ + labels.FromStrings("a", "b"), + }, + StoreFilterNotMatches: true, + }, + ms: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric_name"), + }, + maxt: 1, + expectedMatch: false, + expectedReason: "store does not match filter for matchers: [a=\"b\" __name__=\"test_metric_name\"]", + }, } { t.Run("", func(t *testing.T) { ok, reason := storeMatches(context.TODO(), c.s, c.mint, c.maxt, c.ms...) diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index 90874842d6..664819a1a2 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -21,12 +21,14 @@ type TestClient struct { WithoutReplicaLabelsEnabled bool IsLocalStore bool StoreTSDBInfos []infopb.TSDBInfo + StoreFilterNotMatches bool } -func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } -func (c TestClient) TimeRange() (mint, maxt int64) { return c.MinTime, c.MaxTime } -func (c TestClient) TSDBInfos() []infopb.TSDBInfo { return c.StoreTSDBInfos } -func (c TestClient) SupportsSharding() bool { return c.Shardable } -func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } -func (c TestClient) String() string { return c.Name } -func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } +func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } +func (c TestClient) TimeRange() (mint, maxt int64) { return c.MinTime, c.MaxTime } +func (c TestClient) TSDBInfos() []infopb.TSDBInfo { return c.StoreTSDBInfos } +func (c TestClient) SupportsSharding() bool { return c.Shardable } +func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } +func (c TestClient) String() string { return c.Name } +func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } +func (c TestClient) Matches(matches []*labels.Matcher) bool { return !c.StoreFilterNotMatches } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index b4cea0022b..08c3122e32 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -11,9 +11,12 @@ import ( "sort" "strings" "sync" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "google.golang.org/grpc" @@ -21,19 +24,35 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) -const RemoteReadFrameLimit = 1048576 +const ( + RemoteReadFrameLimit = 1048576 + cuckooStoreFilterCapacity = 1000000 + storeFilterUpdateInterval = 15 * time.Second +) type TSDBReader interface { storage.ChunkQueryable StartTime() (int64, error) } +// TSDBStoreOption is a functional option for TSDBStore. +type TSDBStoreOption func(s *TSDBStore) + +// WithCuckooMetricNameStoreFilter returns a TSDBStoreOption that enables the Cuckoo filter for metric names. +func WithCuckooMetricNameStoreFilter() TSDBStoreOption { + return func(s *TSDBStore) { + s.storeFilter = filter.NewCuckooMetricNameStoreFilter(cuckooStoreFilterCapacity) + s.startStoreFilterUpdate = true + } +} + // TSDBStore implements the store API against a local TSDB instance. // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. @@ -44,8 +63,16 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - extLset labels.Labels - mtx sync.RWMutex + extLset labels.Labels + startStoreFilterUpdate bool + storeFilter filter.StoreFilter + mtx sync.RWMutex + close func() + storepb.UnimplementedStoreServer +} + +func (s *TSDBStore) Close() { + s.close() } func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) { @@ -62,21 +89,68 @@ type ReadWriteTSDBStore struct { // NewTSDBStore creates a new TSDBStore. // NOTE: Given lset has to be sorted. -func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore { +func NewTSDBStore( + logger log.Logger, + db TSDBReader, + component component.StoreAPI, + extLset labels.Labels, + options ...TSDBStoreOption, +) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } - return &TSDBStore{ + + st := &TSDBStore{ logger: logger, db: db, component: component, extLset: extLset, maxBytesPerFrame: RemoteReadFrameLimit, + storeFilter: filter.AllowAllStoreFilter{}, + close: func() {}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b }}, } + + for _, option := range options { + option(st) + } + + if st.startStoreFilterUpdate { + ctx, cancel := context.WithCancel(context.Background()) + + updateFilter := func(ctx context.Context) { + vals, err := st.LabelValues(ctx, &storepb.LabelValuesRequest{ + Label: model.MetricNameLabel, + End: math.MaxInt64, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to update metric names", "err", err) + return + } + + st.storeFilter.ResetAndSet(vals.Values...) + } + st.close = cancel + updateFilter(ctx) + + t := time.NewTicker(storeFilterUpdateInterval) + + go func() { + for { + select { + case <-t.C: + updateFilter(ctx) + case <-ctx.Done(): + return + } + } + }() + } + + return st } func (s *TSDBStore) SetExtLset(extLset labels.Labels) { @@ -133,6 +207,10 @@ func (s *TSDBStore) TimeRange() (int64, int64) { return minTime, math.MaxInt64 } +func (s *TSDBStore) Matches(matchers []*labels.Matcher) bool { + return s.storeFilter.Matches(matchers) +} + // CloseDelegator allows to delegate close (releasing resources used by request to the server). // This is useful when we invoke StoreAPI within another StoreAPI and results are ephemeral until copied. type CloseDelegator interface {