diff --git a/CHANGELOG.md b/CHANGELOG.md index 99b702799e..57b92e590c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,9 +16,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553)) - `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available ([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212)) +- `opentelemetry-instrumentation-aio-pika` Support `aio_pika` 8.x + ([#1481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1481)) - `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation. ([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613)) -- Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592)) +- Fix aiohttp bug with unset `trace_configs` + ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592)) - `opentelemetry-instrumentation-django` Allow explicit `excluded_urls` configuration through `instrument()` ([#1618](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1618)) diff --git a/instrumentation/README.md b/instrumentation/README.md index a269b09397..b1482a0227 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -1,7 +1,7 @@ | Instrumentation | Supported Packages | Metrics support | | --------------- | ------------------ | --------------- | -| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No +| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 9.0.0 | No | [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No | [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No | [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml b/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml index 4511254a70..994642e22a 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ [project.optional-dependencies] instruments = [ - "aio_pika ~= 7.2.0", + "aio_pika >= 7.2.0, < 9.0.0", ] test = [ "opentelemetry-instrumentation-aio-pika[instruments]", diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py index 6c7ed74ea4..285e9f99cb 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py @@ -13,4 +13,4 @@ # limitations under the License. from typing import Collection -_instruments: Collection[str] = ("aio_pika ~= 7.2.0",) +_instruments: Collection[str] = ("aio_pika >= 7.2.0, < 9.0.0",) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index a61209e0ce..056f3dab25 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -24,7 +24,7 @@ ) from opentelemetry.trace import Span, SpanKind, Tracer -_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} +_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: "rabbitmq"} class SpanBuilder: @@ -49,18 +49,30 @@ def set_destination(self, destination: str): self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination def set_channel(self, channel: AbstractChannel): - url = channel.connection.connection.url - self._attributes.update({ - SpanAttributes.NET_PEER_NAME: url.host, - SpanAttributes.NET_PEER_PORT: url.port - }) + connection = channel.connection + if getattr(connection, "connection", None): + # aio_rmq 7 + url = connection.connection.url + else: + # aio_rmq 8 + url = connection.url + self._attributes.update( + { + SpanAttributes.NET_PEER_NAME: url.host, + SpanAttributes.NET_PEER_PORT: url.port, + } + ) def set_message(self, message: AbstractMessage): properties = message.properties if properties.message_id: - self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = properties.message_id + self._attributes[ + SpanAttributes.MESSAGING_MESSAGE_ID + ] = properties.message_id if properties.correlation_id: - self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id + self._attributes[ + SpanAttributes.MESSAGING_CONVERSATION_ID + ] = properties.correlation_id def build(self) -> Optional[Span]: if not is_instrumentation_enabled(): @@ -69,9 +81,11 @@ def build(self) -> Optional[Span]: self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value else: self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True - span = self._tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes) + span = self._tracer.start_span( + self._generate_span_name(), kind=self._kind, attributes=self._attributes + ) return span def _generate_span_name(self) -> str: - operation_value = self._operation.value if self._operation else 'send' - return f'{self._destination} {operation_value}' + operation_value = self._operation.value if self._operation else "send" + return f"{self._destination} {operation_value}" diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py index ada7080192..7e34c73c63 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -15,8 +15,10 @@ SERVER_URL = URL( f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/" ) -CONNECTION = Namespace(connection=Namespace(url=SERVER_URL)) -CHANNEL = Namespace(connection=CONNECTION, loop=None) +CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL)) +CONNECTION_8 = Namespace(url=SERVER_URL) +CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None) +CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None) MESSAGE = Namespace( properties=Namespace( message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={} diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py index 70883c116c..7c9288a657 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from unittest import TestCase, mock +from unittest import TestCase, mock, skipIf -from aio_pika import Queue +from aio_pika import Queue, version_info from opentelemetry.instrumentation.aio_pika.callback_decorator import ( CallbackDecorator, @@ -23,7 +23,8 @@ from opentelemetry.trace import SpanKind, get_tracer from .consts import ( - CHANNEL, + CHANNEL_7, + CHANNEL_8, CORRELATION_ID, EXCHANGE_NAME, MESSAGE, @@ -35,7 +36,8 @@ ) -class TestInstrumentedQueue(TestCase): +@skipIf(version_info >= (8, 0), "Only for aio_pika 7") +class TestInstrumentedQueueAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME, @@ -52,7 +54,7 @@ def setUp(self): asyncio.set_event_loop(self.loop) def test_get_callback_span(self): - queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) + queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None) tracer = mock.MagicMock() CallbackDecorator(tracer, queue)._get_span(MESSAGE) tracer.start_span.assert_called_once_with( @@ -62,7 +64,47 @@ def test_get_callback_span(self): ) def test_decorate_callback(self): - queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) + queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None) + callback = mock.MagicMock(return_value=asyncio.sleep(0)) + with mock.patch.object( + CallbackDecorator, "_get_span" + ) as mocked_get_callback_span: + callback_decorator = CallbackDecorator(self.tracer, queue) + decorated_callback = callback_decorator.decorate(callback) + self.loop.run_until_complete(decorated_callback(MESSAGE)) + mocked_get_callback_span.assert_called_once() + callback.assert_called_once_with(MESSAGE) + + +@skipIf(version_info <= (8, 0), "Only for aio_pika 8") +class TestInstrumentedQueueAioRmq8(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME, + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_OPERATION: "receive", + } + + def setUp(self): + self.tracer = get_tracer(__name__) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_callback_span(self): + queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None) + tracer = mock.MagicMock() + CallbackDecorator(tracer, queue)._get_span(MESSAGE) + tracer.start_span.assert_called_once_with( + f"{EXCHANGE_NAME} receive", + kind=SpanKind.CONSUMER, + attributes=self.EXPECTED_ATTRIBUTES, + ) + + def test_decorate_callback(self): + queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None) callback = mock.MagicMock(return_value=asyncio.sleep(0)) with mock.patch.object( CallbackDecorator, "_get_span" diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index 80dfa3182b..e5586b9a00 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -13,9 +13,9 @@ # limitations under the License. import asyncio from typing import Type -from unittest import TestCase, mock +from unittest import TestCase, mock, skipIf -from aio_pika import Exchange, RobustExchange +from aio_pika import Exchange, RobustExchange, version_info from opentelemetry.instrumentation.aio_pika.publish_decorator import ( PublishDecorator, @@ -24,8 +24,10 @@ from opentelemetry.trace import SpanKind, get_tracer from .consts import ( - CHANNEL, - CONNECTION, + CHANNEL_7, + CHANNEL_8, + CONNECTION_7, + CONNECTION_8, CORRELATION_ID, EXCHANGE_NAME, MESSAGE, @@ -37,7 +39,8 @@ ) -class TestInstrumentedExchange(TestCase): +@skipIf(version_info >= (8, 0), "Only for aio_pika 7") +class TestInstrumentedExchangeAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", @@ -54,7 +57,7 @@ def setUp(self): asyncio.set_event_loop(self.loop) def test_get_publish_span(self): - exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME) + exchange = Exchange(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) tracer = mock.MagicMock() PublishDecorator(tracer, exchange)._get_publish_span( MESSAGE, ROUTING_KEY @@ -66,7 +69,60 @@ def test_get_publish_span(self): ) def _test_publish(self, exchange_type: Type[Exchange]): - exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) + exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + + def test_publish(self): + self._test_publish(Exchange) + + def test_robust_publish(self): + self._test_publish(RobustExchange) + + +@skipIf(version_info <= (8, 0), "Only for aio_pika 8") +class TestInstrumentedExchangeAioRmq8(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_TEMP_DESTINATION: True, + } + + def setUp(self): + self.tracer = get_tracer(__name__) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_publish_span(self): + exchange = Exchange(CHANNEL_8, EXCHANGE_NAME) + tracer = mock.MagicMock() + PublishDecorator(tracer, exchange)._get_publish_span( + MESSAGE, ROUTING_KEY + ) + tracer.start_span.assert_called_once_with( + f"{EXCHANGE_NAME},{ROUTING_KEY} send", + kind=SpanKind.PRODUCER, + attributes=self.EXPECTED_ATTRIBUTES, + ) + + def _test_publish(self, exchange_type: Type[Exchange]): + exchange = exchange_type(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME) with mock.patch.object( PublishDecorator, "_get_publish_span" ) as mock_get_publish_span: diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py index 5f87d53846..a4a1d8ec8b 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py @@ -21,6 +21,6 @@ class TestBuilder(TestCase): def test_build(self): builder = SpanBuilder(get_tracer(__name__)) builder.set_as_consumer() - builder.set_destination('destination') + builder.set_destination("destination") span = builder.build() self.assertTrue(isinstance(span, Span)) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 36fda70ab1..20c5a0b725 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -17,7 +17,7 @@ libraries = { "aio_pika": { - "library": "aio_pika ~= 7.2.0", + "library": "aio_pika >= 7.2.0, < 9.0.0", "instrumentation": "opentelemetry-instrumentation-aio-pika==0.37b0.dev", }, "aiohttp": { diff --git a/tox.ini b/tox.ini index 162656a4d0..e51515f906 100644 --- a/tox.ini +++ b/tox.ini @@ -206,6 +206,10 @@ envlist = py3{7,8,9,10,11}-test-instrumentation-pika{0,1} pypy3-test-instrumentation-pika{0,1} + ; opentelemetry-instrumentation-aio-pika + py3{7,8,9,10,11}-test-instrumentation-aio-pika{7,8} + pypy3-test-instrumentation-aio-pika{7,8} + ; opentelemetry-instrumentation-kafka-python py3{7,8,9,10,11}-test-instrumentation-kafka-python pypy3-test-instrumentation-kafka-python @@ -250,6 +254,8 @@ deps = sqlalchemy14: sqlalchemy~=1.4 pika0: pika>=0.12.0,<1.0.0 pika1: pika>=1.0.0 + aio-pika7: aio_pika~=7.2.0 + aio-pika8: aio_pika>=8.0.0,<9.0.0 pymemcache135: pymemcache ==1.3.5 pymemcache200: pymemcache >2.0.0,<3.0.0 pymemcache300: pymemcache >3.0.0,<3.4.2 @@ -296,6 +302,7 @@ changedir = test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests + test-instrumentation-aio-pika{7,8}: instrumentation/opentelemetry-instrumentation-aio-pika/tests test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests test-instrumentation-pymemcache{135,200,300,342}: instrumentation/opentelemetry-instrumentation-pymemcache/tests test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests @@ -337,6 +344,8 @@ commands_pre = pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + aio-pika{7,8}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika[test] + kafka-python: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python[test] confluent-kafka: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-confluent-kafka[test]