Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include request headers in client.Info #4547

Merged
merged 21 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
in a future release.
- `service.telemetry.metrics.level` and `service.telemetry.metrics.address`
should be used to configure collector self-metrics.
- `confighttp` and `configgrpc`: New config option `include_metadata` to persist request metadata/headers in `client.Info.Metadata` (#4547)

## 🧰 Bug fixes 🧰

Expand Down
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ type Info struct {
// configauth.ServerAuthenticator implementations tied to the receiver for
// this connection.
Auth AuthData

// Metadata is the request metadata from the client connecting to this connector.
Metadata map[string][]string
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
}

// AuthData represents the authentication data as seen by authenticators tied to
Expand Down
30 changes: 21 additions & 9 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ type GRPCServerSettings struct {

// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Include propagates the incoming connection's metadata to downstream consumers.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want users to be able to select which "metadata" to copy from the request? Maybe not now, but we can be safe and mark this experimental for the moment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly doable, blindly copying all of it could be bad for memory if the request is decorated with large metadata? But that would also mean we'd have to roll our own md.Copy() or header.Clone() routines, as copying-first-then-filtering would be pointless in terms of memory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why I recommend just adding a comment that this is experimental, and we should look a bit into how/if we want this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bogdandrutu added

}

// SanitizedEndpoint strips the prefix of either http:// or https:// from configgrpc.GRPCClientSettings.Endpoint.
Expand Down Expand Up @@ -365,8 +368,8 @@ func (gss *GRPCServerSettings) ToServerOption(host component.Host, settings comp
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))

uInterceptors = append(uInterceptors, enhanceWithClientInformation)
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation)
uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata))
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata))

opts = append(opts, grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

Expand All @@ -385,23 +388,32 @@ func GetGRPCCompressionKey(compressionType string) string {

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx), req)
func enhanceWithClientInformation(includeMetadata bool) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx, includeMetadata), req)
}
}

func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context())
return handler(srv, wrapped)
func enhanceStreamWithClientInformation(includeMetadata bool) func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context(), includeMetadata)
return handler(srv, wrapped)
}
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(ctx context.Context) context.Context {
func contextWithClient(ctx context.Context, includeMetadata bool) context.Context {
cl := client.FromContext(ctx)
if p, ok := peer.FromContext(ctx); ok {
cl.Addr = p.Addr
}
if includeMetadata {
if md, ok := metadata.FromIncomingContext(ctx); ok {
cl.Metadata = md.Copy()
}
}
return client.NewContext(ctx, cl)
}

Expand Down
39 changes: 34 additions & 5 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,10 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
desc string
input context.Context
expected client.Info
desc string
input context.Context
doMetadata bool
expected client.Info
}{
{
desc: "no peer information, empty client",
Expand Down Expand Up @@ -624,10 +625,38 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
desc: "existing client with metadata, no metadata processing",
input: client.NewContext(context.Background(), client.Info{
Metadata: map[string][]string{"test-metadata-key": {"test-value"}},
}),
expected: client.Info{},
},
{
desc: "existing client with metadata",
input: client.NewContext(context.Background(), client.Info{
Metadata: map[string][]string{"test-metadata-key": {"test-value"}},
}),
doMetadata: true,
expected: client.Info{
Metadata: map[string][]string{"test-metadata-key": {"test-value"}},
},
},
{
desc: "existing client with metadata in context",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
doMetadata: true,
expected: client.Info{
Metadata: map[string][]string{"test-metadata-key": {"test-value"}},
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
cl := client.FromContext(contextWithClient(tC.input))
cl := client.FromContext(contextWithClient(tC.input, tC.doMetadata))
assert.Equal(t, tC.expected, cl)
})
}
Expand All @@ -650,7 +679,7 @@ func TestStreamInterceptorEnhancesClient(t *testing.T) {
}

// test
err := enhanceStreamWithClientInformation(nil, stream, nil, handler)
err := enhanceStreamWithClientInformation(false)(nil, stream, nil, handler)

// verify
assert.NoError(t, err)
Expand Down
11 changes: 9 additions & 2 deletions config/confighttp/clientinfohandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,32 @@ var _ http.Handler = (*clientInfoHandler)(nil)
// clientInfoHandler is an http.Handler that enhances the incoming request context with client.Info.
type clientInfoHandler struct {
next http.Handler

// include client metadata or not
includeMetadata bool
}

// ServeHTTP intercepts incoming HTTP requests, replacing the request's context with one that contains
// a client.Info containing the client's IP address.
func (h *clientInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(contextWithClient(req))
req = req.WithContext(contextWithClient(req, h.includeMetadata))
h.next.ServeHTTP(w, req)
}

// contextWithClient attempts to add the client IP address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(req *http.Request) context.Context {
func contextWithClient(req *http.Request, includeMetadata bool) context.Context {
cl := client.FromContext(req.Context())

ip := parseIP(req.RemoteAddr)
if ip != nil {
cl.Addr = ip
}

if includeMetadata {
cl.Metadata = req.Header.Clone()
}

ctx := client.NewContext(req.Context(), cl)
return ctx
}
Expand Down
6 changes: 5 additions & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ type HTTPServerSettings struct {

// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Include propagates the client metadata from the incoming requests to the downstream consumers
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
}

// ToListener creates a net.Listener.
Expand Down Expand Up @@ -284,7 +287,8 @@ func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.

// wrap the current handler in an interceptor that will add client.Info to the request's context
handler = &clientInfoHandler{
next: handler,
next: handler,
includeMetadata: hss.IncludeMetadata,
}

return &http.Server{
Expand Down
31 changes: 25 additions & 6 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,17 +738,18 @@ func TestHttpHeaders(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
desc string
input *http.Request
expected client.Info
desc string
input *http.Request
doMetadata bool
expected client.Info
}{
{
desc: "request without client IP",
desc: "request without client IP or headers",
input: &http.Request{},
expected: client.Info{},
},
{
desc: "request without client IP",
desc: "request with client IP",
input: &http.Request{
RemoteAddr: "1.2.3.4:55443",
},
Expand All @@ -758,10 +759,28 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
desc: "request with client headers, no metadata processing",
input: &http.Request{
Header: map[string][]string{"x-test-header": {"test-value"}},
},
doMetadata: false,
expected: client.Info{},
},
{
desc: "request with client headers",
input: &http.Request{
Header: map[string][]string{"x-test-header": {"test-value"}},
},
doMetadata: true,
expected: client.Info{
Metadata: map[string][]string{"x-test-header": {"test-value"}},
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
ctx := contextWithClient(tC.input)
ctx := contextWithClient(tC.input, tC.doMetadata)
assert.Equal(t, tC.expected, client.FromContext(ctx))
})
}
Expand Down