diff --git a/setup.py b/setup.py index 5d8f571..7b56de2 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,8 @@ python_requires=">=3.7", install_requires=[ 'py_zipkin>=0.10.1', + 'opentelemetry-sdk>=0.26.1', + 'bravado>=11.0.3' ], keywords='zipkin', classifiers=[ diff --git a/swagger_zipkin/decorate_client.py b/swagger_zipkin/decorate_client.py index 0e18618..0d11295 100644 --- a/swagger_zipkin/decorate_client.py +++ b/swagger_zipkin/decorate_client.py @@ -44,7 +44,7 @@ class OperationDecorator(Generic[P, T]): :type func: callable """ - def __init__(self, operation: Resource, func: Callable[P, T]) -> None: + def __init__(self, operation: Operation, func: Callable[P, T]) -> None: self.operation = operation self.func = func @@ -56,8 +56,8 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: def decorate_client( - api_client: Client, - func: Callable[P, T], + api_client: Resource, + func: Callable[[str, P.args, P.kwargs], T], name: str, ) -> Resource[P, T]: """A helper for decorating :class:`bravado.client.SwaggerClient`. diff --git a/swagger_zipkin/otel_decorator.py b/swagger_zipkin/otel_decorator.py new file mode 100644 index 0000000..ca40146 --- /dev/null +++ b/swagger_zipkin/otel_decorator.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +from contextlib import contextmanager +from typing import Any +from typing import TypeVar + +from bravado.client import construct_request +from bravado.exception import HTTPError +from opentelemetry import trace +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.trace.span import format_span_id +from opentelemetry.trace.span import format_trace_id +from opentelemetry.trace.span import TraceFlags +from typing_extensions import ParamSpec + +from swagger_zipkin.decorate_client import Client +from swagger_zipkin.decorate_client import decorate_client +from swagger_zipkin.decorate_client import Resource + +T = TypeVar('T', covariant=True) +P = ParamSpec('P') + +tracer = trace.get_tracer("otel_decorator") + + +class OtelResourceDecorator: + """A wrapper to the swagger resource. + + :param resource: A resource object. eg. `client.pet`, `client.store`. + :type resource: :class:`swaggerpy.client.Resource` or :class:`bravado_core.resource.Resource` + """ + + def __init__(self, resource: Resource, client_identifier: str, smartstack_namespace: str) -> None: + self.resource = resource + self.client_identifier = client_identifier + self.smartstack_namespace = smartstack_namespace + + def __getattr__(self, name: str) -> Resource: + return decorate_client(self.resource, self.with_headers, name) + + def with_headers(self, call_name: str, *args: Any, **kwargs: Any) -> Any: + + with self.handle_exception(): + kwargs.setdefault('_request_options', {}) + request_options: dict = kwargs['_request_options'] + request_options.setdefault('headers', {}) + + operation = getattr(self.resource, call_name) + request = construct_request(operation, request_options, **kwargs) # type: ignore + + url = getattr(request, "url", "") + path = getattr(request, "path", "") + method = getattr(request, "method", "") + + parent_span = trace.get_current_span() + span_name = f"{method} {path}" + + with tracer.start_as_current_span( + span_name, kind=trace.SpanKind.CLIENT + ) as span: + span.set_attribute("url.path", url) + span.set_attribute("http.request.method", method) + span.set_attribute("client.namespace", self.client_identifier) + span.set_attribute("peer.service", self.smartstack_namespace) + span.set_attribute("server.namespace", self.smartstack_namespace) + span.set_attribute("http.response.status_code", "200") + + self.inject_otel_headers(kwargs, span) + self.inject_zipkin_headers(kwargs, span, parent_span) + + try: + operation(*args, **kwargs) + except HTTPError as e: + span.set_attribute("error.type", e.__class__.__name__) + span.set_status(trace.Status(trace.StatusCode.ERROR, e.message)) + span.set_attribute("http.response.status_code", e.status_code) + raise e + + return operation + + @contextmanager + def handle_exception( + self, + ) -> Any: + try: + yield + except Exception as e: + # not raising an exception if the instrumentation had a problem + raise e + + def __dir__(self) -> list[str]: + return dir(self.resource) # pragma: no cover + + def inject_otel_headers( + self, kwargs: dict[str, Any], current_span: trace.Span + ) -> None: + propagator = TraceContextTextMapPropagator() + carrier = kwargs['_request_options']["headers"] + propagator.inject(carrier=carrier, context=trace.set_span_in_context(current_span)) + + def inject_zipkin_headers( + self, kwargs: dict[str, Any], current_span: trace.Span, parent_span: trace.Span + ) -> None: + current_span_context = current_span.get_span_context() + kwargs["_request_options"]["headers"]["X-B3-TraceId"] = format_trace_id( + current_span_context.trace_id + ) + kwargs["_request_options"]["headers"]["X-B3-SpanId"] = format_span_id( + current_span_context.span_id + ) + if parent_span is not None and parent_span.is_recording(): + parent_span_context = parent_span.get_span_context() + kwargs["_request_options"]["headers"]["X-B3-ParentSpanId"] = format_span_id( + parent_span_context.span_id) + + kwargs["_request_options"]["headers"]["X-B3-Sampled"] = ( + "1" + if (current_span_context.trace_flags & TraceFlags.SAMPLED == TraceFlags.SAMPLED) + else "0" + ) + kwargs["_request_options"]["headers"]["X-B3-Flags"] = "0" + + +class OtelClientDecorator: + """A wrapper to swagger client (swagger-py or bravado) to pass on otel and zipkin + headers to the service call. It will also generate a CLIENT span for the outgoing call. + + Even though client is initialised once, all the calls made will have + independent spans. + + :param client: Swagger Client + :type client: :class:`swaggerpy.client.SwaggerClient` or :class:`bravado.client.SwaggerClient`. + :param client_identifier: the name of the service that is using this + generated clientlib + :type client_identifier: string + :param smartstack_namespace: the smartstack name of the paasta instance + this generated clientlib is hitting + :type smartstack_namespace: string + """ + + def __init__(self, client: Client, client_identifier: str, smartstack_namespace: str): + self._client = client + self.client_identifier = client_identifier + self.smartstack_namespace = smartstack_namespace + + def __getattr__(self, name: str) -> Client: + return OtelResourceDecorator( + getattr(self._client, name), + client_identifier=self.client_identifier, + smartstack_namespace=self.smartstack_namespace, + ) + + def __dir__(self) -> list[str]: + return dir(self._client) # pragma: no cover diff --git a/swagger_zipkin/zipkin_decorator.py b/swagger_zipkin/zipkin_decorator.py index 7cc9996..a678434 100644 --- a/swagger_zipkin/zipkin_decorator.py +++ b/swagger_zipkin/zipkin_decorator.py @@ -22,7 +22,7 @@ class ZipkinResourceDecorator: :type resource: :class:`swaggerpy.client.Resource` or :class:`bravado_core.resource.Resource` """ - def __init__(self, resource: Client, context_stack: Stack | None = None) -> None: + def __init__(self, resource: Resource, context_stack: Stack | None = None) -> None: self.resource = resource self._context_stack = context_stack diff --git a/tests/otel_decorator_test.py b/tests/otel_decorator_test.py new file mode 100644 index 0000000..904ffa6 --- /dev/null +++ b/tests/otel_decorator_test.py @@ -0,0 +1,186 @@ +from unittest import mock + +import pytest +from bravado.exception import HTTPError +from bravado.exception import HTTPInternalServerError +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import format_span_id +from opentelemetry.trace.span import format_trace_id + +from swagger_zipkin.otel_decorator import OtelClientDecorator +from swagger_zipkin.otel_decorator import OtelResourceDecorator + +memory_exporter = InMemorySpanExporter() +span_processor = SimpleSpanProcessor(memory_exporter) +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor(span_processor) + +client_identifier = "test_client" +smartstack_namespace = "smartstack_namespace" +tracer = trace.get_tracer("otel_decorator") + + +@pytest.fixture +def setup(): + memory_exporter.clear() + + +@pytest.fixture +def get_request(): + mock_request = mock.Mock() + mock_request.url = "/sample-url" + mock_request.path = "/sample-url" + mock_request.method = "GET" + return mock_request + + +def create_request_options(parent_span: trace.Span, exported_span: trace.Span): + trace_id = format_trace_id(exported_span.get_span_context().trace_id) + span_id = format_span_id(exported_span.get_span_context().span_id) + + headers = {} + headers['headers'] = { + 'traceparent': f'00-{trace_id}-{span_id}-01', + 'X-B3-TraceId': trace_id, + 'X-B3-SpanId': span_id, + 'X-B3-Flags': '0', + 'X-B3-Sampled': '1', + } + if parent_span is not None: + headers['headers']['X-B3-ParentSpanId'] = format_span_id(parent_span.get_span_context().span_id) + + return headers + + +@mock.patch( + "swagger_zipkin.otel_decorator.construct_request", autospec=True +) +def test_client_request(mock_request, get_request, setup): + mock_request.return_value = get_request + + with tracer.start_as_current_span( + "parent_span", kind=trace.SpanKind.SERVER + ) as parent_span: + client = mock.Mock() + wrapped_client = OtelClientDecorator( + client, + client_identifier=client_identifier, + smartstack_namespace=smartstack_namespace + ) + resource = wrapped_client.resource + param1 = mock.Mock() + resource.operation(param1) + + assert len(memory_exporter.get_finished_spans()) == 1 + exported_span = memory_exporter.get_finished_spans()[0] + + client.resource.operation.assert_called_with( + param1, + _request_options=create_request_options(parent_span, exported_span) + ) + + assert exported_span.kind == SpanKind.CLIENT + assert exported_span.name == f"{get_request.method} {get_request.path}" + assert exported_span.attributes["url.path"] == get_request.path + assert exported_span.attributes["http.request.method"] == get_request.method + assert exported_span.attributes["client.namespace"] == client_identifier + assert exported_span.attributes["peer.service"] == smartstack_namespace + assert exported_span.attributes["server.namespace"] == smartstack_namespace + assert exported_span.attributes["http.response.status_code"] == "200" + + param2 = mock.Mock() + resource.operation(param2) + + assert len(memory_exporter.get_finished_spans()) == 2 + exported_span = memory_exporter.get_finished_spans()[1] + + client.resource.operation.assert_called_with( + param2, + _request_options=create_request_options(parent_span, exported_span) + ) + + assert exported_span.name == f"{get_request.method} {get_request.path}" + assert exported_span.attributes["url.path"] == get_request.path + assert exported_span.attributes["http.request.method"] == get_request.method + assert exported_span.attributes["client.namespace"] == client_identifier + assert exported_span.attributes["peer.service"] == smartstack_namespace + assert exported_span.attributes["server.namespace"] == smartstack_namespace + assert exported_span.attributes["http.response.status_code"] == "200" + + +@mock.patch( + "swagger_zipkin.otel_decorator.construct_request", autospec=True +) +def test_client_request_no_parent_span(mock_request, get_request, setup): + mock_request.return_value = get_request + + client = mock.Mock() + wrapped_client = OtelClientDecorator( + client, + client_identifier=client_identifier, + smartstack_namespace=smartstack_namespace + ) + resource = wrapped_client.resource + param = mock.Mock() + resource.operation(param) + + assert len(memory_exporter.get_finished_spans()) == 1 + exported_span = memory_exporter.get_finished_spans()[0] + + client.resource.operation.assert_called_with( + param, + _request_options=create_request_options(None, exported_span) + ) + + assert exported_span.kind == SpanKind.CLIENT + assert exported_span.name == f"{get_request.method} {get_request.path}" + assert exported_span.attributes["url.path"] == get_request.path + assert exported_span.attributes["http.request.method"] == get_request.method + assert exported_span.attributes["client.namespace"] == client_identifier + assert exported_span.attributes["peer.service"] == smartstack_namespace + assert exported_span.attributes["server.namespace"] == smartstack_namespace + assert exported_span.attributes["http.response.status_code"] == "200" + + +@mock.patch( + "swagger_zipkin.otel_decorator.construct_request", autospec=True +) +def test_with_headers_exception(mock_request, get_request, setup): + mock_request.return_value = get_request + + # Create a mock resource and configure it to raise an exception + mock_resource = mock.MagicMock() + mock_response = mock.MagicMock() + mock_response.status_code = "500" + mock_method = mock.MagicMock(side_effect=HTTPInternalServerError(response=mock_response)) + setattr(mock_resource, 'test_operation', mock_method) + + decorator = OtelResourceDecorator(resource=mock_resource, client_identifier="test_client", + smartstack_namespace="smartstack_namespace") + + args = () + kwargs = {'_request_options': {'headers': {}}} + + with pytest.raises(HTTPError): + decorator.with_headers("test_operation", *args, **kwargs) + + assert len(memory_exporter.get_finished_spans()) == 1 + exported_span = memory_exporter.get_finished_spans()[0] + + expected_headers = kwargs['_request_options']['headers'] + actual_headers = create_request_options(None, exported_span)['headers'] + assert expected_headers == actual_headers + + assert exported_span.kind == SpanKind.CLIENT + assert exported_span.name == f"{get_request.method} {get_request.path}" + assert exported_span.attributes["url.path"] == get_request.path + assert exported_span.attributes["http.request.method"] == get_request.method + assert exported_span.attributes["client.namespace"] == client_identifier + assert exported_span.attributes["peer.service"] == smartstack_namespace + assert exported_span.attributes["server.namespace"] == smartstack_namespace + assert exported_span.attributes["error.type"] == "HTTPInternalServerError" + assert exported_span.attributes["http.response.status_code"] == "500"