Skip to content

Commit

Permalink
Pass cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Jan 24, 2025
1 parent b8ca00e commit d9cb890
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 21 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ lint: check-makefiles
faillint -paths \
"github.com/twmb/franz-go/pkg/kgo.{AllowAutoTopicCreation}" \
./pkg/... ./cmd/... ./tools/... ./integration/...
# We need to ensure that when creating http-grpc requests, the X-Cluster header is included.
faillint -paths "github.com/grafana/dskit/httpgrpc.{FromHTTPRequest}=github.com/grafana/dskit/httpgrpc.FromHTTPRequestWithCluster" \
./pkg/... ./cmd/... ./tools/... ./integration/...

format: ## Run gofmt and goimports.
find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec gofmt -w -s {} \;
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2
github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/influxdata/influxdb/v2 v2.7.11
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1271,8 +1271,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8 h1:mdI6P22PgFD7bQ0Yf4h8cfHSldak4nxogvlsTHZyZmc=
github.com/grafana/alerting v0.0.0-20250113170557-b4ab2ba363a8/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU=
github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2 h1:2ZN2dTx3NDEvREKr5UH6qaAtI+g6wkItol9TzZMGhcQ=
github.com/grafana/dskit v0.0.0-20250123101449-feb230ca9dc2/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b h1:y34FUoHHxGMbsX8nsGdCdJGfazKlGjf/7QhvgFMn0HA=
github.com/grafana/dskit v0.0.0-20250124130032-aff6c876915b/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8=
Expand Down
7 changes: 2 additions & 5 deletions pkg/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/clusterutil"
"github.com/grafana/dskit/middleware"
)

// ClusterValidationMiddleware validates that requests are for the correct cluster.
func ClusterValidationMiddleware(cluster string, logger log.Logger) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqCluster := r.Header.Get(clusterHeader)
reqCluster := r.Header.Get(clusterutil.ClusterHeader)
if reqCluster != cluster {
level.Warn(logger).Log("msg", "rejecting request intended for wrong cluster", "cluster", cluster, "request_cluster", reqCluster)
http.Error(w, fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster),
Expand All @@ -25,7 +26,3 @@ func ClusterValidationMiddleware(cluster string, logger log.Logger) middleware.I
})
})
}

const (
clusterHeader = "X-Cluster"
)
9 changes: 7 additions & 2 deletions pkg/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type CombinedFrontendConfig struct {
QueryMiddleware querymiddleware.Config `yaml:",inline"`

DownstreamURL string `yaml:"downstream_url" category:"advanced"`

Cluster string `yaml:"-"`
}

func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
Expand Down Expand Up @@ -66,6 +68,9 @@ func InitFrontend(
reg prometheus.Registerer,
codec querymiddleware.Codec,
) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) {
if cfg.Cluster == "" {
panic("cluster not defined")
}
switch {
case cfg.DownstreamURL != "":
// If the user has specified a downstream Prometheus, then we should use that.
Expand All @@ -88,14 +93,14 @@ func InitFrontend(
}

fr, err := v2.NewFrontend(cfg.FrontendV2, v2Limits, log, reg, codec)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), nil, fr, err

default:
// No scheduler = use original frontend.
fr, err := v1.New(cfg.FrontendV1, v1Limits, log, reg)
if err != nil {
return nil, nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, cfg.Cluster), fr, nil, nil
}
}
10 changes: 7 additions & 3 deletions pkg/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, io.ReadCloser, error)
}

func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
return &grpcRoundTripperAdapter{roundTripper: r}
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper, cluster string) http.RoundTripper {
return &grpcRoundTripperAdapter{
roundTripper: r,
cluster: cluster,
}
}

// This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper
type grpcRoundTripperAdapter struct {
roundTripper GrpcRoundTripper
cluster string
}

type buffer struct {
Expand All @@ -39,7 +43,7 @@ func (b *buffer) Bytes() []byte {
}

func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
req, err := httpgrpc.FromHTTPRequest(r)
req, err := httpgrpc.FromHTTPRequestWithCluster(r, a.cluster)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
handlerCfg := transport.HandlerConfig{QueryStatsEnabled: true}
flagext.DefaultValues(&handlerCfg)

rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1, "")
r := mux.NewRouter()
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
Expand Down
6 changes: 3 additions & 3 deletions pkg/frontend/v2/frontend_scheduler_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) {
reqs := []*http.Request{rangeHTTPReq, labelValuesHTTPReq}

for _, req := range reqs {
httpgrpcReq, err := httpgrpc.FromHTTPRequest(req)
httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(req, "")
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "tenant-0")

instantHTTPReq := makeInstantHTTPRequest(ctx, testData.time)
httpgrpcReq, err := httpgrpc.FromHTTPRequest(instantHTTPReq)
httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(instantHTTPReq, "")
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestQueryDecoding(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "tenant-0")

labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, testData.start, testData.end)
httpgrpcReq, err := httpgrpc.FromHTTPRequest(labelValuesHTTPReq)
httpgrpcReq, err := httpgrpc.FromHTTPRequestWithCluster(labelValuesHTTPReq, "")
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func TestFrontendStreamingResponse(t *testing.T) {
})

req := httptest.NewRequest("GET", "/api/v1/cardinality/active_series?selector=metric", nil)
rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f)
rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(f, "")

resp, err := rt.RoundTrip(req.WithContext(user.InjectOrgID(context.Background(), userID)))
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/clusterutil"
"github.com/grafana/dskit/dns"
httpgrpc_server "github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/kv"
Expand Down Expand Up @@ -718,7 +719,9 @@ func (t *Mimir) initFlusher() (serv services.Service, err error) {
// initQueryFrontendCodec initializes query frontend codec.
// NOTE: Grafana Enterprise Metrics depends on this.
func (t *Mimir) initQueryFrontendCodec() (services.Service, error) {
t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders)
// Always pass through the cluster header.
propagateHeaders := append([]string{clusterutil.ClusterHeader}, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders...)
t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Querier.EngineConfig.LookbackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, propagateHeaders)
return nil, nil
}

Expand Down Expand Up @@ -783,6 +786,7 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error
}

func (t *Mimir) initQueryFrontend() (serv services.Service, err error) {
t.Cfg.Frontend.Cluster = t.Cfg.Server.Cluster
t.Cfg.Frontend.FrontendV2.QuerySchedulerDiscovery = t.Cfg.QueryScheduler.ServiceDiscovery
t.Cfg.Frontend.FrontendV2.LookBackDelta = t.Cfg.Querier.EngineConfig.LookbackDelta
t.Cfg.Frontend.FrontendV2.QueryStoreAfter = t.Cfg.Querier.QueryStoreAfter
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/modules.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d9cb890

Please sign in to comment.