WIP: Add -server.cluster flag #10504

Draft
wants to merge 3 commits into
base: main
3 changes: 3 additions & 0 deletions Makefile
@@ -453,6 +453,9 @@ lint: check-makefiles
faillint -paths \
"github.com/twmb/franz-go/pkg/kgo.{AllowAutoTopicCreation}" \
./pkg/... ./cmd/... ./tools/... ./integration/...
# We need to ensure that when creating http-grpc requests, the X-Cluster header is included.
faillint -paths "github.com/grafana/dskit/httpgrpc.{FromHTTPRequest}=github.com/grafana/dskit/httpgrpc.FromHTTPRequestWithCluster" \
./pkg/... ./cmd/... ./tools/... ./integration/...

format: ## Run gofmt and goimports.
find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec gofmt -w -s {} \;
12 changes: 11 additions & 1 deletion cmd/mimir/config-descriptor.json
@@ -223,6 +223,16 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cluster",
"required": false,
"desc": "Optionally define the server's cluster, and enable validation that requests are for the same cluster.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "server.cluster",
"fieldType": "string"
},
{
"kind": "field",
"name": "tls_cipher_suites",
@@ -409,7 +419,7 @@
"kind": "field",
"name": "register_instrumentation",
"required": false,
"desc": "Register the intrumentation handlers (/metrics etc).",
"desc": "Register the instrumentation handlers (/metrics etc).",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "server.register-instrumentation",
4 changes: 3 additions & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -3091,6 +3091,8 @@ Usage of ./cmd/mimir/mimir:
Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right.
-runtime-config.reload-period duration
How often to check runtime config files. (default 10s)
-server.cluster string
Optionally define the server's cluster, and enable validation that requests are for the same cluster.
-server.graceful-shutdown-timeout duration
Timeout for graceful shutdowns (default 30s)
-server.grpc-conn-limit int
@@ -3176,7 +3178,7 @@ Usage of ./cmd/mimir/mimir:
-server.proxy-protocol-enabled
[experimental] Enables PROXY protocol.
-server.register-instrumentation
Register the intrumentation handlers (/metrics etc). (default true)
Register the instrumentation handlers (/metrics etc). (default true)
-server.report-grpc-codes-in-instrumentation-label-enabled
If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as "error". (default true)
-server.throughput.latency-cutoff duration
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -763,6 +763,8 @@ Usage of ./cmd/mimir/mimir:
The tenant's shard size when sharding is used by ruler. Value of 0 disables shuffle sharding for the tenant, and tenant rules will be sharded across all ruler replicas.
-runtime-config.file comma-separated-list-of-strings
Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right.
-server.cluster string
Optionally define the server's cluster, and enable validation that requests are for the same cluster.
-server.grpc-listen-address string
gRPC server listen address.
-server.grpc-listen-port int
@@ -12,6 +12,9 @@ datasources:
exemplarTraceIdDestinations:
- name: traceID
datasourceUid: jaeger
httpHeaderName1: 'X-Cluster'
secureJsonData:
httpHeaderValue1: 'development'

- name: Prometheus
type: prometheus
@@ -50,3 +50,5 @@ prometheus:

remote_write:
- url: http://distributor-1:8000/api/v1/push
headers:
'X-Cluster': 'development'
3 changes: 3 additions & 0 deletions development/mimir-microservices-mode/config/mimir.yaml
@@ -1,3 +1,6 @@
server:
cluster: development

multitenancy_enabled: false

distributor:
@@ -33,3 +33,5 @@ remote_write:
- url: http://distributor-2:8001/api/v1/push
send_native_histograms: true
send_exemplars: true
headers:
'X-Cluster': 'development'
@@ -33,3 +33,5 @@ remote_write:
- url: http://distributor-2:8001/api/v1/push
send_native_histograms: true
send_exemplars: true
headers:
'X-Cluster': 'development'
2 changes: 2 additions & 0 deletions development/mimir-microservices-mode/config/prometheus.yaml
@@ -47,6 +47,8 @@ remote_write:
- url: http://distributor-1:8000/api/v1/push
send_native_histograms: true
send_exemplars: true
headers:
'X-Cluster': 'development'

rule_files:
- '/etc/mixin/mimir-alerts.yaml'
2 changes: 1 addition & 1 deletion development/mimir-microservices-mode/docker-compose.yml
@@ -120,7 +120,7 @@
"command":
- "sh"
- "-c"
- "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=continuous-test -server.http-listen-port=8090 -server.grpc-listen-port=9090 -activity-tracker.filepath=/activity/continuous-test-8090 -tests.run-interval=2m -tests.read-endpoint=http://query-frontend:8007/prometheus -tests.tenant-id=mimir-continuous-test -tests.write-endpoint=http://distributor-1:8000 -tests.write-read-series-test.max-query-age=1h -tests.write-read-series-test.num-series=100 -memberlist.nodename=continuous-test -memberlist.bind-port=10090 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211"
- "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=continuous-test -server.cluster=development -server.http-listen-port=8090 -server.grpc-listen-port=9090 -activity-tracker.filepath=/activity/continuous-test-8090 -tests.run-interval=2m -tests.read-endpoint=http://query-frontend:8007/prometheus -tests.tenant-id=mimir-continuous-test -tests.write-endpoint=http://distributor-1:8000 -tests.write-read-series-test.max-query-age=1h -tests.write-read-series-test.num-series=100 -memberlist.nodename=continuous-test -memberlist.bind-port=10090 -ingester.ring.store=memberlist -distributor.ring.store=memberlist -compactor.ring.store=memberlist -store-gateway.sharding-ring.store=memberlist -ruler.ring.store=memberlist -alertmanager.sharding-ring.store=memberlist -blocks-storage.bucket-store.index-cache.backend=memcached -blocks-storage.bucket-store.chunks-cache.backend=memcached -blocks-storage.bucket-store.metadata-cache.backend=memcached -query-frontend.results-cache.backend=memcached -ruler-storage.cache.backend=memcached -blocks-storage.bucket-store.index-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached:11211 -blocks-storage.bucket-store.metadata-cache.memcached.addresses=dns+memcached:11211 -query-frontend.results-cache.memcached.addresses=dns+memcached:11211 -ruler-storage.cache.memcached.addresses=dns+memcached:11211"
"depends_on":
- "minio"
- "distributor-1"
@@ -12,9 +12,12 @@ datasources:
exemplarTraceIdDestinations:
- name: traceID
datasourceUid: jaeger
httpHeaderName1: 'X-Cluster'
secureJsonData:
httpHeaderValue1: 'development'
- name: Jaeger
type: jaeger
access: proxy
uid: jaeger
orgID: 1
url: http://jaeger:16686/
url: http://jaeger:16686/
2 changes: 2 additions & 0 deletions development/mimir-monolithic-mode/config/grafana-agent.yaml
@@ -23,3 +23,5 @@ prometheus:

remote_write:
- url: http://mimir-1:8001/api/v1/push
headers:
'X-Cluster': 'development'
3 changes: 3 additions & 0 deletions development/mimir-monolithic-mode/config/mimir.yaml
@@ -1,5 +1,8 @@
multitenancy_enabled: false

server:
cluster: development

distributor:
pool:
health_check_ingesters: true
2 changes: 2 additions & 0 deletions development/mimir-monolithic-mode/config/prometheus.yaml
@@ -20,3 +20,5 @@ scrape_configs:
remote_write:
- url: http://mimir-1:8001/api/v1/push
send_native_histograms: true
headers:
'X-Cluster': 'development'
@@ -553,6 +553,11 @@ The `server` block configures the HTTP and gRPC server of the launched service(s
# CLI flag: -server.proxy-protocol-enabled
[proxy_protocol_enabled: <boolean> | default = false]

# Optionally define the server's cluster, and enable validation that requests
# are for the same cluster.
# CLI flag: -server.cluster
[cluster: <string> | default = ""]

# Comma-separated list of cipher suites to use. If blank, the default Go cipher
# suites is used.
# CLI flag: -server.tls-cipher-suites
@@ -617,7 +622,7 @@ grpc_tls_config:
# CLI flag: -server.grpc-tls-ca-path
[client_ca_file: <string> | default = ""]

# (advanced) Register the intrumentation handlers (/metrics etc).
# (advanced) Register the instrumentation handlers (/metrics etc).
# CLI flag: -server.register-instrumentation
[register_instrumentation: <boolean> | default = true]
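
The `cluster` description in this hunk says that setting the option enables validation that requests are for the same cluster. A minimal sketch of what such a check could look like on the receiving side is shown here. It is not the dskit implementation: `validateCluster` and `statusFor` are hypothetical names, and both the 400 status code and the decision to reject requests that carry no `X-Cluster` header are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// clusterHeader is the header name used in this PR's development configs.
const clusterHeader = "X-Cluster"

// validateCluster is a hypothetical middleware, not dskit's actual code.
// With an empty cluster (the flag's default) it performs no validation.
func validateCluster(cluster string, next http.Handler) http.Handler {
	if cluster == "" {
		return next
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if got := r.Header.Get(clusterHeader); got != cluster {
			// Assumption: a mismatch (or a missing header) yields 400.
			msg := fmt.Sprintf("request for cluster %q rejected by cluster %q", got, cluster)
			http.Error(w, msg, http.StatusBadRequest)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// statusFor runs one in-memory request through the middleware and returns
// the resulting HTTP status code.
func statusFor(serverCluster, requestCluster string) int {
	ok := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	req := httptest.NewRequest(http.MethodGet, "/api/v1/push", nil)
	if requestCluster != "" {
		req.Header.Set(clusterHeader, requestCluster)
	}
	rec := httptest.NewRecorder()
	validateCluster(serverCluster, ok).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor("development", "development")) // 200
	fmt.Println(statusFor("development", "production"))  // 400
	fmt.Println(statusFor("", "production"))             // 200
}
```

Under these assumptions, a Prometheus `remote_write` without the `X-Cluster` header would be rejected once `cluster: development` is set, which is consistent with the header being added to every development config in this PR.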

2 changes: 1 addition & 1 deletion go.mod
@@ -22,7 +22,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080
github.com/grafana/dskit v0.0.0-20250129115452-2f8c95cf1702
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/influxdata/influxdb/v2 v2.7.11
4 changes: 2 additions & 2 deletions go.sum
@@ -1271,8 +1271,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8 h1:mdI6P22PgFD7bQ0Yf4h8cfHSldak4nxogvlsTHZyZmc=
github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU=
github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080 h1:IHRsAMdemxPu9g9zPxTFcU3hLhfd5cl6W4fqRovAzkU=
github.com/grafana/dskit v0.0.0-20250128193928-104df19e2080/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/dskit v0.0.0-20250129115452-2f8c95cf1702 h1:A3R5RHZvKMoRs26luWg8IB9vfd85WgTB/I14l9lcSII=
github.com/grafana/dskit v0.0.0-20250129115452-2f8c95cf1702/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8=
6 changes: 4 additions & 2 deletions pkg/frontend/config.go
@@ -30,6 +30,8 @@ type CombinedFrontendConfig struct {
QueryMiddleware querymiddleware.Config `yaml:",inline"`

DownstreamURL string `yaml:"downstream_url" category:"advanced"`

Cluster string `yaml:"-"`
}

func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
@@ -88,14 +90,14 @@ func InitFrontend(
}

fr, err := v2.NewFrontend(cfg.FrontendV2, v2Limits, log, reg, codec)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), nil, fr, err

default:
// No scheduler = use original frontend.
fr, err := v1.New(cfg.FrontendV1, v1Limits, log, reg)
if err != nil {
return nil, nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), fr, nil, nil
}
}
10 changes: 7 additions & 3 deletions pkg/frontend/transport/roundtripper.go
@@ -20,13 +20,17 @@ type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, io.ReadCloser, error)
}

func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
return &grpcRoundTripperAdapter{roundTripper: r}
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper, cluster string) http.RoundTripper {
return &grpcRoundTripperAdapter{
roundTripper: r,
cluster: cluster,
}
}

// This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper
type grpcRoundTripperAdapter struct {
roundTripper GrpcRoundTripper
cluster string
}

type buffer struct {
@@ -39,7 +43,7 @@ func (b *buffer) Bytes() []byte {
}

func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
req, err := httpgrpc.FromHTTPRequest(r)
req, err := httpgrpc.FromHTTPRequestWithCluster(r, a.cluster)
if err != nil {
return nil, err
}
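
The adapter change above threads the configured cluster into every converted request. A minimal stdlib-only sketch of the same idea at the plain HTTP level follows. `clusterRoundTripper` and `observedCluster` are hypothetical names introduced for illustration, and skipping the header when the cluster is empty is an assumption here, since the behaviour of `FromHTTPRequestWithCluster` for an empty cluster is not visible in this diff.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// clusterHeader is the header name used in this PR's development configs.
const clusterHeader = "X-Cluster"

// clusterRoundTripper is a hypothetical stand-in for grpcRoundTripperAdapter:
// it stamps each outgoing request with the configured cluster label.
type clusterRoundTripper struct {
	next    http.RoundTripper
	cluster string
}

func (c *clusterRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	// A RoundTripper should not mutate the caller's request, so clone it.
	out := r.Clone(r.Context())
	if c.cluster != "" { // assumption: empty cluster means "send no header"
		out.Header.Set(clusterHeader, c.cluster)
	}
	return c.next.RoundTrip(out)
}

// observedCluster sends one request through the wrapper to a test server
// that echoes back the X-Cluster value it received.
func observedCluster(cluster string) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Header.Get(clusterHeader))
	}))
	defer srv.Close()

	client := &http.Client{
		Transport: &clusterRoundTripper{next: http.DefaultTransport, cluster: cluster},
	}
	resp, err := client.Get(srv.URL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	got, err := observedCluster("development")
	fmt.Println(got, err)
}
```

Passing the cluster at construction time, as the PR does with `AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster)`, keeps call sites from having to remember the header; the tests in this PR pass `""` for the same parameter.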
2 changes: 1 addition & 1 deletion pkg/frontend/v1/frontend_test.go
@@ -337,7 +337,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
handlerCfg := transport.HandlerConfig{QueryStatsEnabled: true}
flagext.DefaultValues(&handlerCfg)

rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1, "")
r := mux.NewRouter()
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
6 changes: 3 additions & 3 deletions pkg/frontend/v2/frontend_scheduler_adapter_test.go
@@ -136,7 +136,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) {
reqs := []*http.Request{rangeHTTPReq, labelValuesHTTPReq}

for _, req := range reqs {
httpgrpcReq, err := httpgrpc.FromHTTPRequest(req)
httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(req, "")
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
@@ -179,7 +179,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "tenant-0")

instantHTTPReq := makeInstantHTTPRequest(ctx, testData.time)
httpgrpcReq, err := httpgrpc.FromHTTPRequest(instantHTTPReq)
httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(instantHTTPReq, "")
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
@@ -229,7 +229,7 @@ func TestQueryDecoding(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "tenant-0")

labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, testData.start, testData.end)
httpgrpcReq, err := httpgrpc.FromHTTPRequest(labelValuesHTTPReq)
httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(labelValuesHTTPReq, "")
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
2 changes: 1 addition & 1 deletion pkg/frontend/v2/frontend_test.go
@@ -532,7 +532,7 @@ func TestFrontendStreamingResponse(t *testing.T) {
})

req := httptest.NewRequest("GET", "/api/v1/cardinality/active_series?selector=metric", nil)
rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f)
rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f, "")

resp, err := rt.RoundTrip(req.WithContext(user.InjectOrgID(context.Background(), userID)))
require.NoError(t, err)
6 changes: 5 additions & 1 deletion pkg/mimir/modules.go
@@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/clusterutil"
"github.com/grafana/dskit/dns"
httpgrpc_server "github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/kv"
@@ -718,7 +719,9 @@ func (t *Mimir) initFlusher() (serv services.Service, err error) {
// initQueryFrontendCodec initializes query frontend codec.
// NOTE: Grafana Enterprise Metrics depends on this.
func (t *Mimir) initQueryFrontendCodec() (services.Service, error) {
t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders)
// Always pass through the cluster header.
propagateHeaders := append([]string{clusterutil.ClusterHeader}, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders...)
t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, propagateHeaders)
return nil, nil
}
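
The `append([]string{clusterutil.ClusterHeader}, ...)` form in `initQueryFrontendCodec` builds a fresh slice rather than appending to the configured one, so `ExtraPropagateHeaders` itself is left untouched. A small sketch of that property is shown here. `withClusterHeader` is a hypothetical helper, the `"X-Cluster"` value is assumed from the Makefile comment and the development configs (the actual value of `clusterutil.ClusterHeader` is not shown in this diff), and the extra header names are made up.

```go
package main

import "fmt"

// Assumed value; the real constant lives in dskit's clusterutil package.
const clusterHeader = "X-Cluster"

// withClusterHeader returns a new slice with the cluster header first,
// followed by the configured extra headers. Appending to a freshly built
// one-element slice always allocates, so extra's backing array is not reused.
func withClusterHeader(extra []string) []string {
	return append([]string{clusterHeader}, extra...)
}

func main() {
	extra := []string{"X-Example-A", "X-Example-B"} // hypothetical header names
	all := withClusterHeader(extra)

	fmt.Println(all)   // [X-Cluster X-Example-A X-Example-B]
	fmt.Println(extra) // [X-Example-A X-Example-B]
}
```

One thing this form does not do is deduplicate: if an operator already lists the cluster header in the extra headers, it appears twice in the resulting slice.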

@@ -783,6 +786,7 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error
}

func (t *Mimir) initQueryFrontend() (serv services.Service, err error) {
t.Cfg.Frontend.Cluster = t.Cfg.Server.Cluster
t.Cfg.Frontend.FrontendV2.QuerySchedulerDiscovery = t.Cfg.QueryScheduler.ServiceDiscovery
t.Cfg.Frontend.FrontendV2.LookBackDelta = t.Cfg.Querier.EngineConfig.LookbackDelta
t.Cfg.Frontend.FrontendV2.QueryStoreAfter = t.Cfg.Querier.QueryStoreAfter

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go