Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Backport 15097 to k229 #15122

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ require (
golang.org/x/tools v0.23.0 // indirect
google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3654,8 +3654,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.
google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:8mL13HKkDa+IuJ8yruA3ci0q+0vsUz4m//+ottjwS5o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f h1:C1QccEa9kUwvMgEUORqQD9S17QesQijxjZ84sO82mfo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
Expand Down
33 changes: 7 additions & 26 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,19 @@ import (

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
d.pushHandler(w, r, push.ParseLokiRequest)
d.pushHandler(w, r, push.ParseLokiRequest, push.HTTPError)
}

func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request) {
interceptor := newOtelErrorHeaderInterceptor(w)
d.pushHandler(interceptor, r, push.ParseOTLPRequest)
d.pushHandler(w, r, push.ParseOTLPRequest, push.OTLPError)
}

// otelErrorHeaderInterceptor is an http.ResponseWriter decorator that rewrites
// a 500 status code to 503 before it reaches the wrapped writer.
// Per the OTLP specification, clients never retry 500 responses but do retry 503.
type otelErrorHeaderInterceptor struct {
	http.ResponseWriter
}

// newOtelErrorHeaderInterceptor wraps w so that status codes pass through the
// 500 -> 503 mapping performed by WriteHeader.
func newOtelErrorHeaderInterceptor(w http.ResponseWriter) *otelErrorHeaderInterceptor {
	interceptor := new(otelErrorHeaderInterceptor)
	interceptor.ResponseWriter = w
	return interceptor
}

// WriteHeader forwards statusCode to the wrapped writer, substituting
// http.StatusServiceUnavailable for http.StatusInternalServerError.
// Every other status code is passed through unchanged.
func (i *otelErrorHeaderInterceptor) WriteHeader(statusCode int) {
	switch statusCode {
	case http.StatusInternalServerError:
		i.ResponseWriter.WriteHeader(http.StatusServiceUnavailable)
	default:
		i.ResponseWriter.WriteHeader(statusCode)
	}
}

func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser) {
func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser, errorWriter push.ErrorWriter) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
tenantID, err := tenant.TenantID(r.Context())
if err != nil {
level.Error(logger).Log("msg", "error getting tenant id", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
return
}

Expand All @@ -70,7 +51,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
}
d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err))

http.Error(w, err.Error(), http.StatusBadRequest)
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
return
}

Expand Down Expand Up @@ -106,7 +87,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
"err", body,
)
}
http.Error(w, body, int(resp.Code))
errorWriter(w, body, int(resp.Code), logger)
} else {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand All @@ -115,7 +96,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
"err", err.Error(),
)
}
http.Error(w, err.Error(), http.StatusInternalServerError)
errorWriter(w, err.Error(), http.StatusInternalServerError, logger)
}
}

Expand Down
34 changes: 1 addition & 33 deletions pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,43 +78,11 @@ func TestRequestParserWrapping(t *testing.T) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
require.NoError(t, err)

distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser)
distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser, push.HTTPError)

require.True(t, called)
}

// Test_OtelErrorHeaderInterceptor checks that the interceptor turns a 500
// status into 503 and leaves other status codes (400, 204) untouched.
func Test_OtelErrorHeaderInterceptor(t *testing.T) {
	type testCase struct {
		name         string
		inputCode    int
		expectedCode int
	}

	cases := []testCase{
		{name: "500", inputCode: http.StatusInternalServerError, expectedCode: http.StatusServiceUnavailable},
		{name: "400", inputCode: http.StatusBadRequest, expectedCode: http.StatusBadRequest},
		{name: "204", inputCode: http.StatusNoContent, expectedCode: http.StatusNoContent},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			recorder := httptest.NewRecorder()

			// http.Error calls WriteHeader on the interceptor, which is where
			// the status mapping takes place.
			http.Error(newOtelErrorHeaderInterceptor(recorder), "error", tc.inputCode)

			require.Equal(t, tc.expectedCode, recorder.Code)
		})
	}
}

func stubParser(
_ string,
_ *http.Request,
Expand Down
42 changes: 42 additions & 0 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/base64"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

Expand All @@ -13,6 +15,8 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"

"github.com/grafana/loki/pkg/push"

Expand Down Expand Up @@ -733,3 +737,41 @@ type fakeRetention struct{}
// RetentionPeriodFor returns a fixed one-hour retention period; both
// arguments are ignored.
func (f fakeRetention) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration {
return time.Hour
}

func TestOtlpError(t *testing.T) {
for _, tc := range []struct {
name string
msg string
inCode int
expectedCode int
}{
{
name: "500 error maps 503",
msg: "test error 500 to 503",
inCode: http.StatusInternalServerError,
expectedCode: http.StatusServiceUnavailable,
},
{
name: "other error",
msg: "test error",
inCode: http.StatusForbidden,
expectedCode: http.StatusForbidden,
},
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()

r := httptest.NewRecorder()
OTLPError(r, tc.msg, tc.inCode, logger)

require.Equal(t, tc.expectedCode, r.Code)
require.Equal(t, "application/octet-stream", r.Header().Get("Content-Type"))

respStatus := &status.Status{}
require.NoError(t, proto.Unmarshal(r.Body.Bytes(), respStatus))

require.Equal(t, tc.msg, respStatus.Message)
require.EqualValues(t, 0, respStatus.Code)
})
}
}
66 changes: 64 additions & 2 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ import (

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"

loki_util "github.com/grafana/loki/v3/pkg/util"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util"
loki_util "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/unmarshal"
unmarshal2 "github.com/grafana/loki/v3/pkg/util/unmarshal/legacy"
Expand Down Expand Up @@ -86,6 +88,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
type (
// RequestParser is the function signature used to parse an incoming HTTP
// push request; it returns the parsed *logproto.PushRequest together with
// its *Stats, or an error.
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
// RequestParserWrapper takes a RequestParser and returns a RequestParser.
RequestParserWrapper func(inner RequestParser) RequestParser
// ErrorWriter writes the given error message and HTTP status code to w.
// OTLPError and HTTPError both satisfy this signature.
ErrorWriter func(w http.ResponseWriter, error string, code int, logger log.Logger)
)

type Stats struct {
Expand Down Expand Up @@ -307,3 +310,62 @@ func RetentionPeriodToString(retentionPeriod time.Duration) string {
}
return retentionHours
}

// OTLPError writes an OTLP-compliant error response to the given http.ResponseWriter.
//
// According to the OTLP spec: https://opentelemetry.io/docs/specs/otlp/#failures-1
// Re. the error response format
// > If the processing of the request fails, the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code.
// > The response body for all HTTP 4xx and HTTP 5xx responses MUST be a Protobuf-encoded Status message that describes the problem.
// > This specification does not use Status.code field and the server MAY omit Status.code field.
// > The clients are not expected to alter their behavior based on Status.code field but MAY record it for troubleshooting purposes.
// > The Status.message field SHOULD contain a developer-facing error message as defined in Status message schema.
//
// Re. retryable errors
// > The requests that receive a response status code listed in following table SHOULD be retried.
// > All other 4xx or 5xx response status codes MUST NOT be retried
// > 429 Too Many Requests
// > 502 Bad Gateway
// > 503 Service Unavailable
// > 504 Gateway Timeout
// In loki, we expect clients to retry on 500 errors, so we map 500 errors to 503.
//
// The body is marshalled before anything is sent so that the Content-Type
// header can be set ahead of WriteHeader: net/http ignores changes to the
// header map made after WriteHeader has been called.
//
// Parameters:
//   - w: the response writer to send the status code, header and body to.
//   - error: the developer-facing message placed in Status.message.
//   - code: the HTTP status code to send; 500 is replaced by 503.
//   - logger: receives marshal and write failures.
func OTLPError(w http.ResponseWriter, error string, code int, logger log.Logger) {
	// Map 500 errors to 503. 500 errors are never retried on the client side, but 503 are.
	if code == http.StatusInternalServerError {
		code = http.StatusServiceUnavailable
	}

	// Status 0 because we omit the Status.code field.
	respBytes, err := proto.Marshal(grpcstatus.New(0, error).Proto())
	if err != nil {
		level.Error(logger).Log("msg", "failed to marshal error response", "error", err)
		// Fall back to a Status describing the marshalling failure so the body
		// is still a protobuf Status. If this also fails the body is left empty.
		respBytes, _ = proto.Marshal(grpcstatus.New(
			codes.Internal,
			fmt.Sprintf("failed to marshal error response: %s", err.Error()),
		).Proto())
	}

	// Headers must be in place before the status line is written.
	w.Header().Set(contentType, "application/octet-stream")

	// As per the OTLP spec, we send the status code on the http header.
	w.WriteHeader(code)

	// A failed write is only logged: writing a second body afterwards could
	// only append bytes to a partially sent payload.
	if _, err = w.Write(respBytes); err != nil {
		level.Error(logger).Log("msg", "failed to write error response", "error", err)
	}
}

var _ ErrorWriter = OTLPError

// HTTPError is an ErrorWriter that replies using http.Error with the given
// message and status code. The logger argument is unused.
func HTTPError(w http.ResponseWriter, error string, code int, _ log.Logger) {
http.Error(w, error, code)
}

var _ ErrorWriter = HTTPError
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1996,7 +1996,7 @@ google.golang.org/genproto/googleapis/api/expr/v1alpha1
google.golang.org/genproto/googleapis/api/label
google.golang.org/genproto/googleapis/api/metric
google.golang.org/genproto/googleapis/api/monitoredres
# google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28
# google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f
## explicit; go 1.21
google.golang.org/genproto/googleapis/rpc/code
google.golang.org/genproto/googleapis/rpc/errdetails
Expand Down
Loading