diff --git a/interop/xds/client/Dockerfile b/interop/xds/client/Dockerfile index fa5a710d0b6b..c479bc32c03e 100644 --- a/interop/xds/client/Dockerfile +++ b/interop/xds/client/Dockerfile @@ -24,13 +24,13 @@ COPY . . # Build a static binary without cgo so that we can copy just the binary in the # final image, and can get rid of Go compiler and gRPC-Go dependencies. -RUN go build -tags osusergo,netgo interop/xds/client/client.go +RUN cd interop/xds/client && go build -tags osusergo,netgo . # Second stage of the build which copies over only the client binary and skips # the Go compiler and gRPC repo from the earlier stage. This significantly # reduces the docker image size. FROM alpine -COPY --from=build /go/src/grpc-go/client . +COPY --from=build /go/src/grpc-go/interop/xds/client/client . ENV GRPC_GO_LOG_VERBOSITY_LEVEL=99 ENV GRPC_GO_LOG_SEVERITY_LEVEL="info" ENV GRPC_GO_LOG_FORMATTER="json" diff --git a/interop/xds/client/client.go b/interop/xds/client/client.go index ff03428e1105..ec609ef3d884 100644 --- a/interop/xds/client/client.go +++ b/interop/xds/client/client.go @@ -25,6 +25,8 @@ import ( "fmt" "log" "net" + "net/http" + "os" "strings" "sync" "sync/atomic" @@ -35,15 +37,21 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/grpclog" + _ "google.golang.org/grpc/interop/xds" // to register Custom LB. "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/reflection" + "google.golang.org/grpc/stats/opentelemetry" + "google.golang.org/grpc/stats/opentelemetry/csm" "google.golang.org/grpc/status" _ "google.golang.org/grpc/xds" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" - _ "google.golang.org/grpc/interop/xds" // to register Custom LB. + + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" ) func init() { @@ -169,16 +177,19 @@ func (as *accumulatedStats) finishRPC(rpcType string, err error) { } var ( - failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success") - numChannels = flag.Int("num_channels", 1, "Num of channels") - printResponse = flag.Bool("print_response", false, "Write RPC response to stdout") - qps = flag.Int("qps", 1, "QPS per channel, for each type of RPC") - rpc = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.") - rpcMetadata = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.") - rpcTimeout = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout") - server = flag.String("server", "localhost:8080", "Address of server to connect to") - statsPort = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service") - secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.") + failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success") + numChannels = flag.Int("num_channels", 1, "Num of channels") + printResponse = flag.Bool("print_response", false, "Write RPC response to stdout") + qps = flag.Int("qps", 1, "QPS per channel, for each type of RPC") + rpc = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.") + rpcMetadata = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.") + rpcTimeout = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout") + server = flag.String("server", "localhost:8080", "Address of server to connect to") + statsPort = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service") + secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.") + enableCSMObservability = flag.Bool("enable_csm_observability", false, "Whether to enable CSM Observability") + requestPayloadSize = flag.Int("request_payload_size", 0, "Ask the server to respond with SimpleResponse.payload.body of the given length (may not be implemented on the server).") + responsePayloadSize = flag.Int("response_payload_size", 0, "Ask the server to respond with SimpleResponse.payload.body of the given length (may not be implemented on the server).") rpcCfgs atomic.Value @@ -368,6 +379,35 @@ func parseRPCMetadata(rpcMetadataStr string, rpcs []string) []*rpcConfig { func main() { flag.Parse() + if *enableCSMObservability { + exporter, err := prometheus.New() + if err != nil { + logger.Fatalf("Failed to start prometheus exporter: %v", err) + } + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + ) + var addr string + var ok bool + if addr, ok = os.LookupEnv("OTEL_EXPORTER_PROMETHEUS_HOST"); !ok { + addr = "" + } + var port string + if port, ok = os.LookupEnv("OTEL_EXPORTER_PROMETHEUS_PORT"); !ok { + port = "9464" + } + go func() { + if err := http.ListenAndServe(addr+":"+port, promhttp.Handler()); err != nil { + logger.Fatalf("error listening: %v", err) + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + cleanup := csm.EnableObservability(ctx, opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}}) + defer cleanup() + } + rpcCfgs.Store(parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc))) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *statsPort)) @@ -430,8 +470,16 @@ func makeOneRPC(c testgrpc.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcI accStats.startRPC(cfg.typ) switch cfg.typ { case unaryCall: + sr := &testpb.SimpleRequest{FillServerId: true} + if *requestPayloadSize > 0 { + sr.Payload = &testpb.Payload{Body: make([]byte, *requestPayloadSize)} + } + if *responsePayloadSize > 0 { + sr.ResponseSize = int32(*responsePayloadSize) + } + sr.ResponseSize = int32(*responsePayloadSize) var resp *testpb.SimpleResponse - resp, err = c.UnaryCall(ctx, &testpb.SimpleRequest{FillServerId: true}, grpc.Peer(&p), grpc.Header(&header)) + resp, err = c.UnaryCall(ctx, sr, grpc.Peer(&p), grpc.Header(&header)) // For UnaryCall, also read hostname from response, in case the server // isn't updated to send headers. if resp != nil { diff --git a/interop/xds/go.mod b/interop/xds/go.mod new file mode 100644 index 000000000000..115b2e1485eb --- /dev/null +++ b/interop/xds/go.mod @@ -0,0 +1,46 @@ +module google.golang.org/grpc/interop/xds + +go 1.21.0 + +replace google.golang.org/grpc => ../.. + +replace google.golang.org/grpc/stats/opentelemetry => ../../stats/opentelemetry + +require ( + github.com/prometheus/client_golang v1.19.1 + go.opentelemetry.io/otel/exporters/prometheus v0.49.0 + go.opentelemetry.io/otel/sdk/metric v1.27.0 + google.golang.org/grpc v1.64.0 + google.golang.org/grpc/stats/opentelemetry v0.0.0-20240523232201-f7d3d3eecbee +) + +require ( + cel.dev/expr v0.15.0 // indirect + cloud.google.com/go/compute/metadata v0.3.0 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect + github.com/envoyproxy/go-control-plane v0.12.0 // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.53.0 // indirect + github.com/prometheus/procfs v0.15.0 // indirect + go.opentelemetry.io/contrib/detectors/gcp v1.27.0 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/sdk v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/oauth2 v0.20.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/protobuf v1.34.1 // indirect +) diff --git a/interop/xds/go.sum b/interop/xds/go.sum new file mode 100644 index 000000000000..d59483488963 --- /dev/null +++ b/interop/xds/go.sum @@ -0,0 +1,75 @@ +cel.dev/expr v0.15.0 h1:O1jzfJCQBfL5BFoYktaxwIhuttaQPsVWerH9/EEKx0w= +cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg= +cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= +cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 h1:yRhWveg9NbJcJYoJL4FoSauT2dxnt4N9MIAJ7tvU/mQ= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0/go.mod h1:p2puVVSKjQ84Qb1gzw2XHLs34WQyHTYFZLaVxypAFYs= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= +github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE= +github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U= +github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek= +github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/contrib/detectors/gcp v1.27.0 h1:eVfDeFAPnMFZUhNdDZ/BbpEmC7/xxDKTSba5NhJH88s= +go.opentelemetry.io/contrib/detectors/gcp v1.27.0/go.mod h1:amd+4uZxqJAUx7zI1JvygUtAc2EVWtQeyz8D+3161SQ= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/exporters/prometheus v0.49.0 h1:Er5I1g/YhfYv9Affk9nJLfH/+qCCVVg1f2R9AbJfqDQ= +go.opentelemetry.io/otel/exporters/prometheus v0.49.0/go.mod h1:KfQ1wpjf3zsHjzP149P4LyAwWRupc6c7t1ZJ9eXpKQM= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= +go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= +go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI= +go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= +golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= +google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/interop/xds/server/Dockerfile b/interop/xds/server/Dockerfile index 534fdc12d39b..8c5dcd8d7db0 100644 --- a/interop/xds/server/Dockerfile +++ b/interop/xds/server/Dockerfile @@ -24,13 +24,13 @@ COPY . . # Build a static binary without cgo so that we can copy just the binary in the # final image, and can get rid of the Go compiler and gRPC-Go dependencies. -RUN go build -tags osusergo,netgo interop/xds/server/server.go +RUN cd interop/xds/server && go build -tags osusergo,netgo . -# Second stage of the build which copies over only the client binary and skips +# Second stage of the build which copies over only the server binary and skips # the Go compiler and gRPC repo from the earlier stage. This significantly # reduces the docker image size. FROM alpine -COPY --from=build /go/src/grpc-go/server . +COPY --from=build /go/src/grpc-go/interop/xds/server/server . ENV GRPC_GO_LOG_VERBOSITY_LEVEL=99 ENV GRPC_GO_LOG_SEVERITY_LEVEL="info" ENV GRPC_GO_LOG_FORMATTER="json" diff --git a/interop/xds/server/server.go b/interop/xds/server/server.go index dd54c9ac65c8..4b8fa2841b27 100644 --- a/interop/xds/server/server.go +++ b/interop/xds/server/server.go @@ -25,6 +25,7 @@ import ( "fmt" "log" "net" + "net/http" "os" "strconv" "strings" @@ -36,9 +37,11 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/health" - "google.golang.org/grpc/internal/status" "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" + "google.golang.org/grpc/stats/opentelemetry" + "google.golang.org/grpc/stats/opentelemetry/csm" + "google.golang.org/grpc/status" "google.golang.org/grpc/xds" xdscreds "google.golang.org/grpc/credentials/xds" @@ -46,14 +49,19 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" ) var ( - port = flag.Int("port", 8080, "Listening port for test service") - maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port") - serverID = flag.String("server_id", "go_server", "Server ID included in response") - secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.") - hostNameOverride = flag.String("host_name_override", "", "If set, use this as the hostname instead of the real hostname") + port = flag.Int("port", 8080, "Listening port for test service") + maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port") + serverID = flag.String("server_id", "go_server", "Server ID included in response") + secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.") + hostNameOverride = flag.String("host_name_override", "", "If set, use this as the hostname instead of the real hostname") + enableCSMObservability = flag.Bool("enable_csm_observability", false, "Whether to enable CSM Observability") logger = grpclog.Component("interop") ) @@ -94,6 +102,11 @@ func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*test func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { response := &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname} + if in.ResponseSize > 0 { + response.Payload = &testpb.Payload{ + Body: make([]byte, in.ResponseSize), + } + } forLoop: for _, headerVal := range getRPCBehaviorMetadata(ctx) { @@ -161,7 +174,7 @@ forLoop: } grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname)) - return response, status.Err(codes.OK, "") + return response, status.Error(codes.OK, "") } func getRPCBehaviorMetadata(ctx context.Context) []string { @@ -214,6 +227,34 @@ func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) { func main() { flag.Parse() + if *enableCSMObservability { + exporter, err := prometheus.New() + if err != nil { + logger.Fatalf("Failed to start prometheus exporter: %v", err) + } + var addr string + var ok bool + if addr, ok = os.LookupEnv("OTEL_EXPORTER_PROMETHEUS_HOST"); !ok { + addr = "" + } + var port string + if port, ok = os.LookupEnv("OTEL_EXPORTER_PROMETHEUS_PORT"); !ok { + port = "9464" + } + go func() { + if err := http.ListenAndServe(addr+":"+port, promhttp.Handler()); err != nil { + logger.Fatalf("error listening: %v", err) + } + }() + + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + ) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + cleanup := csm.EnableObservability(ctx, opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}}) + defer cleanup() + } if *secureMode && *port == *maintenancePort { logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set") diff --git a/stats/opentelemetry/csm/observability.go b/stats/opentelemetry/csm/observability.go index 01b34b00e7c0..466e7b52cd96 100644 --- a/stats/opentelemetry/csm/observability.go +++ b/stats/opentelemetry/csm/observability.go @@ -42,7 +42,7 @@ func EnableObservability(ctx context.Context, options opentelemetry.Options) fun csmPluginOption := newPluginOption(ctx) clientSideOTelWithCSM := dialOptionWithCSMPluginOption(options, csmPluginOption) clientSideOTel := opentelemetry.DialOption(options) - internal.AddGlobalPerTargetDialOptions.(func(opt any))(perTargetDialOption{ + internal.AddGlobalPerTargetDialOptions.(func(opt any))(&perTargetDialOption{ clientSideOTelWithCSM: clientSideOTelWithCSM, clientSideOTel: clientSideOTel, }) diff --git a/stats/opentelemetry/csm/pluginoption.go b/stats/opentelemetry/csm/pluginoption.go index 726f59e3a318..cc99e4c7c87f 100644 --- a/stats/opentelemetry/csm/pluginoption.go +++ b/stats/opentelemetry/csm/pluginoption.go @@ -177,7 +177,7 @@ func getEnv(name string) string { var ( // This function will be overridden in unit tests. getAttrSetFromResourceDetector = func(ctx context.Context) *attribute.Set { - r, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector())) + r, err := resource.New(ctx, resource.WithFromEnv(), resource.WithDetectors(gcp.NewDetector())) if err != nil { logger.Warningf("error reading OpenTelemetry resource: %v", err) }