Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distributor: report OTLP parse errors back to client #10588

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* [BUGFIX] Distributor: Fix edge case in the HA-tracker with memberlist as KVStore where, when a replica in the KVStore is marked as deleted but not yet removed, it fails to update the KVStore. #10443
* [BUGFIX] Distributor: Fix panics in `DurationWithJitter` util functions when computed variance is zero. #10507
* [BUGFIX] Ingester: Fixed a race condition in the `PostingsForMatchers` cache that may have infrequently returned expired cached postings. #10500
* [BUGFIX] Distributor: Report partially converted OTLP requests with status 400 Bad Request. #10588

### Mixin

Expand Down
243 changes: 148 additions & 95 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/http"
"time"
Expand Down Expand Up @@ -71,7 +72,92 @@ func OTLPHandler(
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

return otlpHandler(maxRecvMsgSize, requestBufferPool, sourceIPs, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
ctx := r.Context()
logger := utillog.WithContext(ctx, logger)
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
logger = utillog.WithSourceIPs(source, logger)
}
}

otlpConverter := newOTLPMimirConverter()

parser := newOTLPParser(limits, resourceAttributePromotionConfig, otlpConverter, enableStartTimeQuietZero, pushMetrics, discardedDueToOtelParseError)

supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
}

rb.CleanUp()
return nil, nil, err
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
}
req := newRequest(supplier)

pushErr := push(ctx, req)
if pushErr == nil {
// Push was successful, but OTLP converter left out some samples. We let the client know about it by replying with 4xx (and an insight log).
if otlpErr := otlpConverter.Err(); otlpErr != nil {
pushErr = httpgrpc.Error(http.StatusBadRequest, otlpErr.Error())
}
}
if pushErr != nil {
if errors.Is(pushErr, context.Canceled) {
level.Warn(logger).Log("msg", "push request canceled", "err", pushErr)
writeErrorToHTTPResponseBody(r.Context(), w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger)
return
}
var (
httpCode int
grpcCode codes.Code
errorMsg string
)
if st, ok := grpcutil.ErrorToStatus(pushErr); ok {
// This code is needed for a correct handling of errors returned by the supplier function.
// These errors are created by using the httpgrpc package.
httpCode = httpRetryableToOTLPRetryable(int(st.Code()))
grpcCode = st.Code()
errorMsg = st.Message()
} else {
grpcCode, httpCode = toOtlpGRPCHTTPStatus(pushErr)
errorMsg = pushErr.Error()
}
if httpCode != 202 {
// This error message is consistent with error message in Prometheus remote-write handler, and ingester's ingest-storage pushToStorage method.
msgs := []interface{}{"msg", "detected an error while ingesting OTLP metrics request (the request may have been partially ingested)", "httpCode", httpCode, "err", pushErr}
if httpCode/100 == 4 {
msgs = append(msgs, "insight", true)
}
level.Error(logger).Log(msgs...)
}
addHeaders(w, pushErr, r, httpCode, retryCfg)
writeErrorToHTTPResponseBody(r.Context(), w, httpCode, grpcCode, errorMsg, logger)
}
})
}

func newOTLPParser(
limits OTLPHandlerLimits,
resourceAttributePromotionConfig OTelResourceAttributePromotionConfig,
otlpConverter *otlpMimirConverter,
enableStartTimeQuietZero bool,
pushMetrics *PushMetrics,
discardedDueToOtelParseError *prometheus.CounterVec,
) parserFunc {
return func(ctx context.Context, r *http.Request, maxRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error {
contentType := r.Header.Get("Content-Type")
contentEncoding := r.Header.Get("Content-Encoding")
var compression util.CompressionType
Expand Down Expand Up @@ -183,8 +269,10 @@ func OTLPHandler(
pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

var metrics []mimirpb.PreallocTimeseries
metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero, promoteResourceAttributes, keepIdentifyingResourceAttributes, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics())
metrics, metricsDropped, err := otelMetricsToTimeseries(ctx, otlpConverter, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero, promoteResourceAttributes, keepIdentifyingResourceAttributes, otlpReq.Metrics(), spanLogger)
if metricsDropped > 0 {
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(metricsDropped)) // "group" label is empty here as metrics couldn't be parsed
}
if err != nil {
return err
}
Expand All @@ -203,6 +291,7 @@ func OTLPHandler(
level.Debug(spanLogger).Log(
"msg", "OTLP to Prometheus conversion complete",
"metric_count", metricCount,
"metric_dropped", metricsDropped,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plural metrics are dropped:

Suggested change
"metric_dropped", metricsDropped,
"metrics_dropped", metricsDropped,

"sample_count", sampleCount,
"histogram_count", histogramCount,
"exemplar_count", exemplarCount,
Expand All @@ -213,80 +302,7 @@ func OTLPHandler(
req.Metadata = otelMetricsToMetadata(addSuffixes, otlpReq.Metrics())

return nil
})
}

// otlpHandler returns the HTTP handler for OTLP write requests. For each
// request it wraps parser in a supplier closure, passes the resulting request
// to push, and converts any error returned by push into an HTTP error response.
// Nothing is written to the response by this function when push succeeds.
func otlpHandler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
retryCfg RetryConfig,
push PushFunc,
logger log.Logger,
parser parserFunc,
) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := utillog.WithContext(ctx, logger)
// Enrich the logger with the request's source IPs when an extractor is
// configured and it yields a non-empty value.
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
logger = utillog.WithSourceIPs(source, logger)
}
}
// supplier parses the HTTP request into a write request. On success it also
// returns a cleanup function that releases the timeseries slice and the
// request buffers; on failure the buffers are released here and the error is
// returned as an httpgrpc error.
supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
}

