Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelgrpc: Add peer attributes to spans recorded by stats handlers #4539

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
- Add `SDK.Shutdown` method in `"go.opentelemetry.io/contrib/config"`. (#4583)
- Add peer attributes to spans recorded by `NewClientHandler`, `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4539)

### Changed

Expand Down Expand Up @@ -54,7 +55,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add metrics support (No-op, OTLP and Prometheus) to `go.opentelemetry.io/contrib/exporters/autoexport`. (#4229, #4479)
- Add support for `console` span exporter and metrics exporter in `go.opentelemetry.io/contrib/exporters/autoexport`. (#4486)
- Set unit and description on all instruments in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#4500)
- Add metric support for `grpc.StatsHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356)
- Add metric support for `NewClientHandler`, `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356)
- Expose the name of the scopes in all instrumentation libraries as `ScopeName`. (#4448)

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ type gRPCContext struct {

type serverHandler struct {
*config
peerAttr []attribute.KeyValue
}

// NewServerHandler creates a stats.Handler for gRPC server.
// NewServerHandler creates a stats.Handler for a gRPC server.
//
// Caution: Do not share the handler across multiple gRPC servers.
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts, "server"),
Expand All @@ -54,9 +57,7 @@ func NewServerHandler(opts ...Option) stats.Handler {

// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)
h.peerAttr = peerAttr(info.RemoteAddr.String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this work if the Handler has two connections at once?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I will carefully test it tomorrow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will also double-check that the client handles concurrent calls without races.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also confirm that there can be a race on the client:

==================
WARNING: DATA RACE
Write at 0x00c0000a5048 by goroutine 12:
  go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc.(*clientHandler).TagConn()
      /home/rpajak/repos/opentelemetry-go-contrib/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go:137 +0x84

Previous read at 0x00c0000a5048 by goroutine 18:
  go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc.(*clientHandler).TagRPC()
      /home/rpajak/repos/opentelemetry-go-contrib/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go:119 +0x1e7

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also found out that TagConn may be also called after TagRPC (or even not at all according to my current testing).

I think that currently it may be impossible to implement.

return ctx
}

Expand All @@ -75,6 +76,7 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
trace.WithAttributes(h.peerAttr...),
)

gctx := gRPCContext{
Expand All @@ -91,9 +93,12 @@ func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {

type clientHandler struct {
*config
peerAttr []attribute.KeyValue
}

// NewClientHandler creates a stats.Handler for gRPC client.
//
// Caution: Do not share the handler across multiple gRPC clients.
func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts, "client"),
Expand All @@ -111,6 +116,7 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
trace.WithAttributes(h.peerAttr...),
)

gctx := gRPCContext{
Expand All @@ -127,10 +133,8 @@ func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}

// TagConn can attach some information to the given context.
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(cti.RemoteAddr.String())
span.SetAttributes(attrs...)
func (h *clientHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
h.peerAttr = peerAttr(info.RemoteAddr.String())
return ctx
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"io"
"net"
"strconv"
"sync"
"testing"

Expand Down Expand Up @@ -74,7 +75,7 @@ func TestStatsHandler(t *testing.T) {
doCalls(client)

t.Run("ClientSpans", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
checkClientSpans(t, clientSR.Ended(), listener.Addr().String())
})

t.Run("ClientMetrics", func(t *testing.T) {
Expand All @@ -90,9 +91,14 @@ func TestStatsHandler(t *testing.T) {
})
}

func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr string) {
require.Len(t, spans, 5)

host, p, err := net.SplitHostPort(addr)
require.NoError(t, err)
port, err := strconv.Atoi(p)
require.NoError(t, err)

emptySpan := spans[0]
assert.False(t, emptySpan.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name())
Expand Down Expand Up @@ -121,6 +127,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, emptySpan.Attributes())

largeSpan := spans[1]
Expand Down Expand Up @@ -151,6 +159,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, largeSpan.Attributes())

streamInput := spans[2]
Expand Down Expand Up @@ -209,6 +219,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, streamInput.Attributes())

streamOutput := spans[3]
Expand Down Expand Up @@ -266,6 +278,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, streamOutput.Attributes())

pingPong := spans[4]
Expand Down Expand Up @@ -350,6 +364,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, pingPong.Attributes())
}

Expand Down Expand Up @@ -379,11 +395,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, emptySpan.Events())
port, ok := findAttribute(emptySpan.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("EmptyCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, emptySpan.Attributes())

largeSpan := spans[1]
Expand All @@ -409,11 +429,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, largeSpan.Events())
port, ok = findAttribute(largeSpan.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("UnaryCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, largeSpan.Attributes())

streamInput := spans[2]
Expand Down Expand Up @@ -467,11 +491,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
// client does not record an event for the server response.
}, streamInput.Events())
port, ok = findAttribute(streamInput.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("StreamingInputCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, streamInput.Attributes())

streamOutput := spans[3]
Expand Down Expand Up @@ -524,11 +552,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, streamOutput.Events())
port, ok = findAttribute(streamOutput.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("StreamingOutputCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, streamOutput.Attributes())

pingPong := spans[4]
Expand Down Expand Up @@ -608,11 +640,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, pingPong.Events())
port, ok = findAttribute(pingPong.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("FullDuplexCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, pingPong.Attributes())
}

Expand Down