diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py index 0e9b19ef51b..66b09860641 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py @@ -21,7 +21,7 @@ from opentelemetry.ext.grpc.version import __version__ -def client_interceptor(tracer_provider=None): +def client_interceptor(tracer_provider=None, meter=None): """Create a gRPC client channel interceptor. Args: @@ -34,7 +34,7 @@ def client_interceptor(tracer_provider=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _client.OpenTelemetryClientInterceptor(tracer) + return _client.OpenTelemetryClientInterceptor(tracer, meter) def server_interceptor(tracer_provider=None): diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py index 373d8f345cf..a9977f771e3 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py @@ -28,7 +28,7 @@ from opentelemetry.trace.status import Status, StatusCanonicalCode from . import grpcext -from ._utilities import RpcInfo +from ._utilities import RpcInfo, TimedMetricRecorder class _GuardedSpan: @@ -79,8 +79,10 @@ def callback(response_future): class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): - def __init__(self, tracer): + def __init__(self, tracer, meter): self._tracer = tracer + self._meter = meter + self._metrics_recorder = TimedMetricRecorder(self._meter, "client") def _start_span(self, method): return self._tracer.start_as_current_span( @@ -109,6 +111,14 @@ def _trace_result(self, guarded_span, rpc_info, result): def _start_guarded_span(self, *args, **kwargs): return _GuardedSpan(self._start_span(*args, **kwargs)) + def _bytes_out_iterator_wrapper(self, iterator, client_info): + for request in iterator: + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + yield request + def intercept_unary(self, request, metadata, client_info, invoker): if not metadata: mutable_metadata = OrderedDict() @@ -116,25 +126,41 @@ def intercept_unary(self, request, metadata, client_info, invoker): mutable_metadata = OrderedDict(metadata) with self._start_guarded_span(client_info.full_method) as guarded_span: - _inject_span_context(mutable_metadata) - metadata = tuple(mutable_metadata.items()) - - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request, - ) - - try: - result = invoker(request, metadata) - except grpc.RpcError as exc: - guarded_span.generated_span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + + # If protobuf is used, we can record the bytes in/out. Otherwise, we have no way + # to get the size of the request/response properly, so don't record anything + if "ByteSize" in dir(request): + self._metrics_recorder.record_bytes_out( + request.ByteSize(), client_info.full_method + ) + + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request, ) - raise - return self._trace_result(guarded_span, rpc_info, result) + try: + result = invoker(request, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + ret = self._trace_result(guarded_span, rpc_info, result) + + if "ByteSize" in dir(rpc_info.response): + self._metrics_recorder.record_bytes_in( + rpc_info.response.ByteSize(), client_info.full_method + ) + return ret # For RPCs that stream responses, the result can be a generator. To record # the span across the generated responses and detect any errors, we wrap @@ -148,25 +174,44 @@ def _intercept_server_stream( mutable_metadata = OrderedDict(metadata) with self._start_span(client_info.full_method) as span: - _inject_span_context(mutable_metadata) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - ) - if client_info.is_client_stream: - rpc_info.request = request_or_iterator - - try: - result = invoker(request_or_iterator, metadata) - for response in result: - yield response - except grpc.RpcError as exc: - span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, ) - raise + + if client_info.is_client_stream: + rpc_info.request = request_or_iterator + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info + ) + else: + if "ByteSize" in dir(request_or_iterator): + self._metrics_recorder.record_bytes_out( + request_or_iterator.ByteSize(), + client_info.full_method, + ) + + try: + result = invoker(request_or_iterator, metadata) + + # Rewrap the result stream into a generator, and record the bytes received + for response in result: + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) + yield response + except grpc.RpcError as exc: + span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise def intercept_stream( self, request_or_iterator, metadata, client_info, invoker @@ -182,21 +227,36 @@ def intercept_stream( mutable_metadata = OrderedDict(metadata) with self._start_guarded_span(client_info.full_method) as guarded_span: - _inject_span_context(mutable_metadata) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request_or_iterator, - ) + with self._metrics_recorder.record_latency( + client_info.full_method + ): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request_or_iterator, + ) + + rpc_info.request = request_or_iterator + + request_or_iterator = self._bytes_out_iterator_wrapper( + request_or_iterator, client_info + ) + + try: + result = invoker(request_or_iterator, metadata) + except grpc.RpcError as exc: + guarded_span.generated_span.set_status( + Status(StatusCanonicalCode(exc.code().value[0])) + ) + raise + + ret = self._trace_result(guarded_span, rpc_info, result) - try: - result = invoker(request_or_iterator, metadata) - except grpc.RpcError as exc: - guarded_span.generated_span.set_status( - Status(StatusCanonicalCode(exc.code().value[0])) + self._metrics_recorder.record_bytes_in( + rpc_info.response.ByteSize(), client_info.full_method ) - raise - return self._trace_result(guarded_span, rpc_info, result) + return ret diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py index b6ff7d311a4..1dfe31ec995 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py @@ -14,6 +14,13 @@ """Internal utilities.""" +from contextlib import contextmanager +from time import time + +import grpc + +from opentelemetry.sdk.metrics import Counter, ValueRecorder + class RpcInfo: def __init__( @@ -31,3 +38,75 @@ def __init__( self.request = request self.response = response self.error = error + + +class TimedMetricRecorder: + def __init__(self, meter, span_kind): + self._meter = meter + service_name = "grpcio" + self._span_kind = span_kind + base_attributes = ["method"] + + if self._meter: + self._duration = self._meter.create_metric( + name="{}/{}/duration".format(service_name, span_kind), + description="Duration of grpc requests to the server", + unit="ms", + value_type=float, + metric_type=ValueRecorder, + label_keys=base_attributes + ["error", "status_code"], + ) + self._error_count = self._meter.create_metric( + name="{}/{}/errors".format(service_name, span_kind), + description="Number of errors that were returned from the server", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=base_attributes + ["status_code"], + ) + self._bytes_in = self._meter.create_metric( + name="{}/{}/bytes_in".format(service_name, span_kind), + description="Number of bytes received from the server", + unit="by", + value_type=int, + metric_type=Counter, + label_keys=base_attributes, + ) + self._bytes_out = self._meter.create_metric( + name="{}/{}/bytes_out".format(service_name, span_kind), + description="Number of bytes sent out through gRPC", + unit="by", + value_type=int, + metric_type=Counter, + label_keys=base_attributes, + ) + + def record_bytes_in(self, bytes_in, method): + if self._meter: + labels = {"method": method} + self._bytes_in.add(bytes_in, labels) + + def record_bytes_out(self, bytes_out, method): + if self._meter: + labels = {"method": method} + self._bytes_out.add(bytes_out, labels) + + @contextmanager + def record_latency(self, method): + start_time = time() + labels = {"method": method, "status_code": grpc.StatusCode.OK} + try: + yield labels + except grpc.RpcError as exc: + if self._meter: + # pylint: disable=no-member + labels["status_code"] = exc.code() + self._error_count.add(1, labels) + labels["error"] = True + raise + finally: + if self._meter: + if "error" not in labels: + labels["error"] = False + elapsed_time = (time() - start_time) * 1000 + self._duration.record(elapsed_time, labels) diff --git a/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py b/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py index 47dc9fa0bb6..39f12385c58 100644 --- a/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py +++ b/ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py @@ -18,6 +18,10 @@ from opentelemetry import metrics, trace from opentelemetry.ext.grpc import client_interceptor from opentelemetry.ext.grpc.grpcext import intercept_channel +from opentelemetry.sdk.metrics.export.aggregate import ( + MinMaxSumCountAggregator, + SumAggregator, +) from opentelemetry.sdk.metrics.export.controller import PushController from opentelemetry.test.test_base import TestBase from tests.protobuf import test_server_pb2_grpc @@ -37,7 +41,7 @@ def setUp(self): self.server = create_test_server(25565) self.server.start() meter = metrics.get_meter(__name__) - interceptor = client_interceptor() + interceptor = client_interceptor(meter=meter) self.channel = intercept_channel( grpc.insecure_channel("localhost:25565"), interceptor ) @@ -52,6 +56,66 @@ def tearDown(self): self.memory_metrics_exporter.clear() self.server.stop(None) + def _verify_success_records(self, num_bytes_out, num_bytes_in, method): + self._controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + bytes_in = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/bytes_in": + bytes_in = record + + self.assertIsNotNone(bytes_out) + self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out") + self.assertEqual(bytes_out.labels, (("method", method),)) + + self.assertIsNotNone(bytes_in) + self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in") + self.assertEqual(bytes_in.labels, (("method", method),)) + + self.assertIsNotNone(duration) + self.assertEqual(duration.instrument.name, "grpcio/client/duration") + self.assertEqual( + duration.labels, + ( + ("error", False), + ("method", method), + ("status_code", grpc.StatusCode.OK), + ), + ) + + self.assertEqual(type(bytes_out.aggregator), SumAggregator) + self.assertEqual(type(bytes_in.aggregator), SumAggregator) + self.assertEqual(type(duration.aggregator), MinMaxSumCountAggregator) + + self.assertEqual(bytes_out.aggregator.checkpoint, num_bytes_out) + self.assertEqual(bytes_in.aggregator.checkpoint, num_bytes_in) + + self.assertEqual(duration.aggregator.checkpoint.count, 1) + self.assertGreaterEqual(duration.aggregator.checkpoint.sum, 0) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + + self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod") + def test_unary_stream(self): server_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -64,6 +128,10 @@ def test_unary_stream(self): # Check version and name in span's instrumentation info self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + self._verify_success_records( + 8, 40, "/GRPCTestServer/ServerStreamingMethod" + ) + def test_stream_unary(self): client_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -76,6 +144,10 @@ def test_stream_unary(self): # Check version and name in span's instrumentation info self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + self._verify_success_records( + 40, 8, "/GRPCTestServer/ClientStreamingMethod" + ) + def test_stream_stream(self): bidirectional_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -90,10 +162,56 @@ def test_stream_stream(self): # Check version and name in span's instrumentation info self.check_span_instrumentation_info(span, opentelemetry.ext.grpc) + self._verify_success_records( + 40, 40, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + + def _verify_error_records(self, method): + self._controller.tick() + records = self.memory_metrics_exporter.get_exported_metrics() + self.assertEqual(len(records), 3) + + bytes_out = None + errors = None + duration = None + + for record in records: + if record.instrument.name == "grpcio/client/duration": + duration = record + elif record.instrument.name == "grpcio/client/bytes_out": + bytes_out = record + elif record.instrument.name == "grpcio/client/errors": + errors = record + + self.assertIsNotNone(bytes_out) + self.assertIsNotNone(errors) + self.assertIsNotNone(duration) + + self.assertEqual(errors.instrument.name, "grpcio/client/errors") + self.assertEqual( + errors.labels, + ( + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + self.assertEqual(errors.aggregator.checkpoint, 1) + + self.assertEqual( + duration.labels, + ( + ("error", True), + ("method", method), + ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + ), + ) + def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) + self._verify_error_records("/GRPCTestServer/SimpleMethod") + spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -106,6 +224,7 @@ def test_error_stream_unary(self): with self.assertRaises(grpc.RpcError): client_streaming_method(self._stub, error=True) + self._verify_error_records("/GRPCTestServer/ClientStreamingMethod") spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -118,6 +237,8 @@ def test_error_unary_stream(self): with self.assertRaises(grpc.RpcError): server_streaming_method(self._stub, error=True) + self._verify_error_records("/GRPCTestServer/ServerStreamingMethod") + spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -130,6 +251,10 @@ def test_error_stream_stream(self): with self.assertRaises(grpc.RpcError): bidirectional_streaming_method(self._stub, error=True) + self._verify_error_records( + "/GRPCTestServer/BidirectionalStreamingMethod" + ) + spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0]