rb.CleanUp()
return nil, nil, err
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
}
req := newRequest(supplier)
if err := push(ctx, req); err != nil {
// A canceled context is reported with statusClientClosedRequest and logged
// at warning level only; no further status mapping or headers are applied.
if errors.Is(err, context.Canceled) {
level.Warn(logger).Log("msg", "push request canceled", "err", err)
writeErrorToHTTPResponseBody(r.Context(), w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger)
return
}
var (
httpCode int
grpcCode codes.Code
errorMsg string
)
if st, ok := grpcutil.ErrorToStatus(err); ok {
// This code is needed for a correct handling of errors returned by the supplier function.
// These errors are created by using the httpgrpc package.
httpCode = httpRetryableToOTLPRetryable(int(st.Code()))
grpcCode = st.Code()
errorMsg = st.Message()
} else {
// Any other error is mapped to a gRPC code and HTTP status by toOtlpGRPCHTTPStatus.
grpcCode, httpCode = toOtlpGRPCHTTPStatus(err)
errorMsg = err.Error()
}
// Errors mapped to HTTP 202 are not logged as ingestion errors.
if httpCode != 202 {
// This error message is consistent with error message in Prometheus remote-write handler, and ingester's ingest-storage pushToStorage method.
msgs := []interface{}{"msg", "detected an error while ingesting OTLP metrics request (the request may have been partially ingested)", "httpCode", httpCode, "err", err}
// 4xx responses additionally carry the "insight" log field.
if httpCode/100 == 4 {
msgs = append(msgs, "insight", true)
}
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, httpCode, retryCfg)
writeErrorToHTTPResponseBody(r.Context(), w, httpCode, grpcCode, errorMsg, logger)
}
})
}
}

// toOtlpGRPCHTTPStatus is utilized by the OTLP endpoint.
Expand Down Expand Up @@ -414,33 +430,70 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr
return metadata
}

func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero bool, promoteResourceAttributes []string, keepIdentifyingResourceAttributes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := otlp.NewMimirConverter()
_, errs := converter.FromMetrics(ctx, md, otlp.Settings{
func otelMetricsToTimeseries(
ctx context.Context,
converter *otlpMimirConverter,
addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero bool,
promoteResourceAttributes []string,
keepIdentifyingResourceAttributes bool,
md pmetric.Metrics,
logger log.Logger,
) ([]mimirpb.PreallocTimeseries, int, error) {
settings := otlp.Settings{
AddMetricSuffixes: addSuffixes,
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
EnableStartTimeQuietZero: enableStartTimeQuietZero,
PromoteResourceAttributes: promoteResourceAttributes,
KeepIdentifyingResourceAttributes: keepIdentifyingResourceAttributes,
}, utillog.SlogFromGoKit(logger))
mimirTS := converter.TimeSeries()
if errs != nil {
dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed

parseErrs := errs.Error()
if len(parseErrs) > maxErrMsgLen {
parseErrs = parseErrs[:maxErrMsgLen]
}
}
mimirTS := converter.ToTimeseries(ctx, md, settings, logger)

if len(mimirTS) == 0 {
return nil, errors.New(parseErrs)
}
dropped := converter.DroppedTotal()
if len(mimirTS) == 0 && dropped > 0 {
return nil, dropped, converter.Err()
}
return mimirTS, dropped, nil
}

level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs)
// otlpMimirConverter wraps an otlp.MimirConverter and keeps the error produced
// by the conversion, so that callers can inspect it after the conversion ran
// (see the DroppedTotal and Err methods).
type otlpMimirConverter struct {
// converter is the wrapped OTLP converter that performs the actual conversion.
converter *otlp.MimirConverter
// err holds OTLP parse errors
err error
}

