From 6cff35b4d7223b6a168a77a024e35430e76b7696 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 23 Jan 2025 11:10:35 +0100 Subject: [PATCH 1/3] Add -server.cluster flag Signed-off-by: Arve Knudsen --- cmd/mimir/config-descriptor.json | 10 ++++++ cmd/mimir/help-all.txt.tmpl | 2 ++ cmd/mimir/help.txt.tmpl | 2 ++ .../config/datasources.yaml | 3 ++ .../config/grafana-agent.yaml | 2 ++ .../config/mimir.yaml | 3 ++ .../config/prom-ha-pair-1.yaml | 2 ++ .../config/prom-ha-pair-2.yaml | 2 ++ .../config/prometheus.yaml | 2 ++ .../docker-compose.yml | 2 +- .../config/datasource-mimir.yaml | 5 ++- .../config/grafana-agent.yaml | 2 ++ .../mimir-monolithic-mode/config/mimir.yaml | 3 ++ .../config/prometheus.yaml | 2 ++ .../configuration-parameters/index.md | 5 +++ go.mod | 2 +- go.sum | 4 +-- pkg/api/api.go | 5 +++ pkg/api/middleware.go | 31 +++++++++++++++++++ .../github.com/grafana/dskit/server/server.go | 7 +++++ vendor/modules.txt | 2 +- 21 files changed, 92 insertions(+), 6 deletions(-) create mode 100644 pkg/api/middleware.go diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index b8541ad387d..73006ccea77 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -223,6 +223,16 @@ "fieldType": "boolean", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "cluster", + "required": false, + "desc": "Optionally define the server's cluster, and enable validation that requests are for the same cluster.", + "fieldValue": null, + "fieldDefaultValue": "", + "fieldFlag": "server.cluster", + "fieldType": "string" + }, { "kind": "field", "name": "tls_cipher_suites", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 46edd259604..cbe281813d5 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -3091,6 +3091,8 @@ Usage of ./cmd/mimir/mimir: Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right. -runtime-config.reload-period duration How often to check runtime config files. (default 10s) + -server.cluster string + Optionally define the server's cluster, and enable validation that requests are for the same cluster. -server.graceful-shutdown-timeout duration Timeout for graceful shutdowns (default 30s) -server.grpc-conn-limit int diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index 28356aad079..d63b16726c8 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -763,6 +763,8 @@ Usage of ./cmd/mimir/mimir: The tenant's shard size when sharding is used by ruler. Value of 0 disables shuffle sharding for the tenant, and tenant rules will be sharded across all ruler replicas. -runtime-config.file comma-separated-list-of-strings Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right. + -server.cluster string + Optionally define the server's cluster, and enable validation that requests are for the same cluster. -server.grpc-listen-address string gRPC server listen address. -server.grpc-listen-port int diff --git a/development/mimir-microservices-mode/config/datasources.yaml b/development/mimir-microservices-mode/config/datasources.yaml index da565b5cb13..d44c7b532b3 100644 --- a/development/mimir-microservices-mode/config/datasources.yaml +++ b/development/mimir-microservices-mode/config/datasources.yaml @@ -12,6 +12,9 @@ datasources: exemplarTraceIdDestinations: - name: traceID datasourceUid: jaeger + httpHeaderName1: 'X-Cluster' + secureJsonData: + httpHeaderValue1: 'development' - name: Prometheus type: prometheus diff --git a/development/mimir-microservices-mode/config/grafana-agent.yaml b/development/mimir-microservices-mode/config/grafana-agent.yaml index 14304e96955..27a2bd7a2a9 100644 --- a/development/mimir-microservices-mode/config/grafana-agent.yaml +++ b/development/mimir-microservices-mode/config/grafana-agent.yaml @@ -50,3 +50,5 @@ prometheus: remote_write: - url: http://distributor-1:8000/api/v1/push + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-microservices-mode/config/mimir.yaml b/development/mimir-microservices-mode/config/mimir.yaml index d90ea91fede..dadf157a2c5 100644 --- a/development/mimir-microservices-mode/config/mimir.yaml +++ b/development/mimir-microservices-mode/config/mimir.yaml @@ -1,3 +1,6 @@ +server: + cluster: development + multitenancy_enabled: false distributor: diff --git a/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml b/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml index e6ba48b80e2..4ea75e6b8f7 100644 --- a/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml +++ b/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml @@ -33,3 +33,5 @@ remote_write: - url: http://distributor-2:8001/api/v1/push send_native_histograms: true send_exemplars: true + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml b/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml index c55e31e1ab3..34c51e16327 100644 --- a/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml +++ b/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml @@ -33,3 +33,5 @@ remote_write: - url: http://distributor-2:8001/api/v1/push send_native_histograms: true send_exemplars: true + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-microservices-mode/config/prometheus.yaml b/development/mimir-microservices-mode/config/prometheus.yaml index f2d30912e3b..a05778cafe5 100644 --- a/development/mimir-microservices-mode/config/prometheus.yaml +++ b/development/mimir-microservices-mode/config/prometheus.yaml @@ -47,6 +47,8 @@ remote_write: - url: http://distributor-1:8000/api/v1/push send_native_histograms: true send_exemplars: true + headers: + 'X-Cluster': 'development' rule_files: - '/etc/mixin/mimir-alerts.yaml' diff --git a/development/mimir-microservices-mode/docker-compose.yml b/development/mimir-microservices-mode/docker-compose.yml index d59b39e809d..d11ecde5268 100644 --- a/development/mimir-microservices-mode/docker-compose.yml +++ b/development/mimir-microservices-mode/docker-compose.yml @@ -120,7 +120,7 @@ "command": - "sh" - "-c" - - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=continuous-test -server.http-listen-port=8090 -server.grpc-listen-port=9090 -activity-tracker.filepath=/activity/continuous-test-8090 -tests.run-interval=2m -tests.read-endpoint=http://query-frontend:8007/prometheus -tests.tenant-id=mimir-continuous-test -tests.write-endpoint=http://distributor-1:8000 -tests.write-read-series-test.max-query-age=1h -tests.write-read-series-test.num-series=100 -memberlist.nodename=continuous-test -memberlist.bind-port=10090 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=continuous-test -server.cluster=development -server.http-listen-port=8090 -server.grpc-listen-port=9090 -activity-tracker.filepath=/activity/continuous-test-8090 -tests.run-interval=2m -tests.read-endpoint=http://query-frontend:8007/prometheus -tests.tenant-id=mimir-continuous-test -tests.write-endpoint=http://distributor-1:8000 -tests.write-read-series-test.max-query-age=1h -tests.write-read-series-test.num-series=100 -memberlist.nodename=continuous-test -memberlist.bind-port=10090 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" "depends_on": - "minio" - "distributor-1" diff --git a/development/mimir-monolithic-mode/config/datasource-mimir.yaml b/development/mimir-monolithic-mode/config/datasource-mimir.yaml index 4b3a658a5e1..e17e8d47bfa 100644 --- a/development/mimir-monolithic-mode/config/datasource-mimir.yaml +++ b/development/mimir-monolithic-mode/config/datasource-mimir.yaml @@ -12,9 +12,12 @@ datasources: exemplarTraceIdDestinations: - name: traceID datasourceUid: jaeger + httpHeaderName1: 'X-Cluster' + secureJsonData: + httpHeaderValue1: 'development' - name: Jaeger type: jaeger access: proxy uid: jaeger orgID: 1 - url: http://jaeger:16686/ \ No newline at end of file + url: http://jaeger:16686/ diff --git a/development/mimir-monolithic-mode/config/grafana-agent.yaml b/development/mimir-monolithic-mode/config/grafana-agent.yaml index 9eeddc1bac4..abf081527ad 100644 --- a/development/mimir-monolithic-mode/config/grafana-agent.yaml +++ b/development/mimir-monolithic-mode/config/grafana-agent.yaml @@ -23,3 +23,5 @@ prometheus: remote_write: - url: http://mimir-1:8001/api/v1/push + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-monolithic-mode/config/mimir.yaml b/development/mimir-monolithic-mode/config/mimir.yaml index ef730e3b9da..193ef5a5156 100644 --- a/development/mimir-monolithic-mode/config/mimir.yaml +++ b/development/mimir-monolithic-mode/config/mimir.yaml @@ -1,5 +1,8 @@ multitenancy_enabled: false +server: + cluster: development + distributor: pool: health_check_ingesters: true diff --git a/development/mimir-monolithic-mode/config/prometheus.yaml b/development/mimir-monolithic-mode/config/prometheus.yaml index 86194f2cfd9..3328572ff6f 100644 --- a/development/mimir-monolithic-mode/config/prometheus.yaml +++ b/development/mimir-monolithic-mode/config/prometheus.yaml @@ -20,3 +20,5 @@ scrape_configs: remote_write: - url: http://mimir-1:8001/api/v1/push send_native_histograms: true + headers: + 'X-Cluster': 'development' diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 7ebe325c96e..d81a607a0c2 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -553,6 +553,11 @@ The `server` block configures the HTTP and gRPC server of the launched service(s # CLI flag: -server.proxy-protocol-enabled [proxy_protocol_enabled: | default = false] +# Optionally define the server's cluster, and enable validation that requests +# are for the same cluster. +# CLI flag: -server.cluster +[cluster: | default = ""] + # Comma-separated list of cipher suites to use. If blank, the default Go cipher # suites is used. # CLI flag: -server.tls-cipher-suites diff --git a/go.mod b/go.mod index 705a726dfa8..dd07c0b4bb9 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080 + github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/influxdata/influxdb/v2 v2.7.11 diff --git a/go.sum b/go.sum index 2c05c10b0f0..7074ee24e67 100644 --- a/go.sum +++ b/go.sum @@ -1271,8 +1271,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8 h1:mdI6P22PgFD7bQ0Yf4h8cfHSldak4nxogvlsTHZyZmc= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU= -github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080 h1:IHRsAMdemxPu9g9zPxTFcU3hLhfd5cl6W4fqRovAzkU= -github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2 h1:2ZN2dTx3NDEvREKr5UH6qaAtI+g6wkItol9TzZMGhcQ= +github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= diff --git a/pkg/api/api.go b/pkg/api/api.go index 9e1df52a340..a89277a2bfb 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -158,6 +158,11 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth } func (a *API) newRoute(path string, handler http.Handler, isPrefix, auth, gzip bool, methods ...string) (route *mux.Route) { + if a.server.Cluster() != "" { + level.Debug(a.logger).Log("msg", "api: enabling cluster validation middleware", "path", path, "cluster", a.server.Cluster()) + handler = ClusterValidationMiddleware(a.server.Cluster(), a.logger).Wrap(handler) + } + // Propagate the consistency level on all HTTP routes. // They are not used everywhere, but for consistency and less surprise it's added everywhere. handler = querierapi.ConsistencyMiddleware().Wrap(handler) diff --git a/pkg/api/middleware.go b/pkg/api/middleware.go new file mode 100644 index 00000000000..0151618ea63 --- /dev/null +++ b/pkg/api/middleware.go @@ -0,0 +1,31 @@ +package api + +import ( + "fmt" + "net/http" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/middleware" +) + +// ClusterValidationMiddleware validates that requests are for the correct cluster. +func ClusterValidationMiddleware(cluster string, logger log.Logger) middleware.Interface { + return middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + reqCluster := r.Header.Get(clusterHeader) + if reqCluster != cluster { + level.Warn(logger).Log("msg", "rejecting request intended for wrong cluster", "cluster", cluster, "request_cluster", reqCluster) + http.Error(w, fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster), + http.StatusBadRequest) + return + } + + next.ServeHTTP(w, r) + }) + }) +} + +const ( + clusterHeader = "X-Cluster" +) diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index f6c7f997f60..8921b09be96 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -88,6 +88,7 @@ type Config struct { GRPCListenPort int `yaml:"grpc_listen_port"` GRPCConnLimit int `yaml:"grpc_listen_conn_limit"` ProxyProtocolEnabled bool `yaml:"proxy_protocol_enabled"` + Cluster string `yaml:"cluster"` CipherSuites string `yaml:"tls_cipher_suites"` MinVersion string `yaml:"tls_min_version"` @@ -216,6 +217,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.LogRequestExcludeHeadersList, "server.log-request-headers-exclude-list", "", "Comma separated list of headers to exclude from loggin. Only used if server.log-request-headers is true.") f.BoolVar(&cfg.LogRequestAtInfoLevel, "server.log-request-at-info-level-enabled", false, "Optionally log requests at info level instead of debug level. Applies to request headers as well if server.log-request-headers is enabled.") f.BoolVar(&cfg.ProxyProtocolEnabled, "server.proxy-protocol-enabled", false, "Enables PROXY protocol.") + f.StringVar(&cfg.Cluster, "server.cluster", "", "Optionally define the server's cluster, and enable validation that requests are for the same cluster.") f.DurationVar(&cfg.Throughput.LatencyCutoff, "server.throughput.latency-cutoff", 0, "Requests taking over the cutoff are be observed to measure throughput. Server-Timing header is used with specified unit as the indicator, for example 'Server-Timing: unit;val=8.2'. If set to 0, the throughput is not calculated.") f.StringVar(&cfg.Throughput.Unit, "server.throughput.unit", "samples_processed", "Unit of the server throughput metric, for example 'processed_bytes' or 'samples_processed'. Observed values are gathered from the 'Server-Timing' header with the 'val' key. If set, it is appended to the request_server_throughput metric name.") } @@ -609,6 +611,11 @@ func handleGRPCError(err error, errChan chan error) { } } +// Cluster returns the configured server cluster. +func (s *Server) Cluster() string { + return s.cfg.Cluster +} + // HTTPListenAddr exposes `net.Addr` that `Server` is listening to for HTTP connections. func (s *Server) HTTPListenAddr() net.Addr { return s.httpListener.Addr() diff --git a/vendor/modules.txt b/vendor/modules.txt index 4044ea8b048..fc393cc00b8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -644,7 +644,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080 +# github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2 ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast From 04b0030bb86769853c4cb19288de100c564e0060 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 24 Jan 2025 09:51:14 +0100 Subject: [PATCH 2/3] Pass cluster Signed-off-by: Arve Knudsen --- Makefile | 3 +++ go.mod | 2 +- go.sum | 4 ++-- pkg/api/middleware.go | 7 ++----- pkg/frontend/config.go | 9 +++++++-- pkg/frontend/transport/roundtripper.go | 10 +++++++--- pkg/frontend/v1/frontend_test.go | 2 +- .../v2/frontend_scheduler_adapter_test.go | 6 +++--- pkg/frontend/v2/frontend_test.go | 2 +- pkg/mimir/modules.go | 6 +++++- .../grafana/dskit/clusterutil/clusterutil.go | 6 ++++++ .../grafana/dskit/httpgrpc/httpgrpc.go | 18 +++++++++++++++++- vendor/modules.txt | 3 ++- 13 files changed, 57 insertions(+), 21 deletions(-) create mode 100644 vendor/github.com/grafana/dskit/clusterutil/clusterutil.go diff --git a/Makefile b/Makefile index e4ac16b55d4..faeb5481afa 100644 --- a/Makefile +++ b/Makefile @@ -453,6 +453,9 @@ lint: check-makefiles faillint -paths \ "github.com/twmb/franz-go/pkg/kgo.{AllowAutoTopicCreation}" \ ./pkg/... ./cmd/... ./tools/... ./integration/... + # We need to ensure that when creating http-grpc requests, the X-Cluster header is included. + faillint -paths "github.com/grafana/dskit/httpgrpc.{FromHTTPRequest}=github.com/grafana/dskit/httpgrpc.FromHTTPRequestWithCluster" \ + ./pkg/... ./cmd/... ./tools/... ./integration/... format: ## Run gofmt and goimports. find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec gofmt -w -s {} \; diff --git a/go.mod b/go.mod index dd07c0b4bb9..c490569976b 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2 + github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/influxdata/influxdb/v2 v2.7.11 diff --git a/go.sum b/go.sum index 7074ee24e67..f6e5c864b6d 100644 --- a/go.sum +++ b/go.sum @@ -1271,8 +1271,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8 h1:mdI6P22PgFD7bQ0Yf4h8cfHSldak4nxogvlsTHZyZmc= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU= -github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2 h1:2ZN2dTx3NDEvREKr5UH6qaAtI+g6wkItol9TzZMGhcQ= -github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b h1:y34FUoHHxGMbsX8nsGdCdJGfazKlGjf/7QhvgFMn0HA= +github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= diff --git a/pkg/api/middleware.go b/pkg/api/middleware.go index 0151618ea63..147eeb7740f 100644 --- a/pkg/api/middleware.go +++ b/pkg/api/middleware.go @@ -6,6 +6,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/clusterutil" "github.com/grafana/dskit/middleware" ) @@ -13,7 +14,7 @@ import ( func ClusterValidationMiddleware(cluster string, logger log.Logger) middleware.Interface { return middleware.Func(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - reqCluster := r.Header.Get(clusterHeader) + reqCluster := r.Header.Get(clusterutil.ClusterHeader) if reqCluster != cluster { level.Warn(logger).Log("msg", "rejecting request intended for wrong cluster", "cluster", cluster, "request_cluster", reqCluster) http.Error(w, fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster), @@ -25,7 +26,3 @@ func ClusterValidationMiddleware(cluster string, logger log.Logger) middleware.I }) }) } - -const ( - clusterHeader = "X-Cluster" -) diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index a6a397f391c..0195f731da3 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -30,6 +30,8 @@ type CombinedFrontendConfig struct { QueryMiddleware querymiddleware.Config `yaml:",inline"` DownstreamURL string `yaml:"downstream_url" category:"advanced"` + + Cluster string `yaml:"-"` } func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) { @@ -66,6 +68,9 @@ func InitFrontend( reg prometheus.Registerer, codec querymiddleware.Codec, ) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) { + if cfg.Cluster == "" { + panic("cluster not defined") + } switch { case cfg.DownstreamURL != "": // If the user has specified a downstream Prometheus, then we should use that. @@ -88,7 +93,7 @@ func InitFrontend( } fr, err := v2.NewFrontend(cfg.FrontendV2, v2Limits, log, reg, codec) - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), nil, fr, err default: // No scheduler = use original frontend. @@ -96,6 +101,6 @@ func InitFrontend( if err != nil { return nil, nil, nil, err } - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), fr, nil, nil } } diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index 141fade2ee3..062ba63190a 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -20,13 +20,17 @@ type GrpcRoundTripper interface { RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, io.ReadCloser, error) } -func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { - return &grpcRoundTripperAdapter{roundTripper: r} +func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper, cluster string) http.RoundTripper { + return &grpcRoundTripperAdapter{ + roundTripper: r, + cluster: cluster, + } } // This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper type grpcRoundTripperAdapter struct { roundTripper GrpcRoundTripper + cluster string } type buffer struct { @@ -39,7 +43,7 @@ func (b *buffer) Bytes() []byte { } func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) { - req, err := httpgrpc.FromHTTPRequest(r) + req, err := httpgrpc.FromHTTPRequestWithCluster(r, a.cluster) if err != nil { return nil, err } diff --git a/pkg/frontend/v1/frontend_test.go b/pkg/frontend/v1/frontend_test.go index 1ee7cfe66e4..5eaa9f27f78 100644 --- a/pkg/frontend/v1/frontend_test.go +++ b/pkg/frontend/v1/frontend_test.go @@ -337,7 +337,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a handlerCfg := transport.HandlerConfig{QueryStatsEnabled: true} flagext.DefaultValues(&handlerCfg) - rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1) + rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1, "") r := mux.NewRouter() r.PathPrefix("/").Handler(middleware.Merge( middleware.AuthenticateUser, diff --git a/pkg/frontend/v2/frontend_scheduler_adapter_test.go b/pkg/frontend/v2/frontend_scheduler_adapter_test.go index 7504019debb..24e767fef9b 100644 --- a/pkg/frontend/v2/frontend_scheduler_adapter_test.go +++ b/pkg/frontend/v2/frontend_scheduler_adapter_test.go @@ -136,7 +136,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) { reqs := []*http.Request{rangeHTTPReq, labelValuesHTTPReq} for _, req := range reqs { - httpgrpcReq, err := httpgrpc.FromHTTPRequest(req) + httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(req, "") require.NoError(t, err) additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions( @@ -179,7 +179,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "tenant-0") instantHTTPReq := makeInstantHTTPRequest(ctx, testData.time) - httpgrpcReq, err := httpgrpc.FromHTTPRequest(instantHTTPReq) + httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(instantHTTPReq, "") require.NoError(t, err) additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions( @@ -229,7 +229,7 @@ func TestQueryDecoding(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "tenant-0") labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, testData.start, testData.end) - httpgrpcReq, err := httpgrpc.FromHTTPRequest(labelValuesHTTPReq) + httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(labelValuesHTTPReq, "") require.NoError(t, err) additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions( diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index e4f8e5b91a6..e7d3d3feaf6 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -532,7 +532,7 @@ func TestFrontendStreamingResponse(t *testing.T) { }) req := httptest.NewRequest("GET", "/api/v1/cardinality/active_series?selector=metric", nil) - rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f) + rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f, "") resp, err := rt.RoundTrip(req.WithContext(user.InjectOrgID(context.Background(), userID))) require.NoError(t, err) diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index dfd4a7721a1..2b1dbe80b3e 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/clusterutil" "github.com/grafana/dskit/dns" httpgrpc_server "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/kv" @@ -718,7 +719,9 @@ func (t *Mimir) initFlusher() (serv services.Service, err error) { // initQueryFrontendCodec initializes query frontend codec. // NOTE: Grafana Enterprise Metrics depends on this. func (t *Mimir) initQueryFrontendCodec() (services.Service, error) { - t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders) + // Always pass through the cluster header. + propagateHeaders := append([]string{clusterutil.ClusterHeader}, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders...) + t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, propagateHeaders) return nil, nil } @@ -783,6 +786,7 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error } func (t *Mimir) initQueryFrontend() (serv services.Service, err error) { + t.Cfg.Frontend.Cluster = t.Cfg.Server.Cluster t.Cfg.Frontend.FrontendV2.QuerySchedulerDiscovery = t.Cfg.QueryScheduler.ServiceDiscovery t.Cfg.Frontend.FrontendV2.LookBackDelta = t.Cfg.Querier.EngineConfig.LookbackDelta t.Cfg.Frontend.FrontendV2.QueryStoreAfter = t.Cfg.Querier.QueryStoreAfter diff --git a/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go b/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go new file mode 100644 index 00000000000..a02d2d4fbd2 --- /dev/null +++ b/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go @@ -0,0 +1,6 @@ +package clusterutil + +const ( + // ClusterHeader is the name of the cluster identifying HTTP header. + ClusterHeader = "X-Cluster" +) diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index 616023899b7..350b2c8a90b 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -17,6 +17,7 @@ import ( "github.com/gogo/status" "google.golang.org/grpc/metadata" + "github.com/grafana/dskit/clusterutil" "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/log" ) @@ -42,7 +43,7 @@ func (nopCloser) Close() error { return nil } // BytesBuffer returns the underlaying `bytes.buffer` used to build this io.ReadCloser. func (n nopCloser) BytesBuffer() *bytes.Buffer { return n.Buffer } -// FromHTTPRequest converts an ordinary http.Request into an httpgrpc.HTTPRequest +// FromHTTPRequest converts an ordinary http.Request into an httpgrpc.HTTPRequest. func FromHTTPRequest(r *http.Request) (*HTTPRequest, error) { body, err := io.ReadAll(r.Body) if err != nil { @@ -56,6 +57,21 @@ func FromHTTPRequest(r *http.Request) (*HTTPRequest, error) { }, nil } +// FromHTTPRequestWithCluster converts an ordinary http.Request into an httpgrpc.HTTPRequest. +// It's the same as FromHTTPRequest except that if cluster is non-empty, it has to be equal to the +// middleware.ClusterHeader header (or an error is returned). +func FromHTTPRequestWithCluster(r *http.Request, cluster string) (*HTTPRequest, error) { + if cluster != "" { + if c := r.Header.Get(clusterutil.ClusterHeader); c != cluster { + return nil, fmt.Errorf( + "httpgrpc.FromHTTPRequest: %q header should be %q, but is %q", + clusterutil.ClusterHeader, cluster, c, + ) + } + } + return FromHTTPRequest(r) +} + // ToHTTPRequest converts httpgrpc.HTTPRequest to http.Request. func ToHTTPRequest(ctx context.Context, r *HTTPRequest) (*http.Request, error) { req, err := http.NewRequest(r.Method, r.Url, nopCloser{Buffer: bytes.NewBuffer(r.Body)}) diff --git a/vendor/modules.txt b/vendor/modules.txt index fc393cc00b8..99b701c6ee2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -644,12 +644,13 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2 +# github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast github.com/grafana/dskit/cache github.com/grafana/dskit/cancellation +github.com/grafana/dskit/clusterutil github.com/grafana/dskit/concurrency github.com/grafana/dskit/crypto/tls github.com/grafana/dskit/dns From f17900eee136e2096e8fb98641fd3cc78e0f3630 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 24 Jan 2025 14:45:39 +0100 Subject: [PATCH 3/3] Move HTTP middleware to dskit Signed-off-by: Arve Knudsen --- cmd/mimir/config-descriptor.json | 2 +- cmd/mimir/help-all.txt.tmpl | 2 +- .../configuration-parameters/index.md | 2 +- go.mod | 2 +- go.sum | 4 +- pkg/api/api.go | 5 -- pkg/frontend/config.go | 3 -- .../grafana/dskit/middleware/cluster.go | 23 +++++--- .../grafana/dskit/middleware/grpc_cluster.go | 53 +++++++++++++++++++ .../github.com/grafana/dskit/server/server.go | 18 +++---- vendor/modules.txt | 2 +- 11 files changed, 86 insertions(+), 30 deletions(-) rename pkg/api/middleware.go => vendor/github.com/grafana/dskit/middleware/cluster.go (50%) create mode 100644 vendor/github.com/grafana/dskit/middleware/grpc_cluster.go diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 73006ccea77..50af7045b75 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -419,7 +419,7 @@ "kind": "field", "name": "register_instrumentation", "required": false, - "desc": "Register the intrumentation handlers (/metrics etc).", + "desc": "Register the instrumentation handlers (/metrics etc).", "fieldValue": null, "fieldDefaultValue": true, "fieldFlag": "server.register-instrumentation", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index cbe281813d5..cdb02586bc1 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -3178,7 +3178,7 @@ Usage of ./cmd/mimir/mimir: -server.proxy-protocol-enabled [experimental] Enables PROXY protocol. -server.register-instrumentation - Register the intrumentation handlers (/metrics etc). (default true) + Register the instrumentation handlers (/metrics etc). (default true) -server.report-grpc-codes-in-instrumentation-label-enabled If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as "error". (default true) -server.throughput.latency-cutoff duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index d81a607a0c2..5562e75bc68 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -622,7 +622,7 @@ grpc_tls_config: # CLI flag: -server.grpc-tls-ca-path [client_ca_file: | default = ""] -# (advanced) Register the intrumentation handlers (/metrics etc). +# (advanced) Register the instrumentation handlers (/metrics etc). # CLI flag: -server.register-instrumentation [register_instrumentation: | default = true] diff --git a/go.mod b/go.mod index c490569976b..3f44002fe17 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b + github.com/grafana/dskit v0.0.0-20250129115452-2f8c95cf1702 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/influxdata/influxdb/v2 v2.7.11 diff --git a/go.sum b/go.sum index f6e5c864b6d..26e7c183519 100644 --- a/go.sum +++ b/go.sum @@ -1271,8 +1271,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8 h1:mdI6P22PgFD7bQ0Yf4h8cfHSldak4nxogvlsTHZyZmc= github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU= -github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b h1:y34FUoHHxGMbsX8nsGdCdJGfazKlGjf/7QhvgFMn0HA= -github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20250129115452-2f8c95cf1702 h1:A3R5RHZvKMoRs26luWg8IB9vfd85WgTB/I14l9lcSII= +github.com/grafana/dskit v0.0.0-20250129115452-2f8c95cf1702/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= diff --git a/pkg/api/api.go b/pkg/api/api.go index a89277a2bfb..9e1df52a340 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -158,11 +158,6 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth } func (a *API) newRoute(path string, handler http.Handler, isPrefix, auth, gzip bool, methods ...string) (route *mux.Route) { - if a.server.Cluster() != "" { - level.Debug(a.logger).Log("msg", "api: enabling cluster validation middleware", "path", path, "cluster", a.server.Cluster()) - handler = ClusterValidationMiddleware(a.server.Cluster(), a.logger).Wrap(handler) - } - // Propagate the consistency level on all HTTP routes. // They are not used everywhere, but for consistency and less surprise it's added everywhere. handler = querierapi.ConsistencyMiddleware().Wrap(handler) diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index 0195f731da3..8dd1dc6ec3b 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -68,9 +68,6 @@ func InitFrontend( reg prometheus.Registerer, codec querymiddleware.Codec, ) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) { - if cfg.Cluster == "" { - panic("cluster not defined") - } switch { case cfg.DownstreamURL != "": // If the user has specified a downstream Prometheus, then we should use that. diff --git a/pkg/api/middleware.go b/vendor/github.com/grafana/dskit/middleware/cluster.go similarity index 50% rename from pkg/api/middleware.go rename to vendor/github.com/grafana/dskit/middleware/cluster.go index 147eeb7740f..4a7e8230c7b 100644 --- a/pkg/api/middleware.go +++ b/vendor/github.com/grafana/dskit/middleware/cluster.go @@ -1,22 +1,25 @@ -package api +package middleware import ( "fmt" "net/http" + "regexp" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/clusterutil" - "github.com/grafana/dskit/middleware" ) // ClusterValidationMiddleware validates that requests are for the correct cluster. -func ClusterValidationMiddleware(cluster string, logger log.Logger) middleware.Interface { - return middleware.Func(func(next http.Handler) http.Handler { +func ClusterValidationMiddleware(cluster string, logger log.Logger) Interface { + return Func(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { reqCluster := r.Header.Get(clusterutil.ClusterHeader) - if reqCluster != cluster { - level.Warn(logger).Log("msg", "rejecting request intended for wrong cluster", "cluster", cluster, "request_cluster", reqCluster) + if !auxilliaryPath(r.URL.Path) && reqCluster != cluster { + level.Warn(logger).Log("msg", "rejecting request intended for wrong cluster", + "cluster", cluster, "request_cluster", reqCluster, "header", clusterutil.ClusterHeader, + "url", r.URL, "path", r.URL.Path) http.Error(w, fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster), http.StatusBadRequest) return @@ -26,3 +29,11 @@ func ClusterValidationMiddleware(cluster string, logger log.Logger) middleware.I }) }) } + +// Allow for a potential path prefix being configured. +// TODO: Take /backlog_replay_complete out, and allow for it to be configured instead (it's part of a readiness probe). +var reAuxPath = regexp.MustCompile(".*/(metrics|debug/pprof.*|ready|backlog_replay_complete)") + +func auxilliaryPath(pth string) bool { + return reAuxPath.MatchString(pth) +} diff --git a/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go b/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go new file mode 100644 index 00000000000..a705e75501f --- /dev/null +++ b/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go @@ -0,0 +1,53 @@ +package middleware + +import ( + "context" + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gogo/status" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +const ( + MetadataClusterKey = "x-cluster" +) + +// ClusterUnaryClientInterceptor propagates the given cluster info to gRPC metadata. +func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if cluster != "" { + ctx = metadata.AppendToOutgoingContext(ctx, MetadataClusterKey, cluster) + } + + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +// ClusterUnaryServerInterceptor checks if the incoming gRPC metadata contains any cluster information and if so, +// checks if the latter corresponds to the given cluster. If it is the case, the request is further propagated. +// Otherwise, an error is returned. +func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + reqCluster := getClusterFromIncomingContext(ctx, logger) + if cluster != reqCluster { + msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) + level.Warn(logger).Log("msg", msg) + return nil, status.Error(codes.FailedPrecondition, msg) + } + return handler(ctx, req) + } +} + +func getClusterFromIncomingContext(ctx context.Context, logger log.Logger) string { + clusterIDs := metadata.ValueFromIncomingContext(ctx, MetadataClusterKey) + if len(clusterIDs) != 1 { + msg := fmt.Sprintf("gRPC metadata should contain exactly 1 value for key \"%s\", but the current set of values is %v. Returning an empty string.", MetadataClusterKey, clusterIDs) + level.Warn(logger).Log("msg", msg) + return "" + } + return clusterIDs[0] +} diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index 8921b09be96..5a4d1a0db5b 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -185,7 +185,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.GRPCListenAddress, "server.grpc-listen-address", "", "gRPC server listen address.") f.IntVar(&cfg.GRPCListenPort, "server.grpc-listen-port", 9095, "gRPC server listen port.") f.IntVar(&cfg.GRPCConnLimit, "server.grpc-conn-limit", 0, "Maximum number of simultaneous grpc connections, <=0 to disable") - f.BoolVar(&cfg.RegisterInstrumentation, "server.register-instrumentation", true, "Register the intrumentation handlers (/metrics etc).") + f.BoolVar(&cfg.RegisterInstrumentation, "server.register-instrumentation", true, "Register the instrumentation handlers (/metrics etc).") f.BoolVar(&cfg.ReportGRPCCodesInInstrumentationLabel, "server.report-grpc-codes-in-instrumentation-label-enabled", false, "If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as \"error\".") f.DurationVar(&cfg.ServerGracefulShutdownTimeout, "server.graceful-shutdown-timeout", 30*time.Second, "Timeout for graceful shutdowns") f.DurationVar(&cfg.HTTPServerReadTimeout, "server.http-read-timeout", 30*time.Second, "Read timeout for entire HTTP request, including headers and body.") @@ -520,10 +520,14 @@ func BuildHTTPMiddleware(cfg Config, router *mux.Router, metrics *Metrics, logge logSourceIPs = nil } + if cfg.DoNotAddDefaultHTTPMiddleware { + return cfg.HTTPMiddleware, nil + } + defaultLogMiddleware := middleware.NewLogMiddleware(logger, cfg.LogRequestHeaders, cfg.LogRequestAtInfoLevel, logSourceIPs, strings.Split(cfg.LogRequestExcludeHeadersList, ",")) defaultLogMiddleware.DisableRequestSuccessLog = cfg.DisableRequestSuccessLog - defaultHTTPMiddleware := []middleware.Interface{ + httpMiddleware := []middleware.Interface{ middleware.RouteInjector{ RouteMatcher: router, }, @@ -543,14 +547,10 @@ func BuildHTTPMiddleware(cfg Config, router *mux.Router, metrics *Metrics, logge RequestThroughput: metrics.RequestThroughput, }, } - var httpMiddleware []middleware.Interface - if cfg.DoNotAddDefaultHTTPMiddleware { - httpMiddleware = cfg.HTTPMiddleware - } else { - httpMiddleware = append(defaultHTTPMiddleware, cfg.HTTPMiddleware...) + if cfg.Cluster != "" { + httpMiddleware = append(httpMiddleware, middleware.ClusterValidationMiddleware(cfg.Cluster, logger)) } - - return httpMiddleware, nil + return append(httpMiddleware, cfg.HTTPMiddleware...), nil } // Run the server; blocks until SIGTERM (if signal handling is enabled), an error is received, or Stop() is called. diff --git a/vendor/modules.txt b/vendor/modules.txt index 99b701c6ee2..43e8649e7ea 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -644,7 +644,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b +# github.com/grafana/dskit v0.0.0-20250129115452-2f8c95cf1702 ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast