diff --git a/CHANGELOG.md b/CHANGELOG.md index b77946b5d0..8d27a89531 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added + +- `opentelemetry-instrumentation-grpc` add supports to filter requests to instrument. ([#1241](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1241)) - Flask sqlalchemy psycopg2 integration ([#1224](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1224)) - Add metric instrumentation in fastapi @@ -125,7 +127,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrumentation-starlette` Capture custom request/response headers in span attributes - ([#1046])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046) + ([#1046](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046)) ### Fixed - Prune autoinstrumentation sitecustomize module directory from PYTHONPATH immediately @@ -148,17 +150,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrumentation-fastapi` Capture custom request/response headers in span attributes - ([#1032])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032) + ([#1032](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032)) - `opentelemetry-instrumentation-django` Capture custom request/response headers in span attributes - ([#1024])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024) + ([#1024](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024)) - `opentelemetry-instrumentation-asgi` Capture custom request/response headers in span attributes - ([#1004])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004) + ([#1004](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004)) - `opentelemetry-instrumentation-psycopg2` extended the sql commenter support of dbapi into psycopg2 ([#940](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/940)) - `opentelemetry-instrumentation-falcon` Add support for falcon==1.4.1 - ([#1000])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000) + ([#1000](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000)) - `opentelemetry-instrumentation-falcon` Falcon: Capture custom request/response headers in span attributes - ([#1003])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003) + ([#1003](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003)) - `opentelemetry-instrumentation-elasticsearch` no longer creates unique span names by including search target, replaces them with `` and puts the value in attribute `elasticsearch.target` ([#1018](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1018)) - `opentelemetry-instrumentation-pyramid` Handle non-HTTPException exceptions @@ -166,17 +168,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-system-metrics` restore `SystemMetrics` instrumentation as `SystemMetricsInstrumentor` ([#1012](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1012)) - `opentelemetry-instrumentation-pyramid` Pyramid: Capture custom request/response headers in span attributes - ([#1022])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022) + ([#1022](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022)) ## [1.10.0-0.29b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.10.0-0.29b0) - 2022-03-10 - `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes - ([#925])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925) + ([#925](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925)) - `opentelemetry-instrumentation-flask` Flask: Capture custom request/response headers in span attributes - ([#952])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952) + ([#952](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952)) - `opentelemetry-instrumentation-tornado` Tornado: Capture custom request/response headers in span attributes - ([#950])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950) + ([#950](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950)) ### Added @@ -971,7 +973,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#572](https://github.com/open-telemetry/opentelemetry-python/pull/572)) - `opentelemetry-ext-sqlite3` Initial release - `opentelemetry-ext-psycopg2` Implement instrumentor interface, enabling auto-instrumentation - ([#694]https://github.com/open-telemetry/opentelemetry-python/pull/694) + ([#694](https://github.com/open-telemetry/opentelemetry-python/pull/694)) - `opentelemetry-ext-asgi` Add ASGI middleware ([#716](https://github.com/open-telemetry/opentelemetry-python/pull/716)) - `opentelemetry-ext-django` Add exclude list for paths and hosts to prevent from tracing diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 177bfe67b5..9b4b0c61fd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -118,13 +118,62 @@ def serve(): server = grpc.server(futures.ThreadPoolExecutor(), interceptors = [server_interceptor()]) +Filters +------- + +If you prefer to filter specific requests to be instrumented, you can specify +the condition by assigning filters to instrumentors. + +You can write a global server instrumentor as follows: + +.. code-block:: + + from opentelemetry.instrumentation.grpc import filters, GrpcInstrumentorServer + + grpc_server_instrumentor = GrpcInstrumentorServer( + filter_ = filters.any_of( + filters.method_name("SimpleMethod"), + filters.method_name("ComplexMethod"), + ) + ) + grpc_server_instrumentor.instrument() + +You can also use the filters directly on the provided interceptors: + +.. code-block:: + + my_interceptor = server_interceptor( + filter_ = filters.negate(filters.method_name("TestMethod")) + ) + server = grpc.server(futures.ThreadPoolExecutor(), + interceptors = [my_interceptor]) + +``filter_`` option also applies to both global and manual client intrumentors. + + +Environment variable +-------------------- + +If you'd like to exclude specific services for the instrumentations, you can use +``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` environment variables. + +For example, if you assign ``"GRPCTestServer,GRPCHealthServer"`` to the variable, +then the global interceptor automatically adds the filters to exclude requests to +services ``GRPCTestServer`` and ``GRPCHealthServer``. + """ -from typing import Collection +import os +from typing import Callable, Collection, List, Union import grpc # pylint:disable=import-self from wrapt import wrap_function_wrapper as _wrap from opentelemetry import trace +from opentelemetry.instrumentation.grpc.filters import ( + any_of, + negate, + service_name, +) from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.package import _instruments from opentelemetry.instrumentation.grpc.version import __version__ @@ -145,10 +194,26 @@ class GrpcInstrumentorServer(BaseInstrumentor): grpc_server_instrumentor = GrpcInstrumentorServer() grpc_server_instrumentor.instrument() + If you want to add a filter that only intercept requests + to match the condition, pass ``filter_`` to GrpcInstrumentorServer. + + grpc_server_instrumentor = GrpcInstrumentorServer( + filter_=filters.method_prefix("SimpleMethod")) + grpc_server_instrumentor.instrument() + """ # pylint:disable=attribute-defined-outside-init, redefined-outer-name + def __init__(self, filter_=None): + excluded_service_filter = _excluded_service_filter() + if excluded_service_filter is not None: + if filter_ is None: + filter_ = excluded_service_filter + else: + filter_ = any_of(filter_, excluded_service_filter) + self._filter = filter_ + def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -160,11 +225,16 @@ def server(*args, **kwargs): if "interceptors" in kwargs: # add our interceptor as the first kwargs["interceptors"].insert( - 0, server_interceptor(tracer_provider=tracer_provider) + 0, + server_interceptor( + tracer_provider=tracer_provider, filter_=self._filter + ), ) else: kwargs["interceptors"] = [ - server_interceptor(tracer_provider=tracer_provider) + server_interceptor( + tracer_provider=tracer_provider, filter_=self._filter + ) ] return self._original_func(*args, **kwargs) @@ -183,8 +253,25 @@ class GrpcInstrumentorClient(BaseInstrumentor): grpc_client_instrumentor = GrpcInstrumentorClient() grpc_client_instrumentor.instrument() + If you want to add a filter that only intercept requests + to match the condition, pass ``filter_`` option to GrpcInstrumentorClient. + + grpc_client_instrumentor = GrpcInstrumentorClient( + filter_=filters.negate(filters.health_check()) + ) + grpc_client_instrumentor.instrument() + """ + def __init__(self, filter_=None): + excluded_service_filter = _excluded_service_filter() + if excluded_service_filter is not None: + if filter_ is None: + filter_ = excluded_service_filter + else: + filter_ = any_of(filter_, excluded_service_filter) + self._filter = filter_ + # Figures out which channel type we need to wrap def _which_channel(self, kwargs): # handle legacy argument @@ -221,16 +308,23 @@ def wrapper_fn(self, original_func, instance, args, kwargs): tracer_provider = kwargs.get("tracer_provider") return intercept_channel( channel, - client_interceptor(tracer_provider=tracer_provider), + client_interceptor( + tracer_provider=tracer_provider, + filter_=self._filter, + ), ) -def client_interceptor(tracer_provider=None): +def client_interceptor(tracer_provider=None, filter_=None): """Create a gRPC client channel interceptor. Args: tracer: The tracer to use to create client-side spans. + filter_: filter function that returns True if gRPC requests + matches the condition. Default is None and intercept + all requests. + Returns: An invocation-side interceptor object. """ @@ -238,15 +332,19 @@ def client_interceptor(tracer_provider=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _client.OpenTelemetryClientInterceptor(tracer) + return _client.OpenTelemetryClientInterceptor(tracer, filter_=filter_) -def server_interceptor(tracer_provider=None): +def server_interceptor(tracer_provider=None, filter_=None): """Create a gRPC server interceptor. Args: tracer: The tracer to use to create server-side spans. + filter_: filter function that returns True if gRPC requests + matches the condition. Default is None and intercept + all requests. + Returns: A service-side interceptor object. """ @@ -254,4 +352,24 @@ def server_interceptor(tracer_provider=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _server.OpenTelemetryServerInterceptor(tracer) + return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_) + + +def _excluded_service_filter() -> Union[Callable[[object], bool], None]: + services = _parse_services( + os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "") + ) + if len(services) == 0: + return None + filters = (service_name(srv) for srv in services) + return negate(any_of(*filters)) + + +def _parse_services(excluded_services: str) -> List[str]: + if excluded_services != "": + excluded_service_list = [ + s.strip() for s in excluded_services.split(",") + ] + else: + excluded_service_list = [] + return excluded_service_list diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index b73b822b14..55a46d4a49 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -62,8 +62,9 @@ def callback(response_future): class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): - def __init__(self, tracer): + def __init__(self, tracer, filter_=None): self._tracer = tracer + self._filter = filter_ def _start_span(self, method, **kwargs): service, meth = method.lstrip("/").split("/", 1) @@ -148,6 +149,8 @@ def _intercept(self, request, metadata, client_info, invoker): return self._trace_result(span, rpc_info, result) def intercept_unary(self, request, metadata, client_info, invoker): + if self._filter is not None and not self._filter(client_info): + return invoker(request, metadata) return self._intercept(request, metadata, client_info, invoker) # For RPCs that stream responses, the result can be a generator. To record @@ -188,6 +191,9 @@ def intercept_stream( if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return invoker(request_or_iterator, metadata) + if self._filter is not None and not self._filter(client_info): + return invoker(request_or_iterator, metadata) + if client_info.is_server_stream: return self._intercept_server_stream( request_or_iterator, metadata, client_info, invoker diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index a76a81d670..6a8c1ef58b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -173,9 +173,10 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): Usage:: tracer = some OpenTelemetry tracer + filter = filters.negate(filters.method_name("service.Foo")) interceptors = [ - OpenTelemetryServerInterceptor(tracer), + OpenTelemetryServerInterceptor(tracer, filter), ] server = grpc.server( @@ -184,8 +185,9 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): """ - def __init__(self, tracer): + def __init__(self, tracer, filter_=None): self._tracer = tracer + self._filter = filter_ @contextmanager def _set_remote_context(self, servicer_context): @@ -261,6 +263,9 @@ def _start_span( ) def intercept_service(self, continuation, handler_call_details): + if self._filter is not None and not self._filter(handler_call_details): + return continuation(handler_call_details) + def telemetry_wrapper(behavior, request_streaming, response_streaming): def telemetry_interceptor(request_or_iterator, context): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py new file mode 100644 index 0000000000..905bb8d696 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py @@ -0,0 +1,208 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from typing import Callable, TypeVar + +import grpc + +TCallDetails = TypeVar( + "TCallDetails", grpc.HandlerCallDetails, grpc.ClientCallDetails +) +Condition = Callable[[TCallDetails], bool] + + +def _full_method(metadata): + name = "" + if isinstance(metadata, grpc.HandlerCallDetails): + name = metadata.method + # NOTE: replace here if there's better way to match cases to handle + # grpcext._interceptor._UnaryClientInfo/_StreamClientInfo + elif hasattr(metadata, "full_method"): + name = metadata.full_method + return name + + +def _split_full_method(metadata): + name = _full_method(metadata) + service, method = os.path.split(name) + if service != "": + service = os.path.normpath(service) + service = service.lstrip("/") + return (service, method) + + +def all_of(*args: Condition[TCallDetails]) -> Condition[TCallDetails]: + """Returns a filter function that returns True if all filter functions + assigned matches conditions. + + Args: + args (function): a list of filter function + + Returns: + A filter function that returns True if all filter functions + assigned matches conditions. + """ + + def filter_fn(metadata): + return all(func(metadata) for func in args) + + return filter_fn + + +def any_of(*args: Condition[TCallDetails]) -> Condition[TCallDetails]: + """Returns a filter function that returns True if any of filter functions + assigned matches conditions. + + Args: + args (function): a list of filter function + + Returns: + A filter function that returns True if any of filter functions + assigned matches conditions. + """ + + def filter_fn(metadata): + return any(func(metadata) for func in args) + + return filter_fn + + +def negate(func: Condition[TCallDetails]) -> Condition[TCallDetails]: + """Returns a filter function that negate the result of func + + Args: + func (function): filter function to negate the result + + Returns: + A filter function that negate the result of func + """ + + def filter_fn(metadata): + return not func(metadata) + + return filter_fn + + +def method_name(name: str) -> Condition[TCallDetails]: + """Returns a filter function that return True if + request's gRPC method name matches name. + + Args: + name (str): method name to match + + Returns: + A filter function that returns True if request's gRPC method + name matches name + """ + + def filter_fn(metadata): + _, method = _split_full_method(metadata) + return method == name + + return filter_fn + + +def method_prefix(prefix: str) -> Condition[TCallDetails]: + """Returns a filter function that return True if + request's gRPC method name starts with prefix. + + Args: + prefix (str): method prefix to match + + Returns: + A filter function that returns True if request's gRPC method + name starts with prefix + """ + + def filter_fn(metadata): + _, method = _split_full_method(metadata) + return method.startswith(prefix) + + return filter_fn + + +def full_method_name(name: str) -> Condition[TCallDetails]: + """Returns a filter function that return True if + request's gRPC full method name matches name. + + Args: + name (str): full method name to match + + Returns: + A filter function that returns True if request's gRPC full + method name matches name + """ + + def filter_fn(metadata): + fm = _full_method(metadata) + return fm == name + + return filter_fn + + +def service_name(name: str) -> Condition[TCallDetails]: + """Returns a filter function that return True if + request's gRPC service name matches name. + + Args: + name (str): service name to match + + Returns: + A filter function that returns True if request's gRPC service + name matches name + """ + + def filter_fn(metadata): + service, _ = _split_full_method(metadata) + return service == name + + return filter_fn + + +def service_prefix(prefix: str) -> Condition[TCallDetails]: + """Returns a filter function that return True if + request's gRPC service name starts with prefix. + + Args: + prefix (str): service prefix to match + + Returns: + A filter function that returns True if request's gRPC service + name starts with prefix + """ + + def filter_fn(metadata): + service, _ = _split_full_method(metadata) + return service.startswith(prefix) + + return filter_fn + + +def health_check() -> Condition[TCallDetails]: + """Returns a Filter that returns true if the request's + service name is health check defined by gRPC Health Checking Protocol. + https://github.com/grpc/grpc/blob/master/doc/health-checking.md + """ + return service_prefix("grpc.health.v1.Health") + + +__all__ = [ + "method_name", + "method_prefix", + "full_method_name", + "service_name", + "service_prefix", + "health_check", +] diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py new file mode 100644 index 0000000000..a15268464b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -0,0 +1,682 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from unittest import mock + +import grpc +from tests.protobuf import ( # pylint: disable=no-name-in-module + test_server_pb2_grpc, +) + +import opentelemetry.instrumentation.grpc +from opentelemetry import context, trace +from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, filters +from opentelemetry.instrumentation.grpc._client import ( + OpenTelemetryClientInterceptor, +) +from opentelemetry.instrumentation.grpc.grpcext._interceptor import ( + _UnaryClientInfo, +) +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.propagate import get_global_textmap, set_global_textmap +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.mock_textmap import MockTextMapPropagator +from opentelemetry.test.test_base import TestBase + +from ._client import ( + bidirectional_streaming_method, + client_streaming_method, + server_streaming_method, + simple_method, + simple_method_future, +) +from ._server import create_test_server +from .protobuf.test_server_pb2 import Request + + +# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor. +class Interceptor( + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor, +): + def __init__(self): + pass + + def intercept_unary_unary( + self, continuation, client_call_details, request + ): + return self._intercept_call(continuation, client_call_details, request) + + def intercept_unary_stream( + self, continuation, client_call_details, request + ): + return self._intercept_call(continuation, client_call_details, request) + + def intercept_stream_unary( + self, continuation, client_call_details, request_iterator + ): + return self._intercept_call( + continuation, client_call_details, request_iterator + ) + + def intercept_stream_stream( + self, continuation, client_call_details, request_iterator + ): + return self._intercept_call( + continuation, client_call_details, request_iterator + ) + + @staticmethod + def _intercept_call( + continuation, client_call_details, request_or_iterator + ): + return continuation(client_call_details, request_or_iterator) + + +class TestClientProtoFilterMethodName(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient( + filter_=filters.method_name("SimpleMethod") + ).instrument() + self.server = create_test_server(25565) + self.server.start() + # use a user defined interceptor along with the opentelemetry client interceptor + interceptors = [Interceptor()] + self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.intercept_channel(self.channel, *interceptors) + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.server.stop(None) + self.channel.close() + + def test_unary_unary_future(self): + simple_method_future(self._stub).result() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_unary_stream(self): + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_stream_unary(self): + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_stream_stream(self): + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_error_simple(self): + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + def test_error_stream_unary(self): + with self.assertRaises(grpc.RpcError): + client_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_error_unary_stream(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_error_stream_stream(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_client_interceptor_trace_context_propagation( + self, + ): # pylint: disable=no-self-use + """ensure that client interceptor correctly inject trace context into all outgoing requests.""" + previous_propagator = get_global_textmap() + try: + set_global_textmap(MockTextMapPropagator()) + interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) + + carrier = tuple() + + def invoker(request, metadata): + nonlocal carrier + carrier = metadata + return {} + + request = Request(client_id=1, request_data="data") + interceptor.intercept_unary( + request, + {}, + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", timeout=None + ), + invoker=invoker, + ) + + assert len(carrier) == 2 + assert carrier[0][0] == "mock-traceid" + assert carrier[0][1] == "0" + assert carrier[1][0] == "mock-spanid" + assert carrier[1][1] == "0" + + finally: + set_global_textmap(previous_propagator) + + +class TestClientProtoFilterMethodPrefix(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient( + filter_=filters.method_prefix("Simple") + ).instrument() + self.server = create_test_server(25565) + self.server.start() + # use a user defined interceptor along with the opentelemetry client interceptor + interceptors = [Interceptor()] + self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.intercept_channel(self.channel, *interceptors) + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.server.stop(None) + self.channel.close() + + def test_unary_unary_future(self): + simple_method_future(self._stub).result() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_unary_stream(self): + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_stream_unary(self): + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_stream_stream(self): + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_error_simple(self): + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + def test_error_stream_unary(self): + with self.assertRaises(grpc.RpcError): + client_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_error_unary_stream(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_error_stream_stream(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_client_interceptor_trace_context_propagation( + self, + ): # pylint: disable=no-self-use + """ensure that client interceptor correctly inject trace context into all outgoing requests.""" + previous_propagator = get_global_textmap() + try: + set_global_textmap(MockTextMapPropagator()) + interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) + + carrier = tuple() + + def invoker(request, metadata): + nonlocal carrier + carrier = metadata + return {} + + request = Request(client_id=1, request_data="data") + interceptor.intercept_unary( + request, + {}, + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", timeout=None + ), + invoker=invoker, + ) + + assert len(carrier) == 2 + assert carrier[0][0] == "mock-traceid" + assert carrier[0][1] == "0" + assert carrier[1][0] == "mock-spanid" + assert carrier[1][1] == "0" + + finally: + set_global_textmap(previous_propagator) + + +class TestClientProtoFilterByEnv(TestBase): + def setUp(self): + with mock.patch.dict( + os.environ, + { + "OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer,GRPCTestServer" + }, + ): + super().setUp() + GrpcInstrumentorClient().instrument() + self.server = create_test_server(25565) + self.server.start() + # use a user defined interceptor along with the opentelemetry client interceptor + interceptors = [Interceptor()] + self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.intercept_channel(self.channel, *interceptors) + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.server.stop(None) + self.channel.close() + + def test_unary_unary_future(self): + simple_method_future(self._stub).result() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + +class TestClientProtoFilterByEnvAndOption(TestBase): + def setUp(self): + with mock.patch.dict( + os.environ, + {"OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer"}, + ): + super().setUp() + GrpcInstrumentorClient( + filter_=filters.service_prefix("GRPCTestServer") + ).instrument() + self.server = create_test_server(25565) + self.server.start() + # use a user defined interceptor along with the opentelemetry client interceptor + interceptors = [Interceptor()] + self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.intercept_channel(self.channel, *interceptors) + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.server.stop(None) + self.channel.close() + + def test_unary_unary_future(self): + simple_method_future(self._stub).result() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + def test_unary_unary(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_unary_stream(self): + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_stream_unary(self): + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "ClientStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_stream_stream(self): + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "BidirectionalStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_error_simple(self): + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + def test_error_stream_unary(self): + with self.assertRaises(grpc.RpcError): + client_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + def test_error_unary_stream(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + def test_error_stream_stream(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + def test_client_interceptor_trace_context_propagation( + self, + ): # pylint: disable=no-self-use + """ensure that client interceptor correctly inject trace context into all outgoing requests.""" + previous_propagator = get_global_textmap() + try: + set_global_textmap(MockTextMapPropagator()) + interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) + + carrier = tuple() + + def invoker(request, metadata): + nonlocal carrier + carrier = metadata + return {} + + request = Request(client_id=1, request_data="data") + interceptor.intercept_unary( + request, + {}, + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", timeout=None + ), + invoker=invoker, + ) + + assert len(carrier) == 2 + assert carrier[0][0] == "mock-traceid" + assert carrier[0][1] == "0" + assert carrier[1][0] == "mock-spanid" + assert carrier[1][1] == "0" + + finally: + set_global_textmap(previous_propagator) + + def test_unary_unary_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + finally: + context.detach(token) + self.assertEqual(len(spans), 0) + + def test_unary_stream_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + finally: + context.detach(token) + self.assertEqual(len(spans), 0) + + def test_stream_unary_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + finally: + context.detach(token) + self.assertEqual(len(spans), 0) + + def test_stream_stream_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + finally: + context.detach(token) + self.assertEqual(len(spans), 0) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_filters.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_filters.py new file mode 100644 index 0000000000..f7d69074ac --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_filters.py @@ -0,0 +1,357 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + +import grpc +import pytest + +from opentelemetry.instrumentation.grpc import filters + + +class _HandlerCallDetails( + collections.namedtuple( + "_HanlderCallDetails", + ( + "method", + "invocation_metadata", + ), + ), + grpc.HandlerCallDetails, +): + pass + + +class _UnaryClientInfo( + collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout")) +): + pass + + +class _StreamClientInfo( + collections.namedtuple( + "_StreamClientInfo", + ("full_method", "is_client_stream", "is_server_stream", "timeout"), + ) +): + pass + + +@pytest.mark.parametrize( + "test_case", + [ + ( + True, + "SimpleMethod", + _HandlerCallDetails( + method="SimpleMethod", + invocation_metadata=[("tracer", "foo"), ("caller", "bar")], + ), + ), + ( + False, + "SimpleMethod", + _HandlerCallDetails( + method="NotSimpleMethod", + invocation_metadata=[("tracer", "foo"), ("caller", "bar")], + ), + ), + ( + True, + "SimpleMethod", + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ( + False, + "SimpleMethod", + _UnaryClientInfo( + full_method="/GRPCTestServer/NotSimpleMethod", + timeout=3000, + ), + ), + ( + True, + "SimpleMethod", + _StreamClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ( + False, + "SimpleMethod", + _StreamClientInfo( + full_method="/GRPCTestServer/NotSimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ], +) +def test_method_name(test_case): + fn = filters.method_name(test_case[1]) + assert test_case[0] == fn(test_case[2]) + + +@pytest.mark.parametrize( + "test_case", + [ + ( + True, + "Simple", + _HandlerCallDetails( + method="SimpleMethod", + invocation_metadata=[("tracer", "foo"), ("caller", "bar")], + ), + ), + ( + False, + "Simple", + _HandlerCallDetails( + method="NotSimpleMethod", + invocation_metadata=[("tracer", "foo"), ("caller", "bar")], + ), + ), + ( + True, + "Simple", + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ( + False, + "Simple", + _UnaryClientInfo( + full_method="/GRPCTestServer/NotSimpleMethod", + timeout=3000, + ), + ), + ( + True, + "Simple", + _StreamClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ( + False, + "Simple", + _StreamClientInfo( + full_method="/GRPCTestServer/NotSimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ], +) +def test_method_prefix(test_case): + fn = filters.method_prefix(test_case[1]) + assert test_case[0] == fn(test_case[2]) + + +@pytest.mark.parametrize( + "test_case", + [ + ( + True, + "GRPCTestServer", + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ( + False, + "GRPCTestServer", + _UnaryClientInfo( + full_method="/GRPCRealServer/SimpleMethod", + timeout=3000, + ), + ), + ( + True, + "GRPCTestServer", + _StreamClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ( + False, + "GRPCTestServer", + _StreamClientInfo( + full_method="/GRPCRealServer/SimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ], +) +def test_service_name(test_case): + fn = filters.service_name(test_case[1]) + assert test_case[0] == fn(test_case[2]) + + +@pytest.mark.parametrize( + "test_case", + [ + ( + True, + "GRPCTest", + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ( + False, + "GRPCTest", + _UnaryClientInfo( + full_method="/GRPCRealServer/SimpleMethod", + timeout=3000, + ), + ), + ( + True, + "GRPCTest", + _StreamClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ( + False, + "GRPCTest", + _StreamClientInfo( + full_method="/GRPCRealServer/SimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ], +) +def test_service_prefix(test_case): + fn = filters.service_prefix(test_case[1]) + assert test_case[0] == fn(test_case[2]) + + +@pytest.mark.parametrize( + "test_case", + [ + ( + True, + _UnaryClientInfo( + full_method="/grpc.health.v1.Health/Check", + timeout=3000, + ), + ), + ( + False, + _UnaryClientInfo( + full_method="/GRPCRealServer/SimpleMethod", + timeout=3000, + ), + ), + ( + True, + _StreamClientInfo( + full_method="/grpc.health.v1.Health/Check", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ( + False, + _StreamClientInfo( + full_method="/GRPCRealServer/SimpleMethod", + is_client_stream=True, + is_server_stream=False, + timeout=3000, + ), + ), + ], +) +def test_health_check(test_case): + fn = filters.health_check() + assert test_case[0] == fn(test_case[1]) + + +@pytest.mark.parametrize( + "test_case", + [ + ( + True, + filters.all_of( + filters.method_name("SimpleMethod"), + filters.service_name("GRPCTestServer"), + ), + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ( + True, + filters.any_of( + filters.method_name("NotSimpleMethod"), + filters.service_name("GRPCTestServer"), + ), + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ( + True, + filters.any_of( + filters.service_name("GRPCMockServer"), + filters.service_name("GRPCTestServer"), + ), + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ( + False, + filters.negate(filters.method_name("SimpleMethod")), + _UnaryClientInfo( + full_method="/GRPCTestServer/SimpleMethod", + timeout=3000, + ), + ), + ], +) +def test_all_any_negate(test_case): + fn = test_case[1] + assert test_case[0] == fn(test_case[2]) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_filter.py new file mode 100644 index 0000000000..95e70236cb --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_filter.py @@ -0,0 +1,215 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=unused-argument +# pylint:disable=no-self-use + +from concurrent import futures + +import grpc + +import opentelemetry.instrumentation.grpc +from opentelemetry import trace +from opentelemetry.instrumentation.grpc import ( + GrpcInstrumentorServer, + filters, + server_interceptor, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase + +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) + + +class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): + def __init__(self, handler): + self.request_streaming = False + self.response_streaming = False + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = handler + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + + +class UnaryUnaryRpcHandler(grpc.GenericRpcHandler): + def __init__(self, handler): + self._unary_unary_handler = handler + + def service(self, handler_call_details): + return UnaryUnaryMethodHandler(self._unary_unary_handler) + + +class Servicer(GRPCTestServerServicer): + """Our test servicer""" + + # pylint:disable=C0103 + def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + # pylint:disable=C0103 + def ServerStreamingMethod(self, request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + +class TestOpenTelemetryServerInterceptorFilterMethodName(TestBase): + def test_instrumentor(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer( + filter_=filters.method_name("handler") + ) + grpc_server_instrumentor.instrument() + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "TestServicer/handler" + try: + server.start() + channel.unary_unary(rpc_call)(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "handler", + SpanAttributes.RPC_SERVICE: "TestServicer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + grpc_server_instrumentor.uninstrument() + + def test_uninstrument(self): + def handler(request, context): + return b"" + + grpc_server_instrumentor = GrpcInstrumentorServer( + filter_=filters.method_name("SimpleMethod") + ) + grpc_server_instrumentor.instrument() + grpc_server_instrumentor.uninstrument() + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "TestServicer/test" + try: + server.start() + channel.unary_unary(rpc_call)(b"test") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + def test_create_span(self): + """Check that the interceptor wraps calls with spans server-side.""" + + # Intercept gRPC calls... + interceptor = server_interceptor( + filter_=filters.method_name("SimpleMethod") + ) + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(Servicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + )