From a5820c531f6afdd1e68cddc6a399a0593f983b73 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Mon, 8 Feb 2021 14:15:48 -0500 Subject: [PATCH] gRPC streaming bugfix (#260) --- CHANGELOG.md | 6 +- .../instrumentation/grpc/_server.py | 30 +++ .../tests/test_server_interceptor.py | 233 +++++++++++++++++- 3 files changed, 259 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94c33be23d..37bb467331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#308](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/308)) - `opentelemetry-instrumentation-boto` updated to set span attributes instead of overriding the resource. ([#310](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/310)) +- `opentelemetry-instrumentation-grpc` Fix an issue with tracking child spans in streaming responses + ([#260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/260)) +- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs + ([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269)) ## [0.17b0](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.17b0) - 2021-01-20 @@ -78,8 +82,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) - Update TraceState to adhere to specs ([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276)) -- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs - ([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269)) ### Removed - Remove Configuration diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py 
b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 3fe859f574..9895b8853f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -239,6 +239,15 @@ def intercept_service(self, continuation, handler_call_details): def telemetry_wrapper(behavior, request_streaming, response_streaming): def telemetry_interceptor(request_or_iterator, context): + # handle streaming responses specially + if response_streaming: + return self._intercept_server_stream( + behavior, + handler_call_details, + request_or_iterator, + context, + ) + with self._set_remote_context(context): with self._start_span( handler_call_details, context @@ -249,6 +258,7 @@ def telemetry_interceptor(request_or_iterator, context): # And now we run the actual RPC. try: return behavior(request_or_iterator, context) + except Exception as error: # Bare exceptions are likely to be gRPC aborts, which # we handle in our context wrapper. 
@@ -263,3 +273,23 @@ def telemetry_interceptor(request_or_iterator, context): return _wrap_rpc_behavior( continuation(handler_call_details), telemetry_wrapper ) + + # Handle streaming responses separately: this method has to + # return a *new* generator, otherwise upstream code gets + # confused or we lose the consistent trace. + def _intercept_server_stream( + self, behavior, handler_call_details, request_or_iterator, context + ): + + with self._set_remote_context(context): + with self._start_span(handler_call_details, context) as span: + context = _OpenTelemetryServicerContext(context, span) + + try: + yield from behavior(request_or_iterator, context) + + except Exception as error: + # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.record_exception(error) + raise error diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index cb61043c15..86c7136c89 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -30,6 +30,12 @@ from opentelemetry.test.test_base import TestBase from opentelemetry.trace.status import StatusCode +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) + class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): def __init__(self, handler): @@ -51,6 +57,23 @@ def service(self, handler_call_details): return UnaryUnaryMethodHandler(self._unary_unary_handler) +class Servicer(GRPCTestServerServicer): + """Our test servicer""" + + # pylint:disable=C0103 + def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, response_data=request.request_data, + ) + + # pylint:disable=C0103 + def ServerStreamingMethod(self, 
request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, response_data=data, + ) + + class TestOpenTelemetryServerInterceptor(TestBase): def test_instrumentor(self): def handler(request, context): @@ -134,25 +157,146 @@ def test_create_span(self): # Intercept gRPC calls... interceptor = server_interceptor() - # No-op RPC handler - def handler(request, context): - return b"" + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(Servicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + # Check attributes + self.assert_span_has_attributes( + span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "SimpleMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + + def test_create_two_spans(self): + """Verify that the interceptor captures sub spans within the given + trace""" + + class TwoSpanServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + def SimpleMethod(self, request, context): + + # create another span + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("child") as child: + child.add_event("child event") + + 
return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # setup the server server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) + add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + # setup the RPC + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + child_span = spans_list[0] + parent_span = spans_list[1] + + self.assertEqual(parent_span.name, rpc_call) + self.assertIs(parent_span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + parent_span, opentelemetry.instrumentation.grpc + ) + # Check attributes + self.assert_span_has_attributes( + parent_span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "SimpleMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + + # Check the child span + self.assertEqual(child_span.name, "child") + self.assertEqual( + parent_span.context.trace_id, child_span.context.trace_id + ) + + def test_create_span_streaming(self): + """Check that the interceptor wraps calls with spans server-side, on a + streaming call.""" + + # Intercept gRPC calls... 
+ interceptor = server_interceptor() + + # setup the server + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(Servicer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel("localhost:{:d}".format(port)) - rpc_call = "TestServicer/handler" + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() try: server.start() - channel.unary_unary(rpc_call)(b"") + list(channel.unary_stream(rpc_call)(msg)) finally: server.stop(None) @@ -174,13 +318,86 @@ def handler(request, context): { "net.peer.ip": "[::1]", "net.peer.name": "localhost", - "rpc.method": "handler", - "rpc.service": "TestServicer", + "rpc.method": "ServerStreamingMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + + def test_create_two_spans_streaming(self): + """Verify that the interceptor captures sub spans in a + streaming call, within the given trace""" + + class TwoSpanServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + def ServerStreamingMethod(self, request, context): + + # create another span + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("child") as child: + child.add_event("child event") + + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, response_data=data, + ) + + # Intercept gRPC calls... 
+ interceptor = server_interceptor() + + # setup the server + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + list(channel.unary_stream(rpc_call)(msg)) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + child_span = spans_list[0] + parent_span = spans_list[1] + + self.assertEqual(parent_span.name, rpc_call) + self.assertIs(parent_span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + parent_span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assert_span_has_attributes( + parent_span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "ServerStreamingMethod", + "rpc.service": "GRPCTestServer", "rpc.system": "grpc", "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], }, ) + # Check the child span + self.assertEqual(child_span.name, "child") + self.assertEqual( + parent_span.context.trace_id, child_span.context.trace_id + ) + def test_span_lifetime(self): """Check that the span is active for the duration of the call."""