// newOTLPMimirConverter returns an otlpMimirConverter that wraps a freshly
// created otlp.MimirConverter and has no parse error recorded yet.
func newOTLPMimirConverter() *otlpMimirConverter {
	wrapped := otlp.NewMimirConverter()
	return &otlpMimirConverter{converter: wrapped}
}

return mimirTS, nil
func (c *otlpMimirConverter) ToTimeseries(ctx context.Context, md pmetric.Metrics, settings otlp.Settings, logger log.Logger) []mimirpb.PreallocTimeseries {
if c.err != nil {
return nil
}

_, c.err = c.converter.FromMetrics(ctx, md, settings, utillog.SlogFromGoKit(logger))

mimirTS := c.converter.TimeSeries()
return mimirTS
Comment on lines +476 to +478
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nit] Easier to read:

Suggested change
mimirTS := c.converter.TimeSeries()
return mimirTS
return c.converter.TimeSeries()

}

// DroppedTotal reports how many individual errors the recorded OTLP parse
// error consists of, as split by multierr.Errors. It returns 0 when no error
// has been recorded.
func (c *otlpMimirConverter) DroppedTotal() int {
	if c.err == nil {
		return 0
	}
	parseErrs := multierr.Errors(c.err)
	return len(parseErrs)
}

// Err returns the recorded OTLP parse error as a new error whose message is
// prefixed with "otlp parse error: ", or nil when no error has been recorded.
//
// The underlying message is capped at maxErrMsgLen bytes. The cut is moved
// back to the nearest rune boundary so that a multi-byte UTF-8 character is
// never split, which would otherwise leave invalid UTF-8 in the message that
// is reported back to the client.
func (c *otlpMimirConverter) Err() error {
	if c.err == nil {
		return nil
	}

	errMsg := c.err.Error()
	if len(errMsg) > maxErrMsgLen {
		cut := maxErrMsgLen
		// A UTF-8 continuation byte has the bit pattern 10xxxxxx. If the first
		// byte that would be dropped is one, the cut falls inside a character;
		// step back to its leading byte. A character has at most three
		// continuation bytes, so the walk is bounded even for malformed input.
		for back := 0; back < 3 && cut > 0 && errMsg[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		errMsg = errMsg[:cut]
	}
	return fmt.Errorf("otlp parse error: %s", errMsg)
}

// TimeseriesToOTLPRequest is used in tests.
Expand Down
27 changes: 15 additions & 12 deletions pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/grafana/dskit/user"
"github.com/pierrec/lz4/v4"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
Expand All @@ -42,11 +40,6 @@ import (
)

func TestOTelMetricsToTimeSeries(t *testing.T) {
const tenantID = "testTenant"
discardedDueToOTelParseError := promauto.With(nil).NewCounterVec(prometheus.CounterOpts{
Name: "discarded_due_to_otel_parse_error",
Help: "Number of metrics discarded due to OTLP parse errors.",
}, []string{tenantID, "group"})
resourceAttrs := map[string]string{
"service.name": "service name",
"service.namespace": "service namespace",
Expand Down Expand Up @@ -282,11 +275,21 @@ func TestOTelMetricsToTimeSeries(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mimirTS, err := otelMetricsToTimeseries(
context.Background(), tenantID, true, false, false, tc.promoteResourceAttributes, tc.keepIdentifyingResourceAttributes, discardedDueToOTelParseError, log.NewNopLogger(), md,
converter := newOTLPMimirConverter()
mimirTS, dropped, err := otelMetricsToTimeseries(
context.Background(),
converter,
true,
false,
false,
tc.promoteResourceAttributes,
tc.keepIdentifyingResourceAttributes,
md,
log.NewNopLogger(),
)
require.NoError(t, err)
require.Len(t, mimirTS, 2)
require.Equal(t, 0, dropped)
var ts mimirpb.PreallocTimeseries
var targetInfo mimirpb.PreallocTimeseries
for i := range mimirTS {
Expand Down Expand Up @@ -832,7 +835,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {
return nil
}, nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
assert.Equal(t, http.StatusBadRequest, resp.Code)
}

func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
Expand Down Expand Up @@ -878,7 +881,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
return nil
}, nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
assert.Equal(t, http.StatusBadRequest, resp.Code)

// Second case is to make sure that histogram metrics are counted correctly.
metric3 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
Expand All @@ -904,7 +907,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
return nil
}, nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
assert.Equal(t, http.StatusBadRequest, resp.Code)
}

func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {
Expand Down