Skip to content

Commit e088f13

Browse files
fix(otlp): Write protobuf status on error (#15097)
(cherry picked from commit 63a2442)
1 parent 10b7478 commit e088f13

File tree

5 files changed

+115
-62
lines changed

5 files changed

+115
-62
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ require (
364364
golang.org/x/tools v0.23.0 // indirect
365365
google.golang.org/genproto v0.0.0-20241113202542-65e8d215514f // indirect
366366
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
367-
google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f // indirect
367+
google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f
368368
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect
369369
gopkg.in/inf.v0 v0.9.1 // indirect
370370
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

pkg/distributor/http.go

+7-26
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,19 @@ import (
1919

2020
// PushHandler reads a snappy-compressed proto from the HTTP body.
2121
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
22-
d.pushHandler(w, r, push.ParseLokiRequest)
22+
d.pushHandler(w, r, push.ParseLokiRequest, push.HTTPError)
2323
}
2424

2525
func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request) {
26-
interceptor := newOtelErrorHeaderInterceptor(w)
27-
d.pushHandler(interceptor, r, push.ParseOTLPRequest)
26+
d.pushHandler(w, r, push.ParseOTLPRequest, push.OTLPError)
2827
}
2928

30-
// otelErrorHeaderInterceptor maps 500 errors to 503.
31-
// According to the OTLP specification, 500 errors are never retried on the client side, but 503 are.
32-
type otelErrorHeaderInterceptor struct {
33-
http.ResponseWriter
34-
}
35-
36-
func newOtelErrorHeaderInterceptor(w http.ResponseWriter) *otelErrorHeaderInterceptor {
37-
return &otelErrorHeaderInterceptor{ResponseWriter: w}
38-
}
39-
40-
func (i *otelErrorHeaderInterceptor) WriteHeader(statusCode int) {
41-
if statusCode == http.StatusInternalServerError {
42-
statusCode = http.StatusServiceUnavailable
43-
}
44-
45-
i.ResponseWriter.WriteHeader(statusCode)
46-
}
47-
48-
func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser) {
29+
func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser, errorWriter push.ErrorWriter) {
4930
logger := util_log.WithContext(r.Context(), util_log.Logger)
5031
tenantID, err := tenant.TenantID(r.Context())
5132
if err != nil {
5233
level.Error(logger).Log("msg", "error getting tenant id", "err", err)
53-
http.Error(w, err.Error(), http.StatusBadRequest)
34+
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
5435
return
5536
}
5637

@@ -70,7 +51,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
7051
}
7152
d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err))
7253

73-
http.Error(w, err.Error(), http.StatusBadRequest)
54+
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
7455
return
7556
}
7657

@@ -106,7 +87,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
10687
"err", body,
10788
)
10889
}
109-
http.Error(w, body, int(resp.Code))
90+
errorWriter(w, body, int(resp.Code), logger)
11091
} else {
11192
if d.tenantConfigs.LogPushRequest(tenantID) {
11293
level.Debug(logger).Log(
@@ -115,7 +96,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
11596
"err", err.Error(),
11697
)
11798
}
118-
http.Error(w, err.Error(), http.StatusInternalServerError)
99+
errorWriter(w, err.Error(), http.StatusInternalServerError, logger)
119100
}
120101
}
121102

pkg/distributor/http_test.go

+1-33
Original file line numberDiff line numberDiff line change
@@ -78,43 +78,11 @@ func TestRequestParserWrapping(t *testing.T) {
7878
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
7979
require.NoError(t, err)
8080

81-
distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser)
81+
distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser, push.HTTPError)
8282

8383
require.True(t, called)
8484
}
8585

