diff --git a/CHANGELOG.md b/CHANGELOG.md index ecd6b7c5b7..99d7884081 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340 #8256 #8348 #8422 #8430 #8455 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340 #8256 #8348 #8422 #8430 #8454 #8455 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123 diff --git a/pkg/api/error/error.go b/pkg/api/error/error.go index 6a6b4ca9d1..c30947b0c4 100644 --- a/pkg/api/error/error.go +++ b/pkg/api/error/error.go @@ -31,17 +31,17 @@ const ( TypeNotAcceptable Type = "not_acceptable" ) -type apiError struct { +type APIError struct { Type Type Message string } -func (e *apiError) Error() string { +func (e *APIError) Error() string { return e.Message } // adapted from https://github.com/prometheus/prometheus/blob/fdbc40a9efcc8197a94f23f0e479b0b56e52d424/web/api/v1/api.go#L1508-L1521 -func (e *apiError) statusCode() int { +func (e *APIError) StatusCode() int { switch e.Type { case TypeBadData: return http.StatusBadRequest @@ -67,30 +67,34 @@ func (e *apiError) statusCode() int { return http.StatusInternalServerError } -// HTTPResponseFromError converts an apiError into a JSON HTTP response -func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { - var apiErr *apiError - if !errors.As(err, &apiErr) { - return nil, false - } - - body, err := json.Marshal( +func (e *APIError) EncodeJSON() ([]byte, error) { + return json.Marshal( struct { Status string `json:"status"` ErrorType Type `json:"errorType,omitempty"` Error string `json:"error,omitempty"` }{ Status: "error", - Error: apiErr.Message, - ErrorType: apiErr.Type, + Error: e.Message, + ErrorType: e.Type, }, ) +} + +// HTTPResponseFromError converts an APIError into a JSON HTTP response +func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { + var apiErr *APIError + if !errors.As(err, &apiErr) { + return nil, false + } + + body, err := apiErr.EncodeJSON() if err != nil { return nil, false } return &httpgrpc.HTTPResponse{ - Code: int32(apiErr.statusCode()), + Code: int32(apiErr.StatusCode()), Body: body, Headers: []*httpgrpc.Header{ {Key: "Content-Type", Values: []string{"application/json"}}, @@ -99,29 +103,29 @@ func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { } // New creates a new apiError with a static string message -func New(typ Type, msg string) error { - return &apiError{ +func New(typ Type, msg string) *APIError { + return &APIError{ Message: msg, Type: typ, } } // Newf creates a new apiError with a formatted message -func Newf(typ Type, tmpl string, args ...interface{}) error { +func Newf(typ Type, tmpl string, args ...interface{}) *APIError { return New(typ, fmt.Sprintf(tmpl, args...)) } // IsAPIError returns true if the error provided is an apiError. // This implies that HTTPResponseFromError will succeed. func IsAPIError(err error) bool { - apiErr := &apiError{} + apiErr := &APIError{} return errors.As(err, &apiErr) } // AddDetails adds details to an existing apiError, but keeps the type and handling. // If the error is not an apiError, it will wrap the error with the details. func AddDetails(err error, details string) error { - apiErr := &apiError{} + apiErr := &APIError{} if !errors.As(err, &apiErr) { return errors.Wrap(err, details) } @@ -131,7 +135,7 @@ func AddDetails(err error, details string) error { // IsNonRetryableAPIError returns true if err is an apiError which should be failed and not retried. func IsNonRetryableAPIError(err error) bool { - apiErr := &apiError{} + apiErr := &APIError{} // Reasoning: // TypeNone and TypeNotFound are not used anywhere in Mimir nor Prometheus; // TypeTimeout, TypeTooManyRequests, TypeNotAcceptable, TypeUnavailable we presume a retry of the same request will fail in the same way. diff --git a/pkg/api/error/error_test.go b/pkg/api/error/error_test.go index 0e0bd21779..dd86557e0a 100644 --- a/pkg/api/error/error_test.go +++ b/pkg/api/error/error_test.go @@ -17,15 +17,15 @@ func TestAllPrometheusErrorTypeValues(t *testing.T) { for _, prometheusErrorTypeString := range prometheusErrorTypeStrings { errorType := Type(prometheusErrorTypeString) - apiError := New(errorType, "").(*apiError) + apiError := New(errorType, "") if errorType == TypeUnavailable { - require.Equal(t, http.StatusServiceUnavailable, apiError.statusCode()) + require.Equal(t, http.StatusServiceUnavailable, apiError.StatusCode()) } else if errorType == TypeInternal || errorType == TypeNone { - require.Equal(t, http.StatusInternalServerError, apiError.statusCode()) + require.Equal(t, http.StatusInternalServerError, apiError.StatusCode()) } else { // If this assertion fails, it probably means a new error type has been added to Prometheus' API. - require.NotEqual(t, http.StatusInternalServerError, apiError.statusCode(), "unrecognised Prometheus error type constant '%s'", prometheusErrorTypeString) + require.NotEqual(t, http.StatusInternalServerError, apiError.StatusCode(), "unrecognised Prometheus error type constant '%s'", prometheusErrorTypeString) } } } diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 4c7e1e81e9..ad5003168a 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/mimir/pkg/querier" querierapi "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/stats" + "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/validation" @@ -285,7 +286,8 @@ func NewQuerierHandler( router := mux.NewRouter() routeInjector := middleware.RouteInjector{RouteMatcher: router} - router.Use(routeInjector.Wrap) + fallbackInjector := compat.EngineFallbackInjector{} + router.Use(routeInjector.Wrap, fallbackInjector.Wrap) // Use a separate metric for the querier in order to differentiate requests from the query-frontend when // running Mimir in monolithic mode. diff --git a/pkg/frontend/querymiddleware/codec.go b/pkg/frontend/querymiddleware/codec.go index 23818b3ab0..48bdc5c9fd 100644 --- a/pkg/frontend/querymiddleware/codec.go +++ b/pkg/frontend/querymiddleware/codec.go @@ -35,6 +35,7 @@ import ( apierror "github.com/grafana/mimir/pkg/api/error" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -579,6 +580,15 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric req.Header.Add(api.ReadConsistencyHeader, consistency) } + for _, h := range r.GetHeaders() { + if h.Name == compat.ForceFallbackHeaderName { + for _, v := range h.Values { + // There should only be one value, but add all of them for completeness. + req.Header.Add(compat.ForceFallbackHeaderName, v) + } + } + } + return req.WithContext(ctx), nil } diff --git a/pkg/streamingpromql/compat/fallback_engine.go b/pkg/streamingpromql/compat/fallback_engine.go index 7dda25583a..504aa31cf6 100644 --- a/pkg/streamingpromql/compat/fallback_engine.go +++ b/pkg/streamingpromql/compat/fallback_engine.go @@ -27,6 +27,8 @@ type EngineWithFallback struct { logger log.Logger } +const fallbackForcedByHTTPHeader = "fallback forced by HTTP header" + func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheus.Registerer, logger log.Logger) promql.QueryEngine { return &EngineWithFallback{ preferred: preferred, @@ -46,43 +48,59 @@ func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheu } func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { - query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts) + reason := "" - if err == nil { - e.supportedQueries.Inc() - return query, nil - } + if isForceFallbackEnabled(ctx) { + reason = fallbackForcedByHTTPHeader + } else { + query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts) + + if err == nil { + e.supportedQueries.Inc() + return query, nil + } + + notSupportedErr := NotSupportedError{} + if !errors.As(err, ¬SupportedErr) { + // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. + return nil, err + } - notSupportedErr := NotSupportedError{} - if !errors.As(err, ¬SupportedErr) { - // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. - return nil, err + reason = notSupportedErr.reason } logger := spanlogger.FromContext(ctx, e.logger) - level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs) - e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc() + level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", reason, "expr", qs) + e.unsupportedQueries.WithLabelValues(reason).Inc() return e.fallback.NewInstantQuery(ctx, q, opts, qs, ts) } func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { - query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval) + reason := "" - if err == nil { - e.supportedQueries.Inc() - return query, nil - } + if isForceFallbackEnabled(ctx) { + reason = fallbackForcedByHTTPHeader + } else { + query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval) + + if err == nil { + e.supportedQueries.Inc() + return query, nil + } + + notSupportedErr := NotSupportedError{} + if !errors.As(err, ¬SupportedErr) { + // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. + return nil, err + } - notSupportedErr := NotSupportedError{} - if !errors.As(err, ¬SupportedErr) { - // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. - return nil, err + reason = notSupportedErr.reason } logger := spanlogger.FromContext(ctx, e.logger) - level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs) - e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc() + level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", reason, "expr", qs) + e.unsupportedQueries.WithLabelValues(reason).Inc() return e.fallback.NewRangeQuery(ctx, q, opts, qs, start, end, interval) } diff --git a/pkg/streamingpromql/compat/fallback_engine_test.go b/pkg/streamingpromql/compat/fallback_engine_test.go index 35c7889f36..91c4cfb85a 100644 --- a/pkg/streamingpromql/compat/fallback_engine_test.go +++ b/pkg/streamingpromql/compat/fallback_engine_test.go @@ -20,14 +20,13 @@ import ( ) func TestEngineWithFallback(t *testing.T) { - ctx := context.Background() logger := log.NewNopLogger() - generators := map[string]func(engine promql.QueryEngine, expr string) (promql.Query, error){ - "instant query": func(engine promql.QueryEngine, expr string) (promql.Query, error) { + generators := map[string]func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error){ + "instant query": func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error) { return engine.NewInstantQuery(ctx, nil, nil, expr, time.Now()) }, - "range query": func(engine promql.QueryEngine, expr string) (promql.Query, error) { + "range query": func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error) { return engine.NewRangeQuery(ctx, nil, nil, expr, time.Now(), time.Now().Add(-time.Minute), time.Second) }, } @@ -35,12 +34,13 @@ func TestEngineWithFallback(t *testing.T) { for name, createQuery := range generators { t.Run(name, func(t *testing.T) { t.Run("should not fall back for supported expressions", func(t *testing.T) { + ctx := context.Background() reg := prometheus.NewPedanticRegistry() preferredEngine := newFakeEngineThatSupportsLimitedQueries() fallbackEngine := newFakeEngineThatSupportsAllQueries() engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) - query, err := createQuery(engineWithFallback, "a_supported_expression") + query, err := createQuery(ctx, engineWithFallback, "a_supported_expression") require.NoError(t, err) require.Equal(t, preferredEngine.query, query, "should return query from preferred engine") require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if expression is supported by preferred engine") @@ -53,12 +53,13 @@ func TestEngineWithFallback(t *testing.T) { }) t.Run("should fall back for unsupported expressions", func(t *testing.T) { + ctx := context.Background() reg := prometheus.NewPedanticRegistry() preferredEngine := newFakeEngineThatSupportsLimitedQueries() fallbackEngine := newFakeEngineThatSupportsAllQueries() engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) - query, err := createQuery(engineWithFallback, "a_non_supported_expression") + query, err := createQuery(ctx, engineWithFallback, "a_non_supported_expression") require.NoError(t, err) require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine") @@ -73,15 +74,37 @@ func TestEngineWithFallback(t *testing.T) { }) t.Run("should not fall back if creating query fails for another reason", func(t *testing.T) { + ctx := context.Background() reg := prometheus.NewPedanticRegistry() preferredEngine := newFakeEngineThatSupportsLimitedQueries() fallbackEngine := newFakeEngineThatSupportsAllQueries() engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) - _, err := createQuery(engineWithFallback, "an_invalid_expression") + _, err := createQuery(ctx, engineWithFallback, "an_invalid_expression") require.EqualError(t, err, "the query is invalid") require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if creating query fails for another reason") }) + + t.Run("should fall back if falling back has been explicitly requested, even if the expression is supported", func(t *testing.T) { + ctx := withForceFallbackEnabled(context.Background()) + reg := prometheus.NewPedanticRegistry() + preferredEngine := newFakeEngineThatSupportsLimitedQueries() + fallbackEngine := newFakeEngineThatSupportsAllQueries() + engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) + + query, err := createQuery(ctx, engineWithFallback, "a_supported_expression") + require.NoError(t, err) + require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine") + + require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_mimir_query_engine_supported_queries_total Total number of queries that were supported by the Mimir query engine. + # TYPE cortex_mimir_query_engine_supported_queries_total counter + cortex_mimir_query_engine_supported_queries_total 0 + # HELP cortex_mimir_query_engine_unsupported_queries_total Total number of queries that were not supported by the Mimir query engine and so fell back to Prometheus' engine. + # TYPE cortex_mimir_query_engine_unsupported_queries_total counter + cortex_mimir_query_engine_unsupported_queries_total{reason="fallback forced by HTTP header"} 1 + `), "cortex_mimir_query_engine_supported_queries_total", "cortex_mimir_query_engine_unsupported_queries_total")) + }) }) } } diff --git a/pkg/streamingpromql/compat/fallback_header.go b/pkg/streamingpromql/compat/fallback_header.go new file mode 100644 index 0000000000..dba8e412c3 --- /dev/null +++ b/pkg/streamingpromql/compat/fallback_header.go @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package compat + +import ( + "context" + "net/http" + + apierror "github.com/grafana/mimir/pkg/api/error" +) + +type engineFallbackContextKey int + +const forceFallbackEnabledContextKey = engineFallbackContextKey(0) +const ForceFallbackHeaderName = "X-Mimir-Force-Prometheus-Engine" + +type EngineFallbackInjector struct{} + +func (i EngineFallbackInjector) Wrap(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if value := r.Header.Get(ForceFallbackHeaderName); value != "" { + if value != "true" { + // Send a Prometheus API-style JSON error response. + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + e := apierror.Newf(apierror.TypeBadData, "invalid value '%s' for '%s' header, must be exactly 'true' or not set", value, ForceFallbackHeaderName) + + if body, err := e.EncodeJSON(); err == nil { + _, _ = w.Write(body) + } + + return + } + + r = r.WithContext(withForceFallbackEnabled(r.Context())) + } + + handler.ServeHTTP(w, r) + }) +} + +func withForceFallbackEnabled(ctx context.Context) context.Context { + return context.WithValue(ctx, forceFallbackEnabledContextKey, true) +} + +func isForceFallbackEnabled(ctx context.Context) bool { + return ctx.Value(forceFallbackEnabledContextKey) != nil +} diff --git a/pkg/streamingpromql/compat/fallback_header_test.go b/pkg/streamingpromql/compat/fallback_header_test.go new file mode 100644 index 0000000000..5c5fdb9714 --- /dev/null +++ b/pkg/streamingpromql/compat/fallback_header_test.go @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package compat + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEngineFallbackInjector(t *testing.T) { + testCases := map[string]struct { + headers http.Header + + expectFallback bool + expectedError string + }{ + "no headers": { + headers: http.Header{}, + expectFallback: false, + }, + "unrelated header": { + headers: http.Header{ + "Content-Type": []string{"application/blah"}, + }, + expectFallback: false, + }, + "force fallback header is present, but does not have expected value": { + headers: http.Header{ + "X-Mimir-Force-Prometheus-Engine": []string{"blah"}, + }, + expectedError: "invalid value 'blah' for 'X-Mimir-Force-Prometheus-Engine' header, must be exactly 'true' or not set", + }, + "force fallback header is present, and does have expected value": { + headers: http.Header{ + "X-Mimir-Force-Prometheus-Engine": []string{"true"}, + }, + expectFallback: true, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + injector := EngineFallbackInjector{} + handlerCalled := false + handler := injector.Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + handlerCalled = true + require.Equal(t, testCase.expectFallback, isForceFallbackEnabled(req.Context())) + w.WriteHeader(http.StatusOK) + })) + + req, err := http.NewRequest(http.MethodGet, "/blah", nil) + require.NoError(t, err) + req.Header = testCase.headers + + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + + if testCase.expectedError == "" { + require.True(t, handlerCalled) + require.Equal(t, http.StatusOK, resp.Code) + } else { + require.False(t, handlerCalled) + require.Equal(t, http.StatusBadRequest, resp.Code) + require.Equal(t, "application/json", resp.Header().Get("Content-Type")) + + body := resp.Body.String() + expectedBody := `{"status": "error", "errorType": "bad_data", "error": "` + testCase.expectedError + `"}` + require.JSONEq(t, expectedBody, body) + } + }) + } +}