From 31d91bceb0e3e938a26f496d91c212eb89aa440f Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 6 Dec 2024 16:46:00 +0100 Subject: [PATCH] query-tee: add per-backend request headers (#10081) * Add tests Signed-off-by: Dimitar Dimitrov * Add headers to proxy Signed-off-by: Dimitar Dimitrov * Add CHANGELOG.md entry Signed-off-by: Dimitar Dimitrov * Use YAML config instead Signed-off-by: Dimitar Dimitrov * Add comment for why we allow choosing by index too Signed-off-by: Dimitar Dimitrov * Validate list of backends Signed-off-by: Dimitar Dimitrov * Add util function for BackendConfig Signed-off-by: Dimitar Dimitrov * Fix validation Signed-off-by: Dimitar Dimitrov --------- Signed-off-by: Dimitar Dimitrov --- CHANGELOG.md | 1 + tools/querytee/proxy.go | 72 ++++++++++++- tools/querytee/proxy_backend.go | 10 +- tools/querytee/proxy_backend_test.go | 6 +- tools/querytee/proxy_endpoint_test.go | 14 +-- tools/querytee/proxy_test.go | 150 ++++++++++++++++++++++++++ 6 files changed, 243 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa2620d560c..2f11b03ab15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -142,6 +142,7 @@ ### Query-tee * [FEATURE] Added `-proxy.compare-skip-samples-before` to skip samples before the given time when comparing responses. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only. #9515 +* [FEATURE] Add `-backend.config-file` for a YAML configuration file for per-backend options. Currently, it only supports additional HTTP request headers. #10081 * [ENHANCEMENT] Added human-readable timestamps to comparison failure messages. #9665 ### Documentation diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go index 3a4f52e21bb..6954835dab5 100644 --- a/tools/querytee/proxy.go +++ b/tools/querytee/proxy.go @@ -6,11 +6,13 @@ package querytee import ( + "encoding/json" "flag" "fmt" "net/http" "net/http/httputil" "net/url" + "os" "strconv" "strings" "sync" @@ -21,8 +23,10 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/server" "github.com/grafana/dskit/spanlogger" + "github.com/grafana/regexp" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v3" ) type ProxyConfig struct { @@ -34,6 +38,8 @@ type ProxyConfig struct { BackendEndpoints string PreferredBackend string BackendReadTimeout time.Duration + BackendConfigFile string + parsedBackendConfig map[string]*BackendConfig CompareResponses bool LogSlowQueryResponseThreshold time.Duration ValueComparisonTolerance float64 @@ -47,6 +53,23 @@ type ProxyConfig struct { SecondaryBackendsRequestProportion float64 } +type BackendConfig struct { + RequestHeaders http.Header `json:"request_headers" yaml:"request_headers"` +} + +func exampleJSONBackendConfig() string { + cfg := BackendConfig{ + RequestHeaders: http.Header{ + "Cache-Control": {"no-store"}, + }, + } + jsonBytes, err := json.Marshal(cfg) + if err != nil { + panic("invalid example backend config" + err.Error()) + } + return string(jsonBytes) +} + func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.ServerHTTPServiceAddress, "server.http-service-address", "", "Bind address for server where query-tee service listens for HTTP requests.") f.IntVar(&cfg.ServerHTTPServicePort, "server.http-service-port", 80, "The HTTP port where the query-tee service listens for HTTP requests.") @@ -62,6 +85,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.BackendSkipTLSVerify, "backend.skip-tls-verify", false, "Skip TLS verification on backend targets.") f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.") f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 150*time.Second, "The timeout when reading the response from a backend.") + f.StringVar(&cfg.BackendConfigFile, "backend.config-file", "", "Path to a file with YAML or JSON configuration for each backend. Each key in the YAML/JSON document is a backend hostname. This is an example configuration value for a backend in JSON: "+exampleJSONBackendConfig()) f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.") f.DurationVar(&cfg.LogSlowQueryResponseThreshold, "proxy.log-slow-query-response-threshold", 10*time.Second, "The minimum difference in response time between slowest and fastest back-end over which to log the query. 0 to disable.") f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).") @@ -119,6 +143,17 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro return nil, errors.New("preferred backend must be set when secondary backends request proportion is not 1") } + if len(cfg.BackendConfigFile) > 0 { + configBytes, err := os.ReadFile(cfg.BackendConfigFile) + if err != nil { + return nil, fmt.Errorf("failed to read backend config file (%s): %w", cfg.BackendConfigFile, err) + } + err = yaml.Unmarshal(configBytes, &cfg.parsedBackendConfig) + if err != nil { + return nil, fmt.Errorf("failed to parse backend YAML config: %w", err) + } + } + p := &Proxy{ cfg: cfg, logger: logger, @@ -153,7 +188,18 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro preferred = preferredIdx == idx } - p.backends = append(p.backends, NewProxyBackend(name, u, cfg.BackendReadTimeout, preferred, cfg.BackendSkipTLSVerify)) + backendCfg := cfg.parsedBackendConfig[name] + if backendCfg == nil { + // In tests, we have the same hostname for all backends, so we also + // support a numeric preferred backend which is the index in the list + // of backends. + backendCfg = cfg.parsedBackendConfig[strconv.Itoa(idx)] + if backendCfg == nil { + backendCfg = &BackendConfig{} + } + } + + p.backends = append(p.backends, NewProxyBackend(name, u, cfg.BackendReadTimeout, preferred, cfg.BackendSkipTLSVerify, *backendCfg)) } // At least 1 backend is required @@ -161,6 +207,11 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro return nil, errors.New("at least 1 backend is required") } + err := validateBackendConfig(p.backends, cfg.parsedBackendConfig) + if err != nil { + return nil, fmt.Errorf("validating external backend configs: %w", err) + } + // If the preferred backend is configured, then it must exist among the actual backends. if cfg.PreferredBackend != "" { exists := false @@ -188,6 +239,25 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro return p, nil } +func validateBackendConfig(backends []ProxyBackendInterface, config map[string]*BackendConfig) error { + // Tests need to pass the same hostname for all backends, so we also + // support a numeric preferred backend which is the index in the list of backend. + numericBackendNameRegex := regexp.MustCompile("^[0-9]+$") + for configuredBackend := range config { + backendExists := false + for _, actualBacked := range backends { + if actualBacked.Name() == configuredBackend { + backendExists = true + break + } + } + if !backendExists && !numericBackendNameRegex.MatchString(configuredBackend) { + return fmt.Errorf("configured backend %s does not exist in the list of actual backends", configuredBackend) + } + } + return nil +} + func (p *Proxy) Start() error { // Setup server first, so we can fail early if the ports are in use. serv, err := server.New(server.Config{ diff --git a/tools/querytee/proxy_backend.go b/tools/querytee/proxy_backend.go index 301c6b4cdc6..394a8828bd5 100644 --- a/tools/querytee/proxy_backend.go +++ b/tools/querytee/proxy_backend.go @@ -37,10 +37,11 @@ type ProxyBackend struct { // Whether this is the preferred backend from which picking up // the response and sending it back to the client. preferred bool + cfg BackendConfig } // NewProxyBackend makes a new ProxyBackend -func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool, skipTLSVerify bool) ProxyBackendInterface { +func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool, skipTLSVerify bool, cfg BackendConfig) ProxyBackendInterface { innerTransport := &http.Transport{ Proxy: http.ProxyFromEnvironment, TLSClientConfig: &tls.Config{ @@ -63,6 +64,7 @@ func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, pref endpoint: endpoint, timeout: timeout, preferred: preferred, + cfg: cfg, client: &http.Client{ CheckRedirect: func(_ *http.Request, _ []*http.Request) error { return errors.New("the query-tee proxy does not follow redirects") @@ -139,6 +141,12 @@ func (b *ProxyBackend) createBackendRequest(ctx context.Context, orig *http.Requ // Remove Accept-Encoding header to avoid sending compressed responses req.Header.Del("Accept-Encoding") + for headerName, headerValues := range b.cfg.RequestHeaders { + for _, headerValue := range headerValues { + req.Header.Add(headerName, headerValue) + } + } + return req, nil } diff --git a/tools/querytee/proxy_backend_test.go b/tools/querytee/proxy_backend_test.go index 4a892286f8f..9d59a032d30 100644 --- a/tools/querytee/proxy_backend_test.go +++ b/tools/querytee/proxy_backend_test.go @@ -84,7 +84,7 @@ func Test_ProxyBackend_createBackendRequest_HTTPBasicAuthentication(t *testing.T orig.Header.Set("X-Scope-OrgID", testData.clientTenant) } - b := NewProxyBackend("test", u, time.Second, false, false) + b := NewProxyBackend("test", u, time.Second, false, false, defaultBackendConfig()) bp, ok := b.(*ProxyBackend) if !ok { t.Fatalf("Type assertion to *ProxyBackend failed") @@ -98,3 +98,7 @@ func Test_ProxyBackend_createBackendRequest_HTTPBasicAuthentication(t *testing.T }) } } + +func defaultBackendConfig() BackendConfig { + return BackendConfig{} +} diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go index 2658f171d51..24108364f2a 100644 --- a/tools/querytee/proxy_endpoint_test.go +++ b/tools/querytee/proxy_endpoint_test.go @@ -37,9 +37,9 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) { backendURL3, err := url.Parse("http://backend-3/") require.NoError(t, err) - backendPref := NewProxyBackend("backend-1", backendURL1, time.Second, true, false) - backendOther1 := NewProxyBackend("backend-2", backendURL2, time.Second, false, false) - backendOther2 := NewProxyBackend("backend-3", backendURL3, time.Second, false, false) + backendPref := NewProxyBackend("backend-1", backendURL1, time.Second, true, false, defaultBackendConfig()) + backendOther1 := NewProxyBackend("backend-2", backendURL2, time.Second, false, false, defaultBackendConfig()) + backendOther2 := NewProxyBackend("backend-3", backendURL3, time.Second, false, false, defaultBackendConfig()) tests := map[string]struct { backends []ProxyBackendInterface @@ -157,8 +157,8 @@ func Test_ProxyEndpoint_Requests(t *testing.T) { require.NoError(t, err) backends := []ProxyBackendInterface{ - NewProxyBackend("backend-1", backendURL1, time.Second, true, false), - NewProxyBackend("backend-2", backendURL2, time.Second, false, false), + NewProxyBackend("backend-1", backendURL1, time.Second, true, false, defaultBackendConfig()), + NewProxyBackend("backend-2", backendURL2, time.Second, false, false, defaultBackendConfig()), } endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, 0, 1.0) @@ -325,8 +325,8 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) { require.NoError(t, err) backends := []ProxyBackendInterface{ - NewProxyBackend("preferred-backend", preferredBackendURL, time.Second, true, false), - NewProxyBackend("secondary-backend", secondaryBackendURL, time.Second, false, false), + NewProxyBackend("preferred-backend", preferredBackendURL, time.Second, true, false, defaultBackendConfig()), + NewProxyBackend("secondary-backend", secondaryBackendURL, time.Second, false, false, defaultBackendConfig()), } logger := newMockLogger() diff --git a/tools/querytee/proxy_test.go b/tools/querytee/proxy_test.go index 85b04e121c8..948ebdf569b 100644 --- a/tools/querytee/proxy_test.go +++ b/tools/querytee/proxy_test.go @@ -11,6 +11,7 @@ import ( "io" "net/http" "net/http/httptest" + "os" "strconv" "strings" "testing" @@ -203,6 +204,7 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { requestPath string requestMethod string backends []mockedBackend + backendConfig map[string]*BackendConfig preferredBackendIdx int expectedStatus int expectedRes string @@ -323,6 +325,29 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedStatus: 200, expectedRes: querySingleMetric1, }, + "adds request headers to specific backend": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, + preferredBackendIdx: 0, + backendConfig: map[string]*BackendConfig{ + "0": { + RequestHeaders: map[string][]string{ + "X-Test-Header": {"test-value"}, + }, + }, + }, + backends: []mockedBackend{ + {handler: func(rw http.ResponseWriter, r *http.Request) { + assert.Equal(t, r.Header.Get("X-Test-Header"), "test-value") + rw.WriteHeader(http.StatusOK) + }}, + {handler: func(rw http.ResponseWriter, r *http.Request) { + assert.Empty(t, r.Header.Get("X-Test-Header")) + rw.WriteHeader(http.StatusOK) + }}, + }, + expectedStatus: http.StatusOK, + }, } for testName, testData := range tests { @@ -341,6 +366,7 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { cfg := ProxyConfig{ BackendEndpoints: strings.Join(backendURLs, ","), PreferredBackend: strconv.Itoa(testData.preferredBackendIdx), + parsedBackendConfig: testData.backendConfig, ServerHTTPServiceAddress: "localhost", ServerHTTPServicePort: 0, ServerGRPCServiceAddress: "localhost", @@ -671,6 +697,130 @@ func TestProxyHTTPGRPC(t *testing.T) { }) } +func Test_NewProxy_BackendConfigPath(t *testing.T) { + // Helper to create a temporary file with content + createTempFile := func(t *testing.T, content string) string { + tmpfile, err := os.CreateTemp("", "backend-config-*.yaml") + require.NoError(t, err) + + defer tmpfile.Close() + + _, err = tmpfile.Write([]byte(content)) + require.NoError(t, err) + + return tmpfile.Name() + } + + tests := map[string]struct { + configContent string + createFile bool + expectedError string + expectedConfig map[string]*BackendConfig + }{ + "missing file": { + createFile: false, + expectedError: "failed to read backend config file (/nonexistent/path): open /nonexistent/path: no such file or directory", + }, + "empty file": { + createFile: true, + configContent: "", + expectedConfig: map[string]*BackendConfig(nil), + }, + "invalid YAML structure (not a map)": { + createFile: true, + configContent: "- item1\n- item2", + expectedError: "failed to parse backend YAML config:", + }, + "valid configuration": { + createFile: true, + configContent: ` + backend1: + request_headers: + X-Custom-Header: ["value1", "value2"] + Cache-Control: ["no-store"] + backend2: + request_headers: + Authorization: ["Bearer token123"] + `, + expectedConfig: map[string]*BackendConfig{ + "backend1": { + RequestHeaders: http.Header{ + "X-Custom-Header": {"value1", "value2"}, + "Cache-Control": {"no-store"}, + }, + }, + "backend2": { + RequestHeaders: http.Header{ + "Authorization": {"Bearer token123"}, + }, + }, + }, + }, + "configured backend which doesn't exist": { + createFile: true, + configContent: ` + backend1: + request_headers: + X-Custom-Header: ["value1", "value2"] + Cache-Control: ["no-store"] + backend2: + request_headers: + Authorization: ["Bearer token123"] + backend3: + request_headers: + Authorization: ["Bearer token123"] + `, + expectedError: "backend3 does not exist in the list of actual backends", + expectedConfig: map[string]*BackendConfig{ + "backend1": { + RequestHeaders: http.Header{ + "X-Custom-Header": {"value1", "value2"}, + "Cache-Control": {"no-store"}, + }, + }, + "backend2": { + RequestHeaders: http.Header{ + "Authorization": {"Bearer token123"}, + }, + }, + }, + }, + } + + for testName, testCase := range tests { + t.Run(testName, func(t *testing.T) { + // Base config that's valid except for the backend config path + cfg := ProxyConfig{ + BackendEndpoints: "http://backend1:9090,http://backend2:9090", + ServerHTTPServiceAddress: "localhost", + ServerHTTPServicePort: 0, + ServerGRPCServiceAddress: "localhost", + ServerGRPCServicePort: 0, + SecondaryBackendsRequestProportion: 1.0, + } + + if !testCase.createFile { + cfg.BackendConfigFile = "/nonexistent/path" + } else { + tmpPath := createTempFile(t, testCase.configContent) + cfg.BackendConfigFile = tmpPath + defer os.Remove(tmpPath) + } + + p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil) + + if testCase.expectedError != "" { + assert.ErrorContains(t, err, testCase.expectedError) + assert.Nil(t, p) + } else { + assert.NoError(t, err) + assert.NotNil(t, p) + assert.Equal(t, testCase.expectedConfig, p.cfg.parsedBackendConfig) + } + }) + } +} + func mockQueryResponse(path string, status int, res string) http.HandlerFunc { return mockQueryResponseWithExpectedBody(path, "", status, res) }