diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index 79f02bb1bbe..6c110043a9e 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -319,14 +319,15 @@ func httpRetryableToOTLPRetryable(httpStatusCode int) int { // writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body. // See doc https://opentelemetry.io/docs/specs/otlp/#failures-1 func writeErrorToHTTPResponseBody(reqCtx context.Context, w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) { + validUTF8Msg := validUTF8Message(msg) w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("X-Content-Type-Options", "nosniff") if server.IsHandledByHttpgrpcServer(reqCtx) { - w.Header().Set(server.ErrorMessageHeaderKey, msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body. + w.Header().Set(server.ErrorMessageHeaderKey, validUTF8Msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body. } w.WriteHeader(httpCode) - respBytes, err := proto.Marshal(status.New(grpcCode, msg).Proto()) + respBytes, err := proto.Marshal(status.New(grpcCode, validUTF8Msg).Proto()) if err != nil { level.Error(logger).Log("msg", "otlp response marshal failed", "err", err) writeResponseFailedBody, _ := proto.Marshal(status.New(codes.Internal, "failed to marshal OTLP response").Proto()) diff --git a/pkg/distributor/push.go b/pkg/distributor/push.go index 5f1cb8c0c72..812250eb2a3 100644 --- a/pkg/distributor/push.go +++ b/pkg/distributor/push.go @@ -221,7 +221,7 @@ func handler( level.Error(logger).Log(msgs...) 
} addHeaders(w, err, r, code, retryCfg) - http.Error(w, msg, code) + http.Error(w, validUTF8Message(msg), code) } }) } diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index 58e4704b2b8..000178a1319 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -1244,6 +1244,25 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) { expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|...", }, + "invalid JSON with non-utf8 characters request returns 400": { + request: &httpgrpc.HTTPRequest{ + Method: "POST", + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/json"}}, + }, + Url: "/otlp", + Body: []byte("\n\xf6\x16\n\xd3\x02\n\x1d\n\x11container.runtime\x12\x08\n\x06docker\n'\n\x12container.h"), + }, + expectedResponse: &httpgrpc.HTTPResponse{Code: 400, + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/octet-stream"}}, + {Key: "X-Content-Type-Options", Values: []string{"nosniff"}}, + }, + Body: mustMarshalStatus(t, 400, "ReadObjectCB: expect { or n, but found \ufffd, error found in #2 byte of ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011co|..., bigger context ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011container.runtime\u0012\u0008\n\u0006docker\n'\n\u0012container.h|..."), + }, + expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found \ufffd, error found in #2 byte of ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011co|..., bigger context ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011container.runtime\u0012\u0008\n\u0006docker\n'\n\u0012container.h|...", + }, + "empty JSON is good request, with 200 status code": { request: &httpgrpc.HTTPRequest{ Method: "POST", diff --git a/pkg/distributor/validate.go b/pkg/distributor/validate.go index ab9426513ad..be4bb183026 
100644 --- a/pkg/distributor/validate.go +++ b/pkg/distributor/validate.go @@ -430,7 +430,7 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI return fmt.Errorf(labelNameTooLongMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } else if !skipLabelValidation && !model.LabelValue(l.Value).IsValid() { m.invalidLabelValue.WithLabelValues(userID, group).Inc() - return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, strings.ToValidUTF8(l.Value, ""), unsafeMetricName) + return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, validUTF8Message(l.Value), unsafeMetricName) } else if len(l.Value) > maxLabelValueLength { m.labelValueTooLong.WithLabelValues(userID, group).Inc() return fmt.Errorf(labelValueTooLongMsgFormat, l.Name, l.Value, mimirpb.FromLabelAdaptersToString(ls)) @@ -512,3 +512,15 @@ func getMetricAndEllipsis(ls []mimirpb.LabelAdapter) (string, string) { } return metric, ellipsis } + +// validUTF8Message ensures that the given message contains only valid utf8 characters. +// The presence of non-utf8 characters in some errors might break some crucial parts of distributor's logic. +// For example, if httpgrpc.HttpServer.Handle() returns a httpgrpc error containing a non-utf8 character, +// this error will not be propagated to httpgrpc.HttpClient as a httpgrpc error, but as a generic error, +// which might break some of Mimir's internal logic. +// This is because golang's proto.Marshal(), which is used by gRPC internally, fails when it marshals the +// httpgrpc error containing a non-utf8 character produced by httpgrpc.HttpServer.Handle(), making the resulting +// error lose some important properties. 
+func validUTF8Message(msg string) string { + return strings.ToValidUTF8(msg, string(utf8.RuneError)) +} diff --git a/pkg/distributor/validate_test.go b/pkg/distributor/validate_test.go index df4de2dd60f..ffaf5bc9512 100644 --- a/pkg/distributor/validate_test.go +++ b/pkg/distributor/validate_test.go @@ -8,16 +8,22 @@ package distributor import ( "errors" "fmt" + "net/http" "strings" "testing" "time" "unicode/utf8" + "github.com/gogo/protobuf/proto" + "github.com/grafana/dskit/grpcutil" + "github.com/grafana/dskit/httpgrpc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + grpcstatus "google.golang.org/grpc/status" + golangproto "google.golang.org/protobuf/proto" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util/validation" @@ -212,7 +218,7 @@ func TestValidateLabels(t *testing.T) { skipLabelCountValidation: false, err: fmt.Errorf( invalidLabelValueMsgFormat, - "label1", "abcdef", "foo", + "label1", "abc\ufffddef", "foo", ), }, { @@ -671,3 +677,59 @@ func tooManyLabelsArgs(series []mimirpb.LabelAdapter, limit int) []any { return []any{len(series), limit, metric, ellipsis} } + +func TestValidUTF8Message(t *testing.T) { + testCases := map[string]struct { + body []byte + containsNonUTF8Characters bool + }{ + "valid message returns no error": { + body: []byte("valid message"), + containsNonUTF8Characters: false, + }, + "message containing only utf8 characters retruns no error": { + body: []byte("\n\ufffd\u0016\n\ufffd\u0002\n\u001D\n\u0011container.runtime\u0012\b\n\u0006docker\n'\n\u0012container.h"), + containsNonUTF8Characters: false, + }, + "message containing non-utf8 character returns an error": { + body: []byte("\n\xf6\x1a\n\xd3\x02\n\x1d\n\x11container.runtime\x12\x08\n\x06docker\n'\n\x12container.h"), + containsNonUTF8Characters: true, + }, + } + + for name, tc := 
range testCases { + for _, withValidation := range []bool{false, true} { + t.Run(fmt.Sprintf("%s withValidation: %v", name, withValidation), func(t *testing.T) { + msg := string(tc.body) + if withValidation { + msg = validUTF8Message(msg) + } + httpgrpcErr := httpgrpc.Error(http.StatusBadRequest, msg) + + // gogo's proto.Marshal() correctly processes both httpgrpc errors with and without non-utf8 characters. + st, ok := grpcutil.ErrorToStatus(httpgrpcErr) + require.True(t, ok) + stBytes, err := proto.Marshal(st.Proto()) + require.NoError(t, err) + require.NotNil(t, stBytes) + + grpcSt, ok := grpcstatus.FromError(httpgrpcErr) + require.True(t, ok) + stBytes, err = golangproto.Marshal(grpcSt.Proto()) + if withValidation { + // Ensure that errors with validated messages can always be correctly marshaled. + require.NoError(t, err) + require.NotNil(t, stBytes) + } else { + if tc.containsNonUTF8Characters { + // Ensure that errors with non-validated non-utf8 messages cannot be correctly marshaled. + require.Error(t, err) + } else { + require.NoError(t, err) + require.NotNil(t, stBytes) + } + } + }) + } + } +}