86-
func Test_OtelErrorHeaderInterceptor(t *testing.T) {
87-
for _, tc := range []struct {
88-
name string
89-
inputCode int
90-
expectedCode int
91-
}{
92-
{
93-
name: "500",
94-
inputCode: http.StatusInternalServerError,
95-
expectedCode: http.StatusServiceUnavailable,
96-
},
97-
{
98-
name: "400",
99-
inputCode: http.StatusBadRequest,
100-
expectedCode: http.StatusBadRequest,
101-
},
102-
{
103-
name: "204",
104-
inputCode: http.StatusNoContent,
105-
expectedCode: http.StatusNoContent,
106-
},
107-
} {
108-
t.Run(tc.name, func(t *testing.T) {
109-
r := httptest.NewRecorder()
110-
i := newOtelErrorHeaderInterceptor(r)
111-
112-
http.Error(i, "error", tc.inputCode)
113-
require.Equal(t, tc.expectedCode, r.Code)
114-
})
115-
}
116-
}
117-
11886
func stubParser(
11987
_ string,
12088
_ *http.Request,

pkg/loghttp/push/otlp_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"encoding/base64"
66
"fmt"
7+
"net/http"
8+
"net/http/httptest"
79
"testing"
810
"time"
911

@@ -13,6 +15,8 @@ import (
1315
"github.com/stretchr/testify/require"
1416
"go.opentelemetry.io/collector/pdata/pcommon"
1517
"go.opentelemetry.io/collector/pdata/plog"
18+
"google.golang.org/genproto/googleapis/rpc/status"
19+
"google.golang.org/protobuf/proto"
1620

1721
"github.com/grafana/loki/pkg/push"
1822

@@ -733,3 +737,41 @@ type fakeRetention struct{}
733737
func (f fakeRetention) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration {
734738
return time.Hour
735739
}
740+
741+
func TestOtlpError(t *testing.T) {
742+
for _, tc := range []struct {
743+
name string
744+
msg string
745+
inCode int
746+
expectedCode int
747+
}{
748+
{
749+
name: "500 error maps 503",
750+
msg: "test error 500 to 503",
751+
inCode: http.StatusInternalServerError,
752+
expectedCode: http.StatusServiceUnavailable,
753+
},
754+
{
755+
name: "other error",
756+
msg: "test error",
757+
inCode: http.StatusForbidden,
758+
expectedCode: http.StatusForbidden,
759+
},
760+
} {
761+
t.Run(tc.name, func(t *testing.T) {
762+
logger := log.NewNopLogger()
763+
764+
r := httptest.NewRecorder()
765+
OTLPError(r, tc.msg, tc.inCode, logger)
766+
767+
require.Equal(t, tc.expectedCode, r.Code)
768+
require.Equal(t, "application/octet-stream", r.Header().Get("Content-Type"))
769+
770+
respStatus := &status.Status{}
771+
require.NoError(t, proto.Unmarshal(r.Body.Bytes(), respStatus))
772+
773+
require.Equal(t, tc.msg, respStatus.Message)
774+
require.EqualValues(t, 0, respStatus.Code)
775+
})
776+
}
777+
}

pkg/loghttp/push/push.go

+64-2
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,19 @@ import (
1616

1717
"github.com/dustin/go-humanize"
1818
"github.com/go-kit/log"
19+
"github.com/gogo/protobuf/proto"
1920
"github.com/prometheus/client_golang/prometheus"
2021
"github.com/prometheus/client_golang/prometheus/promauto"
2122
"github.com/prometheus/prometheus/model/labels"
22-
23-
loki_util "github.com/grafana/loki/v3/pkg/util"
23+
"google.golang.org/grpc/codes"
24+
grpcstatus "google.golang.org/grpc/status"
2425

2526
"github.com/grafana/loki/v3/pkg/analytics"
2627
"github.com/grafana/loki/v3/pkg/loghttp"
2728
"github.com/grafana/loki/v3/pkg/logproto"
2829
"github.com/grafana/loki/v3/pkg/logql/syntax"
2930
"github.com/grafana/loki/v3/pkg/util"
31+
loki_util "github.com/grafana/loki/v3/pkg/util"
3032
"github.com/grafana/loki/v3/pkg/util/constants"
3133
"github.com/grafana/loki/v3/pkg/util/unmarshal"
3234
unmarshal2 "github.com/grafana/loki/v3/pkg/util/unmarshal/legacy"
@@ -86,6 +88,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
8688
type (
8789
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
8890
RequestParserWrapper func(inner RequestParser) RequestParser
91+
ErrorWriter func(w http.ResponseWriter, error string, code int, logger log.Logger)
8992
)
9093

9194
type Stats struct {
@@ -307,3 +310,62 @@ func RetentionPeriodToString(retentionPeriod time.Duration) string {
307310
}
308311
return retentionHours
309312
}
313+
314+
// OTLPError writes an OTLP-compliant error response to the given http.ResponseWriter.
315+
//
316+
// According to the OTLP spec: https://opentelemetry.io/docs/specs/otlp/#failures-1
317+
// Re. the error response format
318+
// > If the processing of the request fails, the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code.
319+
// > The response body for all HTTP 4xx and HTTP 5xx responses MUST be a Protobuf-encoded Status message that describes the problem.
320+
// > This specification does not use Status.code field and the server MAY omit Status.code field.
321+
// > The clients are not expected to alter their behavior based on Status.code field but MAY record it for troubleshooting purposes.
322+
// > The Status.message field SHOULD contain a developer-facing error message as defined in Status message schema.
323+
//
324+
// Re. retryable errors
325+
// > The requests that receive a response status code listed in following table SHOULD be retried.
326+
// > All other 4xx or 5xx response status codes MUST NOT be retried
327+
// > 429 Too Many Requests
328+
// > 502 Bad Gateway
329+
// > 503 Service Unavailable
330+
// > 504 Gateway Timeout
331+
// In loki, we expect clients to retry on 500 errors, so we map 500 errors to 503.
332+
func OTLPError(w http.ResponseWriter, error string, code int, logger log.Logger) {
333+
// Map 500 errors to 503. 500 errors are never retried on the client side, but 503 are.
334+
if code == http.StatusInternalServerError {
335+
code = http.StatusServiceUnavailable
336+
}
337+
338+
// As per the OTLP spec, we send the status code on the http header.
339+
w.WriteHeader(code)
340+
341+
// Status 0 because we omit the Status.code field.
342+
status := grpcstatus.New(0, error).Proto()
343+
respBytes, err := proto.Marshal(status)
344+
if err != nil {
345+
level.Error(logger).Log("msg", "failed to marshal error response", "error", err)
346+
writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(
347+
codes.Internal,
348+
fmt.Sprintf("failed to marshal error response: %s", err.Error()),
349+
).Proto())
350+
_, _ = w.Write(writeResponseFailedBody)
351+
return
352+
}
353+
354+
w.Header().Set(contentType, "application/octet-stream")
355+
if _, err = w.Write(respBytes); err != nil {
356+
level.Error(logger).Log("msg", "failed to write error response", "error", err)
357+
writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(
358+
codes.Internal,
359+
fmt.Sprintf("failed write error: %s", err.Error()),
360+
).Proto())
361+
_, _ = w.Write(writeResponseFailedBody)
362+
}
363+
}
364+
365+
var _ ErrorWriter = OTLPError
366+
367+
func HTTPError(w http.ResponseWriter, error string, code int, _ log.Logger) {
368+
http.Error(w, error, code)
369+
}
370+
371+
var _ ErrorWriter = HTTPError

0 commit comments

Comments
 (0)