From 6acb1c5c6728210b20771dd2a9392abbf96d3956 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Mon, 13 Sep 2021 11:50:16 +0300 Subject: [PATCH] Added initial code --- .../README.rst | 23 ++++ .../setup.cfg | 50 ++++++++ .../setup.py | 38 ++++++ .../instrumentation/pika/__init__.py | 2 + .../instrumentation/pika/package.py | 16 +++ .../instrumentation/pika/pika_instrumentor.py | 108 ++++++++++++++++++ .../instrumentation/pika/utils.py | 78 +++++++++++++ .../instrumentation/pika/version.py | 15 +++ 8 files changed, 330 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-pika/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-pika/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-pika/setup.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py diff --git a/instrumentation/opentelemetry-instrumentation-pika/README.rst b/instrumentation/opentelemetry-instrumentation-pika/README.rst new file mode 100644 index 0000000000..d94cde9629 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/README.rst @@ -0,0 +1,23 @@ +OpenTelemetry Instrumentation +=========================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-pika.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-pika/ + +This library allows tracing requests made by the library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-pika + + +References +---------- + +* `OpenTelemetry pika/ Tracing `_ +* `OpenTelemetry Project `_ diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg new file mode 100644 index 0000000000..70e4f7e35f --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg @@ -0,0 +1,50 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-instrumentation-pika +description = OpenTelemetry pika instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-pika +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: + +install_requires = + opentelemetry-api ~= 1.5 + pika >= 1.1.0 + +[options.extras_require] +test = + +[options.packages.find] +where = src + diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.py b/instrumentation/opentelemetry-instrumentation-pika/setup.py new file mode 100644 index 0000000000..a54b3092c0 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.py @@ -0,0 +1,38 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "pika", + "version.py", +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup( + version=PACKAGE_INFO["__version__"], + entry_points={ + "opentelemetry_instrumentor": [ + "pika = opentelemetry.instrumentation.pika:PikaInstrumentor" + ] + }, +) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py new file mode 100644 index 0000000000..0cb0d945a1 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -0,0 +1,2 @@ +from .version import __version__ +from .pika_instrumentor import PikaInstrumentation diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py new file mode 100644 index 0000000000..b991957291 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +_instruments = ("pika >= 1.1.0",) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py new file mode 100644 index 0000000000..92c3fb16fa --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -0,0 +1,108 @@ +import pika +from logging import getLogger +from opentelemetry import trace +from typing import Dict, Callable +from typing import Collection, Any +from pika.adapters import BaseConnection +from opentelemetry.propagate import inject +from opentelemetry.instrumentation.pika import utils +from opentelemetry.trace import Tracer, TracerProvider +from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.instrumentation.pika.package import _instruments +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor + + +_LOG = getLogger(__name__) +CTX_KEY = "__otel_task_span" + + +class PikaInstrumentation(BaseInstrumentor): + @staticmethod + def _instrument_consumers( + consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer + ) -> Any: + for key, callback in consumers_dict.items(): + + def decorated_callback( + channel: pika.channel.Channel, + method: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes, + ) -> Any: + if not properties: + properties = pika.spec.BasicProperties() + span = utils.get_span( + tracer, + channel, + properties, + task_name=key, + operation=MessagingOperationValues.RECEIVE, + ) + with trace.use_span(span, end_on_exit=True): + inject(properties.headers) + retval = callback(channel, method, properties, body) + return retval + + decorated_callback.__setattr__("_original_callback", callback) + consumers_dict[key] = decorated_callback + + @staticmethod + def _instrument_publish(channel: Any, tracer: Tracer) -> None: + original_basic_publish = channel.basic_publish + + def decorated_basic_publish( + exchange, routing_key, body, properties=None, mandatory=False + ): + if not properties: + properties = pika.spec.BasicProperties() + span = utils.get_span( + tracer, + channel, + properties, + task_name="(temporary)", + operation=None, + ) + with trace.use_span(span, end_on_exit=True): + inject(properties.headers) + retval = original_basic_publish( + exchange, routing_key, body, properties, mandatory + ) + return retval + + decorated_basic_publish.__setattr__( + "_original_function", original_basic_publish + ) + channel.basic_publish = decorated_basic_publish + + @staticmethod + def instrument_channel( + channel: Any, tracer_provider: TracerProvider + ) -> None: + if not hasattr(channel, "_impl") or not isinstance( + channel._impl, pika.channel.Channel + ): + _LOG.error("Could not find implementation for provided channel!") + return + tracer = trace.get_tracer(__name__, pika.__version__, tracer_provider) + if channel._impl._consumers: + PikaInstrumentation._instrument_consumers( + channel._impl._consumers, tracer + ) + PikaInstrumentation._instrument_publish(channel, tracer) + + def _uninstrument(self, connection: Any, **kwargs: Dict[str, Any]) -> None: + if not hasattr(connection, "_impl") or not isinstance( + connection._impl, BaseConnection + ): + _LOG.error("Could not find implementation for provided channel!") + return + for key, callback in connection._impl._consumers: + if hasattr(callback, "_original_callback"): + connection._consumers[key] = callback._original_callback + if hasattr(connection.basic_publish, "_original_function"): + connection.basic_publish = ( + connection.basic_publish._original_function + ) + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py new file mode 100644 index 0000000000..3789da44e2 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -0,0 +1,78 @@ +from typing import Optional +from pika.channel import Channel +from pika.spec import BasicProperties +from opentelemetry.trace import Tracer +from opentelemetry.trace.span import Span +from opentelemetry.propagate import extract +from opentelemetry.propagators.textmap import Getter +from opentelemetry.semconv.trace import ( + SpanAttributes, + MessagingOperationValues, +) + + +class PikaGetter(Getter): + def get(self, carrier, key): + value = carrier.get(key, None) + if value is None: + return None + return (value,) + + def keys(self, carrier): + return [] + + +pika_getter = PikaGetter() + + +def get_span( + tracer: Tracer, + channel: Channel, + properties: BasicProperties, + task_name: str, + operation: Optional[MessagingOperationValues], +) -> Span: + if properties.headers is None: + properties.headers = {} + ctx = extract(properties.headers, getter=pika_getter) + span = tracer.start_span( + context=ctx, name=generate_span_name(task_name, operation) + ) + enrich_span(span, channel, properties, task_name, operation) + return span + + +def generate_span_name( + task_name: str, operation: MessagingOperationValues +) -> str: + return f"{task_name} {operation.value}" + + +def enrich_span( + span: Span, + channel: Channel, + properties: BasicProperties, + task_destination: str, + operation: Optional[MessagingOperationValues], +) -> None: + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq") + if operation: + span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + else: + span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, task_destination) + span.set_attribute( + SpanAttributes.MESSAGING_DESTINATION_KIND, properties.type + ) + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id + ) + span.set_attribute( + SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id + ) + span.set_attribute( + SpanAttributes.NET_PEER_NAME, channel.connection.params.host + ) + span.set_attribute( + SpanAttributes.NET_PEER_PORT, channel.connection.params.port + ) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py new file mode 100644 index 0000000000..d33bd87ce4 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.24b0"