Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include request headers in client.Info #4547

Merged
merged 21 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 💡 Enhancements 💡

- `confighttp` and `configgrpc`: New config option `include_metadata` to persist request metadata/headers in `client.Info.Metadata` (experimental) (#4547)

## 🛑 Breaking changes 🛑

- Define a type `WatcherFunc` for onChange func instead of func pointer (#4656)
Expand Down
29 changes: 29 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ type Info struct {
// configauth.ServerAuthenticator implementations tied to the receiver for
// this connection.
Auth AuthData

// Metadata is the request metadata from the client connecting to this connector.
// Experimental: *NOTE* this structure is subject to change or removal in the future.
Metadata Metadata
}

// Metadata is an immutable map, meant to contain request metadata.
type Metadata struct {
data map[string][]string
}

// AuthData represents the authentication data as seen by authenticators tied to
Expand Down Expand Up @@ -137,3 +146,23 @@ func FromContext(ctx context.Context) Info {
}
return c
}

// NewMetadata creates a new Metadata object to use in Info. md is used as-is.
func NewMetadata(md map[string][]string) Metadata {
return Metadata{
data: md,
}
}

// Get gets the value of the key from metadata, returning a copy.
func (m Metadata) Get(key string) []string {
vals := m.data[key]
if len(vals) == 0 {
return nil
}

ret := make([]string, len(vals))
copy(ret, vals)

return ret
}
13 changes: 13 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,16 @@ func TestFromContext(t *testing.T) {
})
}
}

func TestMetadata(t *testing.T) {
source := map[string][]string{"test-key": {"test-val"}}
md := NewMetadata(source)
assert.Equal(t, []string{"test-val"}, md.Get("test-key"))

// test if copy. In regular use, source cannot change
val := md.Get("test-key")
source["test-key"][0] = "abc"
assert.Equal(t, []string{"test-val"}, val)

assert.Empty(t, md.Get("non-existent-key"))
}
31 changes: 22 additions & 9 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ type GRPCServerSettings struct {

// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Include propagates the incoming connection's metadata to downstream consumers.
// Experimental: *NOTE* this option is subject to change or removal in the future.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
}

// SanitizedEndpoint strips the prefix of either http:// or https:// from configgrpc.GRPCClientSettings.Endpoint.
Expand Down Expand Up @@ -348,8 +352,8 @@ func (gss *GRPCServerSettings) ToServerOption(host component.Host, settings comp
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))

uInterceptors = append(uInterceptors, enhanceWithClientInformation)
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation)
uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata))
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata))

opts = append(opts, grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

Expand All @@ -372,23 +376,32 @@ func getGRPCCompressionName(compressionType middleware.CompressionType) (string,

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx), req)
func enhanceWithClientInformation(includeMetadata bool) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx, includeMetadata), req)
}
}

func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context())
return handler(srv, wrapped)
func enhanceStreamWithClientInformation(includeMetadata bool) func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context(), includeMetadata)
return handler(srv, wrapped)
}
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(ctx context.Context) context.Context {
func contextWithClient(ctx context.Context, includeMetadata bool) context.Context {
cl := client.FromContext(ctx)
if p, ok := peer.FromContext(ctx); ok {
cl.Addr = p.Addr
}
if includeMetadata {
if md, ok := metadata.FromIncomingContext(ctx); ok {
cl.Metadata = client.NewMetadata(md.Copy())
}
}
return client.NewContext(ctx, cl)
}

Expand Down
40 changes: 35 additions & 5 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,10 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
desc string
input context.Context
expected client.Info
desc string
input context.Context
doMetadata bool
expected client.Info
}{
{
desc: "no peer information, empty client",
Expand Down Expand Up @@ -695,10 +696,39 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
desc: "existing client with metadata",
input: client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
}),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context, no metadata processing",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
expected: client.Info{},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
cl := client.FromContext(contextWithClient(tC.input))
cl := client.FromContext(contextWithClient(tC.input, tC.doMetadata))
assert.Equal(t, tC.expected, cl)
})
}
Expand All @@ -721,7 +751,7 @@ func TestStreamInterceptorEnhancesClient(t *testing.T) {
}

// test
err := enhanceStreamWithClientInformation(nil, stream, nil, handler)
err := enhanceStreamWithClientInformation(false)(nil, stream, nil, handler)

// verify
assert.NoError(t, err)
Expand Down
11 changes: 9 additions & 2 deletions config/confighttp/clientinfohandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,32 @@ var _ http.Handler = (*clientInfoHandler)(nil)
// clientInfoHandler is an http.Handler that enhances the incoming request context with client.Info.
type clientInfoHandler struct {
next http.Handler

// include client metadata or not
includeMetadata bool
}

// ServeHTTP intercepts incoming HTTP requests, replacing the request's context with one that contains
// a client.Info containing the client's IP address.
func (h *clientInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(contextWithClient(req))
req = req.WithContext(contextWithClient(req, h.includeMetadata))
h.next.ServeHTTP(w, req)
}

// contextWithClient attempts to add the client IP address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(req *http.Request) context.Context {
func contextWithClient(req *http.Request, includeMetadata bool) context.Context {
cl := client.FromContext(req.Context())

ip := parseIP(req.RemoteAddr)
if ip != nil {
cl.Addr = ip
}

if includeMetadata {
cl.Metadata = client.NewMetadata(req.Header.Clone())
}

ctx := client.NewContext(req.Context(), cl)
return ctx
}
Expand Down
7 changes: 6 additions & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ type HTTPServerSettings struct {

// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers
// Experimental: *NOTE* this option is subject to change or removal in the future.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
}

// ToListener creates a net.Listener.
Expand Down Expand Up @@ -284,7 +288,8 @@ func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.

// wrap the current handler in an interceptor that will add client.Info to the request's context
handler = &clientInfoHandler{
next: handler,
next: handler,
includeMetadata: hss.IncludeMetadata,
}

return &http.Server{
Expand Down
31 changes: 25 additions & 6 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,17 +738,18 @@ func TestHttpHeaders(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
desc string
input *http.Request
expected client.Info
desc string
input *http.Request
doMetadata bool
expected client.Info
}{
{
desc: "request without client IP",
desc: "request without client IP or headers",
input: &http.Request{},
expected: client.Info{},
},
{
desc: "request without client IP",
desc: "request with client IP",
input: &http.Request{
RemoteAddr: "1.2.3.4:55443",
},
Expand All @@ -758,10 +759,28 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
desc: "request with client headers, no metadata processing",
input: &http.Request{
Header: map[string][]string{"x-test-header": {"test-value"}},
},
doMetadata: false,
expected: client.Info{},
},
{
desc: "request with client headers",
input: &http.Request{
Header: map[string][]string{"x-test-header": {"test-value"}},
},
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"x-test-header": {"test-value"}}),
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
ctx := contextWithClient(tC.input)
ctx := contextWithClient(tC.input, tC.doMetadata)
assert.Equal(t, tC.expected, client.FromContext(ctx))
})
}
Expand Down