diff --git a/Makefile b/Makefile index e4ac16b55d4..faeb5481afa 100644 --- a/Makefile +++ b/Makefile @@ -453,6 +453,9 @@ lint: check-makefiles faillint -paths \ "github.com/twmb/franz-go/pkg/kgo.{AllowAutoTopicCreation}" \ ./pkg/... ./cmd/... ./tools/... ./integration/... + # We need to ensure that when creating http-grpc requests, the X-Cluster header is included. + faillint -paths "github.com/grafana/dskit/httpgrpc.{FromHTTPRequest}=github.com/grafana/dskit/httpgrpc.FromHTTPRequestWithCluster" \ + ./pkg/... ./cmd/... ./tools/... ./integration/... format: ## Run gofmt and goimports. find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec gofmt -w -s {} \; diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index e752bcf5f0b..6f80e9e967d 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -223,6 +223,16 @@ "fieldType": "boolean", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "cluster", + "required": false, + "desc": "Optionally define the server's cluster, and enable validation that requests are for the same cluster.", + "fieldValue": null, + "fieldDefaultValue": "", + "fieldFlag": "server.cluster", + "fieldType": "string" + }, { "kind": "field", "name": "tls_cipher_suites", @@ -409,7 +419,7 @@ "kind": "field", "name": "register_instrumentation", "required": false, - "desc": "Register the intrumentation handlers (/metrics etc).", + "desc": "Register the instrumentation handlers (/metrics etc).", "fieldValue": null, "fieldDefaultValue": true, "fieldFlag": "server.register-instrumentation", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 65c3050b746..1a73de4c718 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -3097,6 +3097,8 @@ Usage of ./cmd/mimir/mimir: Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right. -runtime-config.reload-period duration How often to check runtime config files. (default 10s) + -server.cluster string + Optionally define the server's cluster, and enable validation that requests are for the same cluster. -server.graceful-shutdown-timeout duration Timeout for graceful shutdowns (default 30s) -server.grpc-conn-limit int @@ -3182,7 +3184,7 @@ Usage of ./cmd/mimir/mimir: -server.proxy-protocol-enabled [experimental] Enables PROXY protocol. -server.register-instrumentation - Register the intrumentation handlers (/metrics etc). (default true) + Register the instrumentation handlers (/metrics etc). (default true) -server.report-grpc-codes-in-instrumentation-label-enabled If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as "error". (default true) -server.throughput.latency-cutoff duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index 359fc7e418e..5acc6d9cc59 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -763,6 +763,8 @@ Usage of ./cmd/mimir/mimir: The tenant's shard size when sharding is used by ruler. Value of 0 disables shuffle sharding for the tenant, and tenant rules will be sharded across all ruler replicas. -runtime-config.file comma-separated-list-of-strings Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right. 
+ -server.cluster string + Optionally define the server's cluster, and enable validation that requests are for the same cluster. -server.grpc-listen-address string gRPC server listen address. -server.grpc-listen-port int diff --git a/development/mimir-microservices-mode/config/datasources.yaml b/development/mimir-microservices-mode/config/datasources.yaml index da565b5cb13..d44c7b532b3 100644 --- a/development/mimir-microservices-mode/config/datasources.yaml +++ b/development/mimir-microservices-mode/config/datasources.yaml @@ -12,6 +12,9 @@ datasources: exemplarTraceIdDestinations: - name: traceID datasourceUid: jaeger + httpHeaderName1: 'X-Cluster' + secureJsonData: + httpHeaderValue1: 'development' - name: Prometheus type: prometheus diff --git a/development/mimir-microservices-mode/config/grafana-agent.yaml b/development/mimir-microservices-mode/config/grafana-agent.yaml index 14304e96955..27a2bd7a2a9 100644 --- a/development/mimir-microservices-mode/config/grafana-agent.yaml +++ b/development/mimir-microservices-mode/config/grafana-agent.yaml @@ -50,3 +50,5 @@ prometheus: remote_write: - url: http://distributor-1:8000/api/v1/push + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-microservices-mode/config/mimir.yaml b/development/mimir-microservices-mode/config/mimir.yaml index d90ea91fede..dadf157a2c5 100644 --- a/development/mimir-microservices-mode/config/mimir.yaml +++ b/development/mimir-microservices-mode/config/mimir.yaml @@ -1,3 +1,6 @@ +server: + cluster: development + multitenancy_enabled: false distributor: diff --git a/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml b/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml index e6ba48b80e2..4ea75e6b8f7 100644 --- a/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml +++ b/development/mimir-microservices-mode/config/prom-ha-pair-1.yaml @@ -33,3 +33,5 @@ remote_write: - url: http://distributor-2:8001/api/v1/push send_native_histograms: true send_exemplars: true + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml b/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml index c55e31e1ab3..34c51e16327 100644 --- a/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml +++ b/development/mimir-microservices-mode/config/prom-ha-pair-2.yaml @@ -33,3 +33,5 @@ remote_write: - url: http://distributor-2:8001/api/v1/push send_native_histograms: true send_exemplars: true + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-microservices-mode/config/prometheus.yaml b/development/mimir-microservices-mode/config/prometheus.yaml index f2d30912e3b..a05778cafe5 100644 --- a/development/mimir-microservices-mode/config/prometheus.yaml +++ b/development/mimir-microservices-mode/config/prometheus.yaml @@ -47,6 +47,8 @@ remote_write: - url: http://distributor-1:8000/api/v1/push send_native_histograms: true send_exemplars: true + headers: + 'X-Cluster': 'development' rule_files: - '/etc/mixin/mimir-alerts.yaml' diff --git a/development/mimir-microservices-mode/docker-compose.yml b/development/mimir-microservices-mode/docker-compose.yml index d59b39e809d..d11ecde5268 100644 --- a/development/mimir-microservices-mode/docker-compose.yml +++ b/development/mimir-microservices-mode/docker-compose.yml @@ -120,7 +120,7 @@ "command": - "sh" - "-c" - - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=continuous-test -server.http-listen-port=8090 -server.grpc-listen-port=9090 
-activity-tracker.filepath=/activity/continuous-test-8090 -tests.run-interval=2m -tests.read-endpoint=http://query-frontend:8007/prometheus -tests.tenant-id=mimir-continuous-test -tests.write-endpoint=http://distributor-1:8000 -tests.write-read-series-test.max-query-age=1h -tests.write-read-series-test.num-series=100 -memberlist.nodename=continuous-test -memberlist.bind-port=10090 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" + - "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=continuous-test -server.cluster=development -server.http-listen-port=8090 -server.grpc-listen-port=9090 -activity-tracker.filepath=/activity/continuous-test-8090 -tests.run-interval=2m -tests.read-endpoint=http://query-frontend:8007/prometheus -tests.tenant-id=mimir-continuous-test -tests.write-endpoint=http://distributor-1:8000 -tests.write-read-series-test.max-query-age=1h -tests.write-read-series-test.num-series=100 -memberlist.nodename=continuous-test -memberlist.bind-port=10090 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211" "depends_on": - "minio" - "distributor-1" diff --git a/development/mimir-monolithic-mode/config/datasource-mimir.yaml b/development/mimir-monolithic-mode/config/datasource-mimir.yaml index 4b3a658a5e1..e17e8d47bfa 100644 --- a/development/mimir-monolithic-mode/config/datasource-mimir.yaml +++ b/development/mimir-monolithic-mode/config/datasource-mimir.yaml @@ -12,9 +12,12 @@ datasources: exemplarTraceIdDestinations: - name: traceID datasourceUid: jaeger + httpHeaderName1: 'X-Cluster' + secureJsonData: + httpHeaderValue1: 'development' - name: Jaeger type: jaeger access: proxy uid: jaeger orgID: 1 - url: http://jaeger:16686/ \ No newline at end of file + url: http://jaeger:16686/ diff --git a/development/mimir-monolithic-mode/config/grafana-agent.yaml b/development/mimir-monolithic-mode/config/grafana-agent.yaml index 9eeddc1bac4..abf081527ad 100644 --- 
a/development/mimir-monolithic-mode/config/grafana-agent.yaml +++ b/development/mimir-monolithic-mode/config/grafana-agent.yaml @@ -23,3 +23,5 @@ prometheus: remote_write: - url: http://mimir-1:8001/api/v1/push + headers: + 'X-Cluster': 'development' diff --git a/development/mimir-monolithic-mode/config/mimir.yaml b/development/mimir-monolithic-mode/config/mimir.yaml index ef730e3b9da..193ef5a5156 100644 --- a/development/mimir-monolithic-mode/config/mimir.yaml +++ b/development/mimir-monolithic-mode/config/mimir.yaml @@ -1,5 +1,8 @@ multitenancy_enabled: false +server: + cluster: development + distributor: pool: health_check_ingesters: true diff --git a/development/mimir-monolithic-mode/config/prometheus.yaml b/development/mimir-monolithic-mode/config/prometheus.yaml index 86194f2cfd9..3328572ff6f 100644 --- a/development/mimir-monolithic-mode/config/prometheus.yaml +++ b/development/mimir-monolithic-mode/config/prometheus.yaml @@ -20,3 +20,5 @@ scrape_configs: remote_write: - url: http://mimir-1:8001/api/v1/push send_native_histograms: true + headers: + 'X-Cluster': 'development' diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index f92cf57ad40..ab7e387a2a7 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -553,6 +553,11 @@ The `server` block configures the HTTP and gRPC server of the launched service(s # CLI flag: -server.proxy-protocol-enabled [proxy_protocol_enabled: | default = false] +# Optionally define the server's cluster, and enable validation that requests +# are for the same cluster. +# CLI flag: -server.cluster +[cluster: | default = ""] + # Comma-separated list of cipher suites to use. If blank, the default Go cipher # suites is used. # CLI flag: -server.tls-cipher-suites @@ -617,7 +622,7 @@ grpc_tls_config: # CLI flag: -server.grpc-tls-ca-path [client_ca_file: | default = ""] -# (advanced) Register the intrumentation handlers (/metrics etc). +# (advanced) Register the instrumentation handlers (/metrics etc). 
 # CLI flag: -server.register-instrumentation
 [register_instrumentation: <boolean> | default = true]
diff --git a/go.mod b/go.mod
index 6322ddf1d35..a18062f303a 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
 	github.com/golang/snappy v0.0.4
 	github.com/google/gopacket v1.1.19
 	github.com/gorilla/mux v1.8.1
-	github.com/grafana/dskit v0.0.0-20250131191929-eab36484cec2
+	github.com/grafana/dskit v0.0.0-20250204153901-2447c477a34f
 	github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
 	github.com/hashicorp/golang-lru v1.0.2 // indirect
 	github.com/influxdata/influxdb/v2 v2.7.11
diff --git a/go.sum b/go.sum
index ce34ba3f7d9..9cc290da1d3 100644
--- a/go.sum
+++ b/go.sum
@@ -1271,8 +1271,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
 github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
 github.com/grafana/alerting v0.0.0-20250128163937-4446935bbcce h1:lilqLsOGzo+0SuyXjaN5XRVJbnkJRB0bXMoIlYHTIPE=
 github.com/grafana/alerting v0.0.0-20250128163937-4446935bbcce/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU=
-github.com/grafana/dskit v0.0.0-20250131191929-eab36484cec2 h1:9xJDVoTFhzJZzvghXGDDQJapDQfYvVM+/TcWySUZ1VE=
-github.com/grafana/dskit v0.0.0-20250131191929-eab36484cec2/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
+github.com/grafana/dskit v0.0.0-20250204153901-2447c477a34f h1:bhJvt/uZK7qSo1kQoiqo3NLZpQsO1vEDkLKzy3p1sSc=
+github.com/grafana/dskit v0.0.0-20250204153901-2447c477a34f/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
 github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8=
diff --git a/integration/ingester_test.go b/integration/ingester_test.go
index 343344567b6..2f6a24d23de 100644
--- a/integration/ingester_test.go
+++ b/integration/ingester_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/grafana/dskit/clusterutil"
 	"github.com/grafana/e2e"
 	e2edb "github.com/grafana/e2e/db"
 	"github.com/prometheus/common/model"
@@ -783,3 +784,73 @@ func TestIngesterReportGRPCStatusCodes(t *testing.T) {
 	require.Equalf(t, 1.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
 	require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
 }
+
+func TestInvalidCluster(t *testing.T) {
+	series := []prompb.TimeSeries{
+		{
+			Labels: []prompb.Label{
+				{
+					Name:  "__name__",
+					Value: "not_foobar",
+				},
+			},
+			Samples: []prompb.Sample{
+				{
+					Timestamp: time.Now().Round(time.Second).UnixMilli(),
+					Value:     100,
+				},
+			},
+		},
+	}
+
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	baseFlags := map[string]string{
+		"-distributor.ingestion-tenant-shard-size": "0",
+		"-ingester.ring.heartbeat-period":          "1s",
+	}
+
+	flags := mergeFlags(
+		BlocksStorageFlags(),
+		BlocksStorageS3Flags(),
+		baseFlags,
+	)
+
+	distributorFlags := mergeFlags(
+		flags,
+		map[string]string{
+			"-server.cluster": "distributor-cluster",
+		},
+	)
+
+	ingesterFlags := mergeFlags(
+		flags,
+		map[string]string{
+			"-server.cluster": "ingester-cluster",
+		},
+	)
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Start Mimir components.
+	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), distributorFlags)
+	ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ingesterFlags)
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester))
+
+	// Wait until distributor has updated the ring.
+	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
+		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
+		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
+
+	client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", userID, e2emimir.WithAddHeader(clusterutil.ClusterHeader, "distributor-cluster"))
+	require.NoError(t, err)
+
+	res, err := client.Push(series)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, res.StatusCode)
+}
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 721f633684d..93948c679f7 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -1787,7 +1787,32 @@ func (d *Distributor) sendWriteRequestToIngesters(ctx context.Context, tenantRin
 		ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req) // Let ingester know the size of the message, without needing to read the message first.
 
 		_, err = c.Push(ctx, req)
+		if err != nil {
+			stat, ok := grpcutil.ErrorToStatus(err)
+			if ok {
+				details := stat.Details()
+				detailsAsString := make([]string, 0, len(details))
+				if len(details) != 0 {
+					for _, det := range details {
+						if errDetails, ok := det.(*mimirpb.ErrorDetails); ok {
+							detailsAsString = append(detailsAsString, errDetails.Cause.String())
+						}
+						if errDetails, ok := det.(*grpcutil.ErrorDetails); ok {
+							detailsAsString = append(detailsAsString, errDetails.Cause.String())
+						}
+					}
+				}
+				dets := fmt.Sprintf("%v", detailsAsString)
+				level.Info(d.log).Log("msg", "error is convertible to status", "code", stat.Code(), "errMessage", stat.Message(), "details", dets)
+			} else {
+				level.Info(d.log).Log("msg", "error is not convertible to status", "err", err)
+			}
+		}
 		err = wrapIngesterPushError(err, ingester.Id)
+		var ingesterPushErr ingesterPushError
+		if errors.As(err, &ingesterPushErr) {
+			level.Info(d.log).Log("msg", "error is an ingesterPushError", "cause", ingesterPushErr.Cause().String(), "errorMessage", ingesterPushErr.Error())
+		}
 		err = wrapDeadlineExceededPushError(err)
 
 		return err
diff --git a/pkg/distributor/errors.go b/pkg/distributor/errors.go
index e0eb803dd75..d9e81fc2c43 100644
--- a/pkg/distributor/errors.go
+++ b/pkg/distributor/errors.go
@@ -182,6 +182,11 @@ func newIngesterPushError(stat *status.Status, ingesterID string) ingesterPushEr
 		if errorDetails, ok := details[0].(*mimirpb.ErrorDetails); ok {
 			errorCause = errorDetails.GetCause()
 		}
+		if errorDetails, ok := details[0].(*grpcutil.ErrorDetails); ok {
+			if errorDetails.Cause == grpcutil.WRONG_CLUSTER_NAME {
+				errorCause = mimirpb.BAD_DATA
+			}
+		}
 	}
 	message := fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, stat.Message())
 	return ingesterPushError{
diff --git a/pkg/distributor/errors_test.go b/pkg/distributor/errors_test.go
index 06160124207..088a7367202 100644
--- a/pkg/distributor/errors_test.go
+++ b/pkg/distributor/errors_test.go
@@ -422,6 +422,10 @@ func TestWrapIngesterPushError(t *testing.T) {
ingesterPushError: createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE).Err(), expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE), ingesterID), }, + "a gRPC error with details with grpcutil.WRONG_CLUSTER_NAME cause gives an ingesterPushError with BAD_DATA cause": { + ingesterPushError: grpcutil.Status(codes.FailedPrecondition, testErrorMsg, &grpcutil.ErrorDetails{Cause: grpcutil.WRONG_CLUSTER_NAME}).Err(), + expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.FailedPrecondition, testErrorMsg, mimirpb.BAD_DATA), ingesterID), + }, "a DeadlineExceeded gRPC ingester error gives an ingesterPushError with UNKNOWN_CAUSE cause": { // This is how context.DeadlineExceeded error is translated into a gRPC error. ingesterPushError: status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error()), diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index a6a397f391c..8eb4290bb6a 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -30,6 +30,8 @@ type CombinedFrontendConfig struct { QueryMiddleware querymiddleware.Config `yaml:",inline"` DownstreamURL string `yaml:"downstream_url" category:"advanced"` + + Cluster string `yaml:"-"` } func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) { @@ -57,15 +59,7 @@ func (cfg *CombinedFrontendConfig) Validate() error { // Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered // into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil // (if there are no errors), and it uses the returned frontend (if any). -func InitFrontend( - cfg CombinedFrontendConfig, - v1Limits v1.Limits, - v2Limits v2.Limits, - grpcListenPort int, - log log.Logger, - reg prometheus.Registerer, - codec querymiddleware.Codec, -) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) { +func InitFrontend(cfg CombinedFrontendConfig, v1Limits v1.Limits, v2Limits v2.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer, codec querymiddleware.Codec, cluster string) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) { switch { case cfg.DownstreamURL != "": // If the user has specified a downstream Prometheus, then we should use that. @@ -87,8 +81,8 @@ func InitFrontend( cfg.FrontendV2.Port = grpcListenPort } - fr, err := v2.NewFrontend(cfg.FrontendV2, v2Limits, log, reg, codec) - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err + fr, err := v2.NewFrontend(cfg.FrontendV2, v2Limits, log, reg, codec, cluster) + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), nil, fr, err default: // No scheduler = use original frontend. 
@@ -96,6 +90,6 @@ func InitFrontend( if err != nil { return nil, nil, nil, err } - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), fr, nil, nil } } diff --git a/pkg/frontend/frontend_test.go b/pkg/frontend/frontend_test.go index 2c7498b846d..3261efd42b0 100644 --- a/pkg/frontend/frontend_test.go +++ b/pkg/frontend/frontend_test.go @@ -238,7 +238,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand httpListen, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) - rt, v1, v2, err := InitFrontend(config, limits{}, limits{}, 0, logger, nil, codec) + rt, v1, v2, err := InitFrontend(config, limits{}, limits{}, 0, logger, nil, codec, "") require.NoError(t, err) require.NotNil(t, rt) // v1 will be nil if DownstreamURL is defined. @@ -276,7 +276,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand go grpcServer.Serve(grpcListen) //nolint:errcheck var worker services.Service - worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler, httpgrpc_server.WithReturn4XXErrors), logger, nil) + worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler, httpgrpc_server.WithReturn4XXErrors), logger, nil, "") require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker)) diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index 141fade2ee3..062ba63190a 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -20,13 +20,17 @@ type GrpcRoundTripper interface { RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, io.ReadCloser, error) } -func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { - return &grpcRoundTripperAdapter{roundTripper: r} +func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper, cluster string) http.RoundTripper { + return &grpcRoundTripperAdapter{ + roundTripper: r, + cluster: cluster, + } } // This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper type grpcRoundTripperAdapter struct { roundTripper GrpcRoundTripper + cluster string } type buffer struct { @@ -39,7 +43,7 @@ func (b *buffer) Bytes() []byte { } func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) { - req, err := httpgrpc.FromHTTPRequest(r) + req, err := httpgrpc.FromHTTPRequestWithCluster(r, a.cluster) if err != nil { return nil, err } diff --git a/pkg/frontend/v1/frontend_test.go b/pkg/frontend/v1/frontend_test.go index 1ee7cfe66e4..4154fcc7857 100644 --- a/pkg/frontend/v1/frontend_test.go +++ b/pkg/frontend/v1/frontend_test.go @@ -337,7 +337,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a handlerCfg := transport.HandlerConfig{QueryStatsEnabled: true} flagext.DefaultValues(&handlerCfg) - rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1) + rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1, "") r := mux.NewRouter() r.PathPrefix("/").Handler(middleware.Merge( middleware.AuthenticateUser, @@ -355,7 +355,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a go grpcServer.Serve(grpcListen) //nolint:errcheck var worker services.Service - worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler, 
httpgrpc_server.WithReturn4XXErrors), logger, nil) + worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler, httpgrpc_server.WithReturn4XXErrors), logger, nil, "") require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker)) diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 2d029271c46..24dde998789 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -152,7 +152,7 @@ type enqueueResult struct { } // NewFrontend creates a new frontend. -func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Registerer, codec querymiddleware.Codec) (*Frontend, error) { +func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Registerer, codec querymiddleware.Codec, cluster string) (*Frontend, error) { requestsCh := make(chan *frontendRequest) toSchedulerAdapter := frontendToSchedulerAdapter{ log: log, @@ -161,7 +161,7 @@ func NewFrontend(cfg Config, limits Limits, log log.Logger, reg prometheus.Regis codec: codec, } - schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, net.JoinHostPort(cfg.Addr, strconv.Itoa(cfg.Port)), requestsCh, toSchedulerAdapter, log, reg) + schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, net.JoinHostPort(cfg.Addr, strconv.Itoa(cfg.Port)), requestsCh, toSchedulerAdapter, log, reg, cluster) if err != nil { return nil, err } diff --git a/pkg/frontend/v2/frontend_scheduler_adapter_test.go b/pkg/frontend/v2/frontend_scheduler_adapter_test.go index 7504019debb..24e767fef9b 100644 --- a/pkg/frontend/v2/frontend_scheduler_adapter_test.go +++ b/pkg/frontend/v2/frontend_scheduler_adapter_test.go @@ -136,7 +136,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) { reqs := []*http.Request{rangeHTTPReq, labelValuesHTTPReq} for _, req := range reqs { - httpgrpcReq, err := httpgrpc.FromHTTPRequest(req) + httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(req, "") require.NoError(t, err) additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions( @@ -179,7 +179,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "tenant-0") instantHTTPReq := makeInstantHTTPRequest(ctx, testData.time) - httpgrpcReq, err := httpgrpc.FromHTTPRequest(instantHTTPReq) + httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(instantHTTPReq, "") require.NoError(t, err) additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions( @@ -229,7 +229,7 @@ func TestQueryDecoding(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "tenant-0") labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, testData.start, testData.end) - httpgrpcReq, err := httpgrpc.FromHTTPRequest(labelValuesHTTPReq) + httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(labelValuesHTTPReq, "") require.NoError(t, err) additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions( diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 597ada88674..e152eba7b31 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/cancellation" "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/servicediscovery" "github.com/grafana/dskit/services" "github.com/pkg/errors" @@ -46,6 +47,7 @@ type frontendSchedulerWorkers 
struct { cfg Config log log.Logger frontendAddress string + cluster string // Channel with requests that should be forwarded to the scheduler. requestsCh <-chan *frontendRequest @@ -61,18 +63,12 @@ type frontendSchedulerWorkers struct { enqueueDuration *prometheus.HistogramVec } -func newFrontendSchedulerWorkers( - cfg Config, - frontendAddress string, - requestsCh <-chan *frontendRequest, - toSchedulerAdapter frontendToSchedulerAdapter, - log log.Logger, - reg prometheus.Registerer, -) (*frontendSchedulerWorkers, error) { +func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, requestsCh <-chan *frontendRequest, toSchedulerAdapter frontendToSchedulerAdapter, log log.Logger, reg prometheus.Registerer, cluster string) (*frontendSchedulerWorkers, error) { f := &frontendSchedulerWorkers{ cfg: cfg, log: log, frontendAddress: frontendAddress, + cluster: cluster, requestsCh: requestsCh, toSchedulerAdapter: toSchedulerAdapter, workers: map[string]*frontendSchedulerWorker{}, @@ -219,7 +215,7 @@ func (f *frontendSchedulerWorkers) getWorkersCount() int { func (f *frontendSchedulerWorkers) connectToScheduler(ctx context.Context, address string) (*grpc.ClientConn, error) { // Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics. - opts, err := f.cfg.GRPCClientConfig.DialOption(nil, nil) + opts, err := f.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClusterUnaryClientInterceptor(f.cluster)}, nil) if err != nil { return nil, err } diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index e4f8e5b91a6..05af9cce832 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -78,7 +78,7 @@ func setupFrontendWithConcurrencyAndServerOptions(t *testing.T, reg prometheus.R logger := log.NewLogfmtLogger(os.Stdout) codec := querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil) - f, err := NewFrontend(cfg, limits{}, logger, reg, codec) + f, err := NewFrontend(cfg, limits{}, logger, reg, codec, "") require.NoError(t, err) frontendv2pb.RegisterFrontendForQuerierServer(server, f) @@ -532,7 +532,7 @@ func TestFrontendStreamingResponse(t *testing.T) { }) req := httptest.NewRequest("GET", "/api/v1/cardinality/active_series?selector=metric", nil) - rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f) + rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f, "") resp, err := rt.RoundTrip(req.WithContext(user.InjectOrgID(context.Background(), userID))) require.NoError(t, err) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 262dcdda789..1872a19eba8 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -36,7 +36,7 @@ type closableHealthAndIngesterClient struct { func MakeIngesterClient(inst ring.InstanceDesc, cfg Config, metrics *Metrics) (HealthAndIngesterClient, error) { reportGRPCStatusesOptions := []middleware.InstrumentationOption{middleware.ReportGRPCStatusOption} unary, stream := grpcclient.Instrument(metrics.requestDuration, reportGRPCStatusesOptions...) 
- unary = append(unary, querierapi.ReadConsistencyClientUnaryInterceptor) + unary = append(unary, querierapi.ReadConsistencyClientUnaryInterceptor, middleware.ClusterUnaryClientInterceptor(cfg.Cluster)) stream = append(stream, querierapi.ReadConsistencyClientStreamInterceptor) dialOpts, err := cfg.GRPCClientConfig.DialOption(unary, stream) @@ -67,6 +67,7 @@ func (c *closableHealthAndIngesterClient) Close() error { // Config is the configuration struct for the ingester client type Config struct { GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with ingesters from distributors, queriers and rulers."` + Cluster string `yaml:"-"` } // RegisterFlags registers configuration settings used by the ingester client config. diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 84d8b43e4cb..3a21bd8e784 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1125,7 +1125,6 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // NOTE: because we use `unsafe` in deserialisation, we must not // retain anything from `req` past the exit from this function. defer cleanUp() - start := time.Now() // Only start/finish request here when the request comes NOT from grpc handlers (i.e., from ingest.Store). // NOTE: request coming from grpc handler may end up calling start multiple times during its lifetime (e.g., when migrating to ingest storage). diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index c5a507b545d..3b3c59cf639 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/clusterutil" "github.com/grafana/dskit/dns" httpgrpc_server "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/kv" @@ -466,6 +467,7 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { t.Cfg.Distributor.MinimiseIngesterRequestsHedgingDelay = t.Cfg.Querier.MinimiseIngesterRequestsHedgingDelay t.Cfg.Distributor.PreferAvailabilityZone = t.Cfg.Querier.PreferAvailabilityZone t.Cfg.Distributor.IngestStorageConfig = t.Cfg.IngestStorage + t.Cfg.IngesterClient.Cluster = t.Cfg.Server.Cluster t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.ActiveGroupsCleanup, t.CostAttributionManager, t.IngesterRing, t.IngesterPartitionInstanceRing, @@ -636,13 +638,11 @@ func (t *Mimir) initQuerier() (serv services.Service, err error) { return nil, nil } - return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter, httpgrpc_server.WithReturn4XXErrors), util_log.Logger, t.Registerer) + return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter, httpgrpc_server.WithReturn4XXErrors), util_log.Logger, t.Registerer, t.Cfg.Server.Cluster) } func (t *Mimir) initStoreQueryable() (services.Service, error) { - q, err := querier.NewBlocksStoreQueryableFromConfig( - t.Cfg.Querier, t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, t.Registerer, - ) + q, err := querier.NewBlocksStoreQueryableFromConfig(t.Cfg.Querier, t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, t.Registerer, t.Cfg.Server.Cluster) if err != nil { return nil, fmt.Errorf("failed to initialize block store queryable: %v", err) } @@ -722,7 +722,9 @@ func (t *Mimir) initFlusher() (serv services.Service, err error) { // initQueryFrontendCodec 
initializes query frontend codec. // NOTE: Grafana Enterprise Metrics depends on this. func (t *Mimir) initQueryFrontendCodec() (services.Service, error) { - t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders) + // Always pass through the cluster header. + propagateHeaders := append([]string{clusterutil.ClusterHeader}, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders...) + t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, propagateHeaders) return nil, nil } @@ -787,19 +789,12 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error } func (t *Mimir) initQueryFrontend() (serv services.Service, err error) { + t.Cfg.Frontend.Cluster = t.Cfg.Server.Cluster t.Cfg.Frontend.FrontendV2.QuerySchedulerDiscovery = t.Cfg.QueryScheduler.ServiceDiscovery t.Cfg.Frontend.FrontendV2.LookBackDelta = t.Cfg.Querier.EngineConfig.LookbackDelta t.Cfg.Frontend.FrontendV2.QueryStoreAfter = t.Cfg.Querier.QueryStoreAfter - roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend( - t.Cfg.Frontend, - t.Overrides, - t.Overrides, - t.Cfg.Server.GRPCListenPort, - util_log.Logger, - t.Registerer, - t.QueryFrontendCodec, - ) + roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, t.Registerer, t.QueryFrontendCodec, t.Cfg.Server.Cluster) if err != nil { return nil, err } @@ -878,11 +873,12 @@ func (t *Mimir) initRuler() (serv services.Service, err error) { var queryFunc rules.QueryFunc if t.Cfg.Ruler.QueryFrontend.Address != "" { - queryFrontendClient, err := ruler.DialQueryFrontend(t.Cfg.Ruler.QueryFrontend) + logger := util_log.Logger + queryFrontendClient, err := ruler.DialQueryFrontend(t.Cfg.Ruler.QueryFrontend, t.Cfg.Server.Cluster, logger) if err != nil { return nil, err } - remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.MaxRetriesRate, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware) + remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.MaxRetriesRate, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, logger, ruler.WithOrgIDMiddleware) embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient( remoteQuerier, @@ -965,6 +961,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) { return nil, err } + t.Cfg.Ruler.ClientTLSConfig.Cluster = t.Cfg.Server.Cluster t.Ruler, err = ruler.NewRuler( t.Cfg.Ruler, manager, @@ -1084,7 +1081,7 @@ func (t *Mimir) initMemberlistKV() (services.Service, error) { func (t *Mimir) initQueryScheduler() (services.Service, error) { t.Cfg.QueryScheduler.ServiceDiscovery.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort - s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.Registerer) + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.Registerer, t.Cfg.Server.Cluster) if err != nil { return nil, errors.Wrap(err, "query-scheduler init") } diff --git a/pkg/querier/blocks_store_queryable.go 
b/pkg/querier/blocks_store_queryable.go index d34e3e2d56f..534b60f2451 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -198,7 +198,7 @@ func NewBlocksStoreQueryable( return q, nil } -func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg mimir_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { +func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg mimir_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer, cluster string) (*BlocksStoreQueryable, error) { var ( stores BlocksStoreSet bucketClient objstore.Bucket @@ -254,7 +254,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa ) } - stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, dynamicReplication, limits, querierCfg.StoreGatewayClient, logger, reg) + stores, err = newBlocksStoreReplicationSet(storesRing, randomLoadBalancing, dynamicReplication, limits, querierCfg.StoreGatewayClient, logger, reg, cluster) if err != nil { return nil, errors.Wrap(err, "failed to create store set") } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index d692cd0d7e4..d967560f98d 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -1779,7 +1779,7 @@ func TestBlocksStoreQuerier_ShouldReturnContextCanceledIfContextWasCanceledWhile clientCfg := grpcclient.Config{} flagext.DefaultValues(&clientCfg) - client, err := dialStoreGatewayClient(clientCfg, ring.InstanceDesc{Addr: listener.Addr().String()}, promauto.With(nil).NewHistogramVec(prometheus.HistogramOpts{}, []string{"route", "status_code"})) + client, err := dialStoreGatewayClient(clientCfg, ring.InstanceDesc{Addr: listener.Addr().String()}, promauto.With(nil).NewHistogramVec(prometheus.HistogramOpts{}, []string{"route", "status_code"}), "", nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close()) diff --git a/pkg/querier/blocks_store_replicated_set.go b/pkg/querier/blocks_store_replicated_set.go index 37f51431e81..7b99b4a52f5 100644 --- a/pkg/querier/blocks_store_replicated_set.go +++ b/pkg/querier/blocks_store_replicated_set.go @@ -47,18 +47,10 @@ type blocksStoreReplicationSet struct { subservicesWatcher *services.FailureWatcher } -func newBlocksStoreReplicationSet( - storesRing *ring.Ring, - balancingStrategy loadBalancingStrategy, - dynamicReplication storegateway.DynamicReplication, - limits BlocksStoreLimits, - clientConfig ClientConfig, - logger log.Logger, - reg prometheus.Registerer, -) (*blocksStoreReplicationSet, error) { +func newBlocksStoreReplicationSet(storesRing *ring.Ring, balancingStrategy loadBalancingStrategy, dynamicReplication storegateway.DynamicReplication, limits BlocksStoreLimits, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer, cluster string) (*blocksStoreReplicationSet, error) { s := &blocksStoreReplicationSet{ storesRing: storesRing, - clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg), + clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg, cluster), dynamicReplication: dynamicReplication, balancingStrategy: balancingStrategy, limits: limits, diff --git 
a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index 4f1012731e9..cc5b31df5de 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -356,7 +356,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { } reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg) + s, err := newBlocksStoreReplicationSet(r, noLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg, "") require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -426,7 +426,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin limits := &blocksStoreLimitsMock{storeGatewayTenantShardSize: 0} reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg) + s, err := newBlocksStoreReplicationSet(r, randomLoadBalancing, storegateway.NewNopDynamicReplication(), limits, ClientConfig{}, log.NewNopLogger(), reg, "") require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck diff --git a/pkg/querier/store_gateway_client.go b/pkg/querier/store_gateway_client.go index 583d9c4067a..b25fdc639bc 100644 --- a/pkg/querier/store_gateway_client.go +++ b/pkg/querier/store_gateway_client.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/ring/client" "github.com/pkg/errors" @@ -23,7 +24,7 @@ import ( "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" ) -func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { +func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer, cluster string, logger log.Logger) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "storegateway_client_request_duration_seconds", @@ -33,12 +34,14 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Re }, []string{"operation", "status_code"}) return client.PoolInstFunc(func(inst ring.InstanceDesc) (client.PoolClient, error) { - return dialStoreGatewayClient(clientCfg, inst, requestDuration) + return dialStoreGatewayClient(clientCfg, inst, requestDuration, cluster, logger) }) } -func dialStoreGatewayClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) { - opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration)) +func dialStoreGatewayClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec, cluster string, logger log.Logger) (*storeGatewayClient, error) { + unary, stream := grpcclient.Instrument(requestDuration) + unary = append(unary, middleware.ClusterUnaryClientInterceptor(cluster)) + opts, err := clientCfg.DialOption(unary, stream) if err != nil { return nil, err } @@ -74,7 +77,7 @@ func (c *storeGatewayClient) RemoteAddress() string 
{ return c.conn.Target() } -func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool { +func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer, cluster string) *client.Pool { // We prefer sane defaults instead of exposing further config options. clientCfg := grpcclient.Config{ MaxRecvMsgSize: 100 << 20, @@ -99,7 +102,7 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf ConstLabels: map[string]string{"client": "querier"}, }) - return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg), clientsCount, logger) + return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg, cluster, logger), clientsCount, logger) } type ClientConfig struct { diff --git a/pkg/querier/store_gateway_client_test.go b/pkg/querier/store_gateway_client_test.go index 3bcdfce2677..15082608412 100644 --- a/pkg/querier/store_gateway_client_test.go +++ b/pkg/querier/store_gateway_client_test.go @@ -45,7 +45,7 @@ func Test_newStoreGatewayClientFactory(t *testing.T) { flagext.DefaultValues(&cfg) reg := prometheus.NewPedanticRegistry() - factory := newStoreGatewayClientFactory(cfg, reg) + factory := newStoreGatewayClientFactory(cfg, reg, "", nil) for i := 0; i < 2; i++ { inst := ring.InstanceDesc{Addr: listener.Addr().String()} diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index d1ec2da6990..074482d5f39 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" @@ -50,13 +51,14 @@ const ( var errQuerierQuerySchedulerProcessingLoopTerminated = cancellation.NewErrorf("querier query-scheduler processing loop terminated") var errQueryEvaluationFinished = cancellation.NewErrorf("query evaluation finished") -func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) { +func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, cluster string) (*schedulerProcessor, []services.Service) { p := &schedulerProcessor{ log: log, handler: handler, streamResponse: streamResponse, maxMessageSize: cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, + cluster: cluster, grpcConfig: cfg.QueryFrontendGRPCClientConfig, streamingEnabled: cfg.ResponseStreamingEnabled, @@ -103,6 +105,7 @@ type schedulerProcessor struct { grpcConfig grpcclient.Config maxMessageSize int querierID string + cluster string streamingEnabled bool frontendPool *client.Pool @@ -434,7 +437,9 @@ func (w httpGrpcHeaderWriter) Set(key, val string) { } func (sp *schedulerProcessor) createFrontendClient(addr string) (client.PoolClient, error) { - opts, err := sp.grpcConfig.DialOption(grpcclient.Instrument(sp.frontendClientRequestDuration)) + unary, stream := grpcclient.Instrument(sp.frontendClientRequestDuration) + unary = append(unary, middleware.ClusterUnaryClientInterceptor(sp.cluster)) + opts, err := sp.grpcConfig.DialOption(unary, stream) if err 
!= nil { return nil, err } diff --git a/pkg/querier/worker/scheduler_processor_test.go b/pkg/querier/worker/scheduler_processor_test.go index 26d45ccdcd0..8888c320697 100644 --- a/pkg/querier/worker/scheduler_processor_test.go +++ b/pkg/querier/worker/scheduler_processor_test.go @@ -339,7 +339,7 @@ func TestCreateSchedulerProcessor(t *testing.T) { QueryFrontendGRPCClientConfig: conf, QuerySchedulerGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 5 * 1024}, // schedulerProcessor should ignore this. MaxConcurrentRequests: 5, - }, nil, nil, nil) + }, nil, nil, nil, "") assert.Equal(t, 1*1024*1024, sp.maxMessageSize) assert.Equal(t, conf, sp.grpcConfig) @@ -620,7 +620,7 @@ func prepareSchedulerProcessor(t *testing.T) (*schedulerProcessor, *querierLoopC requestHandler := &requestHandlerMock{} - sp, _ := newSchedulerProcessor(Config{QuerierID: "test-querier-id"}, requestHandler, log.NewNopLogger(), nil) + sp, _ := newSchedulerProcessor(Config{QuerierID: "test-querier-id"}, requestHandler, log.NewNopLogger(), nil, "") sp.schedulerClientFactory = func(_ *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient { return schedulerClient } diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 2b9459a116e..3163c3799bd 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/servicediscovery" "github.com/grafana/dskit/services" "github.com/pkg/errors" @@ -106,6 +107,7 @@ type querierWorker struct { maxConcurrentRequests int grpcClientConfig grpcclient.Config + cluster string log log.Logger processor processor @@ -119,7 +121,7 @@ type querierWorker struct { instances map[string]servicediscovery.Instance } -func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (services.Service, error) { +func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, cluster string) (services.Service, error) { if cfg.QuerierID == "" { hostname, err := os.Hostname() if err != nil { @@ -142,7 +144,7 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr } grpcCfg = cfg.QuerySchedulerGRPCClientConfig - processor, servs = newSchedulerProcessor(cfg, handler, log, reg) + processor, servs = newSchedulerProcessor(cfg, handler, log, reg, cluster) case cfg.FrontendAddress != "": level.Info(log).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress) @@ -158,13 +160,14 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr return nil, errors.New("no query-scheduler or query-frontend address") } - return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrentRequests, log, processor, factory, servs) + return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrentRequests, log, processor, factory, servs, cluster) } -func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, log log.Logger, processor processor, newServiceDiscovery serviceDiscoveryFactory, servs []services.Service) (*querierWorker, error) { +func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, log log.Logger, processor processor, newServiceDiscovery serviceDiscoveryFactory, servs []services.Service, cluster string) (*querierWorker, error) { f := &querierWorker{ grpcClientConfig: grpcCfg, 
maxConcurrentRequests: maxConcReq, + cluster: cluster, log: log, managers: map[string]*processorManager{}, instances: map[string]servicediscovery.Instance{}, @@ -389,7 +392,7 @@ func (w *querierWorker) getDesiredConcurrency() map[string]int { func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) { // Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics. - opts, err := w.grpcClientConfig.DialOption(nil, nil) + opts, err := w.grpcClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClusterUnaryClientInterceptor(w.cluster)}, nil) if err != nil { return nil, err diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index 7a6c937a99d..0b26dac1ed6 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -174,7 +174,7 @@ func TestResetConcurrency(t *testing.T) { MaxConcurrentRequests: tt.maxConcurrent, } - w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil) + w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil, "") require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), w)) @@ -261,7 +261,7 @@ func TestQuerierWorker_getDesiredConcurrency(t *testing.T) { MaxConcurrentRequests: testData.maxConcurrent, } - w, err := newQuerierWorkerWithProcessor(cfg.QueryFrontendGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil) + w, err := newQuerierWorkerWithProcessor(cfg.QueryFrontendGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil, "") require.NoError(t, err) for _, instance := range testData.instances { diff --git a/pkg/ruler/client_pool.go b/pkg/ruler/client_pool.go index 78927875f6c..11cf87b38c8 100644 --- a/pkg/ruler/client_pool.go +++ b/pkg/ruler/client_pool.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" @@ -20,6 +21,11 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" ) +type ClientConfig struct { + grpcclient.Config + Cluster string `yaml:"-"` +} + // ClientsPool is the interface used to get the client from the pool for a specified address. type ClientsPool interface { services.Service @@ -39,7 +45,7 @@ func (p *rulerClientsPool) GetClientForInstance(inst ring.InstanceDesc) (RulerCl return c.(RulerClient), nil } -func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) ClientsPool { +func newRulerClientPool(clientCfg ClientConfig, logger log.Logger, reg prometheus.Registerer) ClientsPool { // We prefer sane defaults instead of exposing further config options. 
poolCfg := client.PoolConfig{ CheckInterval: 10 * time.Second, @@ -53,11 +59,11 @@ func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prom }) return &rulerClientsPool{ - client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger), + client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg, logger), clientsCount, logger), } } -func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { +func newRulerClientFactory(clientCfg ClientConfig, reg prometheus.Registerer, logger log.Logger) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_ruler_client_request_duration_seconds", Help: "Time spent executing requests to the ruler.", @@ -65,12 +71,14 @@ func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registere }, []string{"operation", "status_code"}) return client.PoolInstFunc(func(inst ring.InstanceDesc) (client.PoolClient, error) { - return dialRulerClient(clientCfg, inst, requestDuration) + return dialRulerClient(clientCfg, inst, requestDuration, logger) }) } -func dialRulerClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) { - opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration)) +func dialRulerClient(clientCfg ClientConfig, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec, logger log.Logger) (*rulerExtendedClient, error) { + unary, stream := grpcclient.Instrument(requestDuration) + unary = append(unary, middleware.ClusterUnaryClientInterceptor(clientCfg.Cluster)) + opts, err := clientCfg.DialOption(unary, stream) if err != nil { return nil, err } diff --git a/pkg/ruler/client_pool_test.go b/pkg/ruler/client_pool_test.go index fd1e8a40d56..081a56b94c1 100644 --- a/pkg/ruler/client_pool_test.go +++ b/pkg/ruler/client_pool_test.go @@ -42,7 +42,7 @@ func Test_newRulerClientFactory(t *testing.T) { flagext.DefaultValues(&cfg) reg := prometheus.NewPedanticRegistry() - factory := newRulerClientFactory(cfg, reg) + factory := newRulerClientFactory(cfg, reg, nil) for i := 0; i < 2; i++ { inst := ring.InstanceDesc{Addr: listener.Addr().String()} diff --git a/pkg/ruler/remotequerier.go b/pkg/ruler/remotequerier.go index b9bd51101dd..63929301449 100644 --- a/pkg/ruler/remotequerier.go +++ b/pkg/ruler/remotequerier.go @@ -95,10 +95,11 @@ func (c *QueryFrontendConfig) Validate() error { } // DialQueryFrontend creates and initializes a new httpgrpc.HTTPClient taking a QueryFrontendConfig configuration. 
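+// The added cluster argument is passed to middleware.ClusterUnaryClientInterceptor so that every call to
+// the query-frontend carries the cluster name. A hypothetical call site (identifiers illustrative, not
+// part of this diff):
+//
+//	client, err := DialQueryFrontend(rulerCfg.QueryFrontend, serverCfg.Cluster, logger)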
-func DialQueryFrontend(cfg QueryFrontendConfig) (httpgrpc.HTTPClient, error) { +func DialQueryFrontend(cfg QueryFrontendConfig, cluster string, logger log.Logger) (httpgrpc.HTTPClient, error) { opts, err := cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, + middleware.ClusterUnaryClientInterceptor(cluster), }, nil) if err != nil { return nil, err diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 3e6493b0285..13c3a1b60fb 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -23,7 +23,6 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/ring" @@ -94,7 +93,7 @@ type Config struct { // This is used for template expansion in alerts; must be a valid URL. ExternalURL flagext.URLValue `yaml:"external_url"` // GRPC Client configuration. - ClientTLSConfig grpcclient.Config `yaml:"ruler_client" doc:"description=Configures the gRPC client used to communicate between ruler instances."` + ClientTLSConfig ClientConfig `yaml:"ruler_client" doc:"description=Configures the gRPC client used to communicate between ruler instances."` // How frequently to evaluate rules by default. EvaluationInterval time.Duration `yaml:"evaluation_interval" category:"advanced"` // How frequently to poll for updated rules. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 10d5aa6ac4e..5a3c398916d 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -86,6 +86,7 @@ type Scheduler struct { connectedFrontendClients prometheus.GaugeFunc queueDuration *prometheus.HistogramVec inflightRequests prometheus.Summary + cluster string } type connectedFrontend struct { @@ -119,13 +120,14 @@ func (cfg *Config) Validate() error { } // NewScheduler creates a new Scheduler. 
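+// The additional cluster argument is attached via middleware.ClusterUnaryClientInterceptor when the
+// scheduler dials a frontend to report errors (see forwardErrorToFrontend below); the scheduler tests
+// pass "" when no cluster is configured. A sketch of a call site (identifiers illustrative):
+//
+//	s, err := NewScheduler(cfg, overrides, logger, reg, serverCfg.Cluster)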
-func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) { +func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, cluster string) (*Scheduler, error) { var err error s := &Scheduler{ - cfg: cfg, - log: log, - limits: limits, + cfg: cfg, + log: log, + limits: limits, + cluster: cluster, schedulerInflightRequests: map[queue.RequestKey]*queue.SchedulerRequest{}, connectedFrontends: map[string]*connectedFrontend{}, @@ -562,7 +564,8 @@ func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuer func (s *Scheduler) forwardErrorToFrontend(ctx context.Context, req *queue.SchedulerRequest, requestErr error) { opts, err := s.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor}, + middleware.ClientUserHeaderInterceptor, + middleware.ClusterUnaryClientInterceptor(s.cluster)}, nil) if err != nil { level.Warn(s.log).Log("msg", "failed to create gRPC options for the connection to frontend to report error", "frontend", req.FrontendAddr, "err", err, "requestErr", requestErr) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 7f4a458d2e5..6eff68c7cd4 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -50,7 +50,7 @@ func setupScheduler(t *testing.T, reg prometheus.Registerer) (*Scheduler, schedu flagext.DefaultValues(&cfg) cfg.MaxOutstandingPerTenant = testMaxOutstandingPerTenant - s, err := NewScheduler(cfg, &limits{queriers: 2}, log.NewNopLogger(), reg) + s, err := NewScheduler(cfg, &limits{queriers: 2}, log.NewNopLogger(), reg, "") require.NoError(t, err) server := grpc.NewServer() diff --git a/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go b/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go new file mode 100644 index 00000000000..c3650c08460 --- /dev/null +++ b/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go @@ -0,0 +1,9 @@ +package clusterutil + +const ( + // ClusterHeader is the name of the cluster identifying HTTP header. + ClusterHeader = "X-Cluster" + + // MetadataClusterKey is the key of the cluster gRPC metadata. + MetadataClusterKey = "x-cluster" +) diff --git a/vendor/github.com/grafana/dskit/grpcutil/error_details.pb.go b/vendor/github.com/grafana/dskit/grpcutil/error_details.pb.go new file mode 100644 index 00000000000..87b831ae1e9 --- /dev/null +++ b/vendor/github.com/grafana/dskit/grpcutil/error_details.pb.go @@ -0,0 +1,421 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: error_details.proto + +package grpcutil + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strconv "strconv" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type ErrorCause int32 + +const ( + UNKNOWN_CAUSE ErrorCause = 0 + WRONG_CLUSTER_NAME ErrorCause = 1 +) + +var ErrorCause_name = map[int32]string{ + 0: "UNKNOWN_CAUSE", + 1: "WRONG_CLUSTER_NAME", +} + +var ErrorCause_value = map[string]int32{ + "UNKNOWN_CAUSE": 0, + "WRONG_CLUSTER_NAME": 1, +} + +func (ErrorCause) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_bbac13548d6353a4, []int{0} +} + +type ErrorDetails struct { + Cause ErrorCause `protobuf:"varint,1,opt,name=Cause,proto3,enum=grpcutil.ErrorCause" json:"Cause,omitempty"` +} + +func (m *ErrorDetails) Reset() { *m = ErrorDetails{} } +func (*ErrorDetails) ProtoMessage() {} +func (*ErrorDetails) Descriptor() ([]byte, []int) { + return fileDescriptor_bbac13548d6353a4, []int{0} +} +func (m *ErrorDetails) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ErrorDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ErrorDetails.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ErrorDetails) XXX_Merge(src proto.Message) { + xxx_messageInfo_ErrorDetails.Merge(m, src) +} +func (m *ErrorDetails) XXX_Size() int { + return m.Size() +} +func (m *ErrorDetails) XXX_DiscardUnknown() { + xxx_messageInfo_ErrorDetails.DiscardUnknown(m) +} + +var xxx_messageInfo_ErrorDetails proto.InternalMessageInfo + +func (m *ErrorDetails) GetCause() ErrorCause { + if m != nil { + return m.Cause + } + return UNKNOWN_CAUSE +} + +func init() { + proto.RegisterEnum("grpcutil.ErrorCause", ErrorCause_name, ErrorCause_value) + proto.RegisterType((*ErrorDetails)(nil), "grpcutil.ErrorDetails") +} + +func init() { proto.RegisterFile("error_details.proto", fileDescriptor_bbac13548d6353a4) } + +var fileDescriptor_bbac13548d6353a4 = []byte{ + // 207 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4e, 0x2d, 0x2a, 0xca, + 0x2f, 0x8a, 0x4f, 0x49, 0x2d, 0x49, 0xcc, 0xcc, 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, + 0xe2, 0x48, 0x2f, 0x2a, 0x48, 0x2e, 0x2d, 0xc9, 0xcc, 0x51, 0xb2, 0xe2, 0xe2, 0x71, 0x05, 0x29, + 0x70, 0x81, 0xc8, 0x0b, 0x69, 0x71, 0xb1, 0x3a, 0x27, 0x96, 0x16, 0xa7, 0x4a, 0x30, 0x2a, 0x30, + 0x6a, 0xf0, 0x19, 0x89, 0xe8, 0xc1, 0x54, 0xea, 0x81, 0x95, 0x81, 0xe5, 0x82, 0x20, 0x4a, 0xb4, + 0xcc, 0xb9, 0xb8, 0x10, 0x82, 0x42, 0x82, 0x5c, 0xbc, 0xa1, 0x7e, 0xde, 0x7e, 0xfe, 0xe1, 0x7e, + 0xf1, 0xce, 0x8e, 0xa1, 0xc1, 0xae, 0x02, 0x0c, 0x42, 0x62, 0x5c, 0x42, 0xe1, 0x41, 0xfe, 0x7e, + 0xee, 0xf1, 0xce, 0x3e, 0xa1, 0xc1, 0x21, 0xae, 0x41, 0xf1, 0x7e, 0x8e, 0xbe, 0xae, 0x02, 0x8c, + 0x4e, 0x76, 0x17, 0x1e, 0xca, 0x31, 0xdc, 0x78, 0x28, 0xc7, 0xf0, 0xe1, 0xa1, 0x1c, 0x63, 0xc3, + 0x23, 0x39, 0xc6, 0x15, 0x8f, 0xe4, 0x18, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, + 0xc1, 0x23, 0x39, 0xc6, 0x17, 0x8f, 0xe4, 0x18, 0x3e, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, + 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xe0, 0x8e, 0x4e, 0x62, 0x03, 0xfb, + 0xc2, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x16, 0x1b, 0xdc, 0x89, 0xdc, 0x00, 0x00, 0x00, +} + +func (x ErrorCause) String() string { + s, ok := ErrorCause_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} +func (this *ErrorDetails) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := 
that.(*ErrorDetails) + if !ok { + that2, ok := that.(ErrorDetails) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Cause != that1.Cause { + return false + } + return true +} +func (this *ErrorDetails) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpcutil.ErrorDetails{") + s = append(s, "Cause: "+fmt.Sprintf("%#v", this.Cause)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringErrorDetails(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *ErrorDetails) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ErrorDetails) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ErrorDetails) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Cause != 0 { + i = encodeVarintErrorDetails(dAtA, i, uint64(m.Cause)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintErrorDetails(dAtA []byte, offset int, v uint64) int { + offset -= sovErrorDetails(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *ErrorDetails) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Cause != 0 { + n += 1 + sovErrorDetails(uint64(m.Cause)) + } + return n +} + +func sovErrorDetails(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozErrorDetails(x uint64) (n int) { + return sovErrorDetails(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *ErrorDetails) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ErrorDetails{`, + `Cause:` + fmt.Sprintf("%v", this.Cause) + `,`, + `}`, + }, "") + return s +} +func valueToStringErrorDetails(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *ErrorDetails) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ErrorDetails: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ErrorDetails: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Cause", wireType) + } + m.Cause = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Cause |= ErrorCause(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := 
skipErrorDetails(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthErrorDetails + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthErrorDetails + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipErrorDetails(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthErrorDetails + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthErrorDetails + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipErrorDetails(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthErrorDetails + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthErrorDetails = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowErrorDetails = fmt.Errorf("proto: integer overflow") +) diff --git a/vendor/github.com/grafana/dskit/grpcutil/error_details.proto b/vendor/github.com/grafana/dskit/grpcutil/error_details.proto new file mode 100644 index 00000000000..81dfc422524 --- /dev/null +++ b/vendor/github.com/grafana/dskit/grpcutil/error_details.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package grpcutil; + +option go_package = "grpcutil"; + +enum ErrorCause { + UNKNOWN_CAUSE = 0; + WRONG_CLUSTER_NAME = 1; +} + +message ErrorDetails { + ErrorCause Cause = 1; +} diff --git a/vendor/github.com/grafana/dskit/grpcutil/status.go b/vendor/github.com/grafana/dskit/grpcutil/status.go index a9e9aab249a..0b279474998 100644 --- a/vendor/github.com/grafana/dskit/grpcutil/status.go +++ b/vendor/github.com/grafana/dskit/grpcutil/status.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/gogo/protobuf/proto" "github.com/gogo/status" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" @@ -60,6 +61,17 @@ func ErrorToStatusCode(err error) codes.Code { return codes.Unknown } +func Status(errCode codes.Code, errMessage string, details 
...proto.Message) *status.Status { + stat := status.New(errCode, errMessage) + if details != nil { + statWithDetails, err := stat.WithDetails(details...) + if err == nil { + return statWithDetails + } + } + return stat +} + // IsCanceled checks whether an error comes from an operation being canceled. func IsCanceled(err error) bool { if errors.Is(err, context.Canceled) { diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index 616023899b7..350b2c8a90b 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -17,6 +17,7 @@ import ( "github.com/gogo/status" "google.golang.org/grpc/metadata" + "github.com/grafana/dskit/clusterutil" "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/log" ) @@ -42,7 +43,7 @@ func (nopCloser) Close() error { return nil } // BytesBuffer returns the underlaying `bytes.buffer` used to build this io.ReadCloser. func (n nopCloser) BytesBuffer() *bytes.Buffer { return n.Buffer } -// FromHTTPRequest converts an ordinary http.Request into an httpgrpc.HTTPRequest +// FromHTTPRequest converts an ordinary http.Request into an httpgrpc.HTTPRequest. func FromHTTPRequest(r *http.Request) (*HTTPRequest, error) { body, err := io.ReadAll(r.Body) if err != nil { @@ -56,6 +57,21 @@ func FromHTTPRequest(r *http.Request) (*HTTPRequest, error) { }, nil } +// FromHTTPRequestWithCluster converts an ordinary http.Request into an httpgrpc.HTTPRequest. +// It's the same as FromHTTPRequest except that if cluster is non-empty, it has to be equal to the +// middleware.ClusterHeader header (or an error is returned). +func FromHTTPRequestWithCluster(r *http.Request, cluster string) (*HTTPRequest, error) { + if cluster != "" { + if c := r.Header.Get(clusterutil.ClusterHeader); c != cluster { + return nil, fmt.Errorf( + "httpgrpc.FromHTTPRequest: %q header should be %q, but is %q", + clusterutil.ClusterHeader, cluster, c, + ) + } + } + return FromHTTPRequest(r) +} + // ToHTTPRequest converts httpgrpc.HTTPRequest to http.Request. func ToHTTPRequest(ctx context.Context, r *HTTPRequest) (*http.Request, error) { req, err := http.NewRequest(r.Method, r.Url, nopCloser{Buffer: bytes.NewBuffer(r.Body)}) diff --git a/vendor/github.com/grafana/dskit/middleware/cluster.go b/vendor/github.com/grafana/dskit/middleware/cluster.go new file mode 100644 index 00000000000..c370f6aac80 --- /dev/null +++ b/vendor/github.com/grafana/dskit/middleware/cluster.go @@ -0,0 +1,44 @@ +package middleware + +import ( + "fmt" + "net/http" + "regexp" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/dskit/clusterutil" +) + +// ClusterValidationMiddleware validates that requests are for the correct cluster. 
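+// Requests whose clusterutil.ClusterHeader value does not match the configured cluster are rejected with
+// HTTP 400 and counted in the optional invalidClusters counter; auxiliary paths such as /metrics and
+// /ready are exempt. A minimal usage sketch (names illustrative, and assuming the usual dskit
+// middleware.Interface Wrap method):
+//
+//	protected := middleware.ClusterValidationMiddleware("my-cluster", invalidClusters, logger).Wrap(mux)
+//	http.ListenAndServe(":8080", protected)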
+func ClusterValidationMiddleware(cluster string, invalidClusters *prometheus.CounterVec, logger log.Logger) Interface { + return Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + reqCluster := r.Header.Get(clusterutil.ClusterHeader) + if !auxilliaryPath(r.URL.Path) && reqCluster != cluster { + level.Warn(logger).Log("msg", "rejecting request intended for wrong cluster", + "cluster", cluster, "request_cluster", reqCluster, "header", clusterutil.ClusterHeader, + "url", r.URL, "path", r.URL.Path) + if invalidClusters != nil { + invalidClusters.WithLabelValues("http", r.URL.Path, reqCluster).Inc() + } + http.Error(w, fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster), + http.StatusBadRequest) + return + } + + next.ServeHTTP(w, r) + }) + }) +} + +// Allow for a potential path prefix being configured. +// TODO: Take /backlog_replay_complete out, and allow for it to be configured instead (it's part of a readiness probe). +// TODO: Take /admission/* out, and allow for them to be configured instead (they're rollout operator k8s webhooks). +var reAuxPath = regexp.MustCompile(".*/(metrics|debug/pprof.*|ready|backlog_replay_complete|admission/no-downscale|admission/prepare-downscale)") + +func auxilliaryPath(pth string) bool { + return reAuxPath.MatchString(pth) +} diff --git a/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go b/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go index a705e75501f..5cedfb08b8b 100644 --- a/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go +++ b/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go @@ -6,7 +6,11 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/gogo/status" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/grafana/dskit/grpcutil" + + "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -30,13 +34,21 @@ func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { // ClusterUnaryServerInterceptor checks if the incoming gRPC metadata contains any cluster information and if so, // checks if the latter corresponds to the given cluster. If it is the case, the request is further propagated. // Otherwise, an error is returned. 
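+// Health-check RPCs are exempt, mismatches are counted in the optional invalidClusters counter, and the
+// returned error now carries a grpcutil.ErrorDetails with cause WRONG_CLUSTER_NAME, so callers can tell a
+// cluster mismatch apart from other FailedPrecondition errors. A wiring sketch (names illustrative):
+//
+//	srv := grpc.NewServer(grpc.ChainUnaryInterceptor(
+//		middleware.ClusterUnaryServerInterceptor("my-cluster", metrics.InvalidClusters, logger),
+//	))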
-func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { +func ClusterUnaryServerInterceptor(cluster string, invalidClusters *prometheus.CounterVec, logger log.Logger) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if _, ok := info.Server.(healthpb.HealthServer); ok { + return handler(ctx, req) + } reqCluster := getClusterFromIncomingContext(ctx, logger) if cluster != reqCluster { + level.Warn(logger).Log("msg", "rejecting request intended for wrong cluster", + "cluster", cluster, "request_cluster", reqCluster, "method", info.FullMethod) + if invalidClusters != nil { + invalidClusters.WithLabelValues("grpc", info.FullMethod, reqCluster).Inc() + } msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) - level.Warn(logger).Log("msg", msg) - return nil, status.Error(codes.FailedPrecondition, msg) + stat := grpcutil.Status(codes.FailedPrecondition, msg, &grpcutil.ErrorDetails{Cause: grpcutil.WRONG_CLUSTER_NAME}) + return nil, stat.Err() } return handler(ctx, req) } diff --git a/vendor/github.com/grafana/dskit/server/metrics.go b/vendor/github.com/grafana/dskit/server/metrics.go index b4a8ec662eb..3aff8f476a3 100644 --- a/vendor/github.com/grafana/dskit/server/metrics.go +++ b/vendor/github.com/grafana/dskit/server/metrics.go @@ -25,6 +25,7 @@ type Metrics struct { SentMessageSize *prometheus.HistogramVec InflightRequests *prometheus.GaugeVec RequestThroughput *prometheus.HistogramVec + InvalidClusters *prometheus.CounterVec } func NewServerMetrics(cfg Config) *Metrics { @@ -91,5 +92,11 @@ func NewServerMetrics(cfg Config) *Metrics { NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: time.Hour, }, []string{"method", "route"}), + InvalidClusters: reg.NewCounterVec(prometheus.CounterOpts{ + Namespace: cfg.MetricsNamespace, + Name: "request_invalid_clusters_total", + Help: "Number of requests with invalid cluster.", + ConstLabels: nil, + }, []string{"protocol", "method", "request_cluster"}), } } diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index 4ad92379304..3268a1afe6f 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -88,6 +88,7 @@ type Config struct { GRPCListenPort int `yaml:"grpc_listen_port"` GRPCConnLimit int `yaml:"grpc_listen_conn_limit"` ProxyProtocolEnabled bool `yaml:"proxy_protocol_enabled"` + Cluster string `yaml:"cluster"` CipherSuites string `yaml:"tls_cipher_suites"` MinVersion string `yaml:"tls_min_version"` @@ -184,7 +185,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.GRPCListenAddress, "server.grpc-listen-address", "", "gRPC server listen address.") f.IntVar(&cfg.GRPCListenPort, "server.grpc-listen-port", 9095, "gRPC server listen port.") f.IntVar(&cfg.GRPCConnLimit, "server.grpc-conn-limit", 0, "Maximum number of simultaneous grpc connections, <=0 to disable") - f.BoolVar(&cfg.RegisterInstrumentation, "server.register-instrumentation", true, "Register the intrumentation handlers (/metrics etc).") + f.BoolVar(&cfg.RegisterInstrumentation, "server.register-instrumentation", true, "Register the instrumentation handlers (/metrics etc).") 
f.BoolVar(&cfg.ReportGRPCCodesInInstrumentationLabel, "server.report-grpc-codes-in-instrumentation-label-enabled", false, "If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as \"error\".") f.DurationVar(&cfg.ServerGracefulShutdownTimeout, "server.graceful-shutdown-timeout", 30*time.Second, "Timeout for graceful shutdowns") f.DurationVar(&cfg.HTTPServerReadTimeout, "server.http-read-timeout", 30*time.Second, "Read timeout for entire HTTP request, including headers and body.") @@ -216,6 +217,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.LogRequestExcludeHeadersList, "server.log-request-headers-exclude-list", "", "Comma separated list of headers to exclude from loggin. Only used if server.log-request-headers is true.") f.BoolVar(&cfg.LogRequestAtInfoLevel, "server.log-request-at-info-level-enabled", false, "Optionally log requests at info level instead of debug level. Applies to request headers as well if server.log-request-headers is enabled.") f.BoolVar(&cfg.ProxyProtocolEnabled, "server.proxy-protocol-enabled", false, "Enables PROXY protocol.") + f.StringVar(&cfg.Cluster, "server.cluster", "", "Optionally define the server's cluster, and enable validation that requests are for the same cluster.") f.DurationVar(&cfg.Throughput.LatencyCutoff, "server.throughput.latency-cutoff", 0, "Requests taking over the cutoff are be observed to measure throughput. Server-Timing header is used with specified unit as the indicator, for example 'Server-Timing: unit;val=8.2'. If set to 0, the throughput is not calculated.") f.StringVar(&cfg.Throughput.Unit, "server.throughput.unit", "samples_processed", "Unit of the server throughput metric, for example 'processed_bytes' or 'samples_processed'. Observed values are gathered from the 'Server-Timing' header with the 'val' key. If set, it is appended to the request_server_throughput metric name.") } @@ -399,6 +401,9 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { middleware.HTTPGRPCTracingInterceptor(router), // This must appear after the OpenTracingServerInterceptor. middleware.UnaryServerInstrumentInterceptor(metrics.RequestDuration, grpcInstrumentationOptions...), } + if cfg.Cluster != "" { + grpcMiddleware = append(grpcMiddleware, middleware.ClusterUnaryServerInterceptor(cfg.Cluster, metrics.InvalidClusters, logger)) + } grpcMiddleware = append(grpcMiddleware, cfg.GRPCMiddleware...) grpcStreamMiddleware := []grpc.StreamServerInterceptor{ @@ -528,10 +533,14 @@ func BuildHTTPMiddleware(cfg Config, router *mux.Router, metrics *Metrics, logge logSourceIPs = nil } + if cfg.DoNotAddDefaultHTTPMiddleware { + return cfg.HTTPMiddleware, nil + } + defaultLogMiddleware := middleware.NewLogMiddleware(logger, cfg.LogRequestHeaders, cfg.LogRequestAtInfoLevel, logSourceIPs, strings.Split(cfg.LogRequestExcludeHeadersList, ",")) defaultLogMiddleware.DisableRequestSuccessLog = cfg.DisableRequestSuccessLog - defaultHTTPMiddleware := []middleware.Interface{ + httpMiddleware := []middleware.Interface{ middleware.RouteInjector{ RouteMatcher: router, }, @@ -552,14 +561,10 @@ func BuildHTTPMiddleware(cfg Config, router *mux.Router, metrics *Metrics, logge RequestThroughput: metrics.RequestThroughput, }, } - var httpMiddleware []middleware.Interface - if cfg.DoNotAddDefaultHTTPMiddleware { - httpMiddleware = cfg.HTTPMiddleware - } else { - httpMiddleware = append(defaultHTTPMiddleware, cfg.HTTPMiddleware...) 
+ if cfg.Cluster != "" { + httpMiddleware = append(httpMiddleware, middleware.ClusterValidationMiddleware(cfg.Cluster, metrics.InvalidClusters, logger)) } - - return httpMiddleware, nil + return append(httpMiddleware, cfg.HTTPMiddleware...), nil } // Run the server; blocks until SIGTERM (if signal handling is enabled), an error is received, or Stop() is called. @@ -620,6 +625,11 @@ func handleGRPCError(err error, errChan chan error) { } } +// Cluster returns the configured server cluster. +func (s *Server) Cluster() string { + return s.cfg.Cluster +} + // HTTPListenAddr exposes `net.Addr` that `Server` is listening to for HTTP connections. func (s *Server) HTTPListenAddr() net.Addr { return s.httpListener.Addr() diff --git a/vendor/modules.txt b/vendor/modules.txt index f5ce82d638e..69edf27b2d5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -644,12 +644,13 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20250131191929-eab36484cec2 +# github.com/grafana/dskit v0.0.0-20250204153901-2447c477a34f ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast github.com/grafana/dskit/cache github.com/grafana/dskit/cancellation +github.com/grafana/dskit/clusterutil github.com/grafana/dskit/concurrency github.com/grafana/dskit/crypto/tls github.com/grafana/dskit/dns