diff --git a/instrumentation/README.md b/instrumentation/README.md index a269b09397..721b0926d2 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -1,7 +1,7 @@ | Instrumentation | Supported Packages | Metrics support | | --------------- | ------------------ | --------------- | -| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No +| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0 | No | [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No | [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No | [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No @@ -42,4 +42,4 @@ | [opentelemetry-instrumentation-tortoiseorm](./opentelemetry-instrumentation-tortoiseorm) | tortoise-orm >= 0.17.0 | No | [opentelemetry-instrumentation-urllib](./opentelemetry-instrumentation-urllib) | urllib | Yes | [opentelemetry-instrumentation-urllib3](./opentelemetry-instrumentation-urllib3) | urllib3 >= 1.0.0, < 2.0.0 | Yes -| [opentelemetry-instrumentation-wsgi](./opentelemetry-instrumentation-wsgi) | wsgi | Yes \ No newline at end of file +| [opentelemetry-instrumentation-wsgi](./opentelemetry-instrumentation-wsgi) | wsgi | Yes diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index a61209e0ce..efcb65661a 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -49,7 +49,13 @@ def set_destination(self, destination: str): self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination def set_channel(self, channel: AbstractChannel): - url = channel.connection.connection.url + connection = channel.connection + if getattr(connection, "connection"): + # aio_rmq 7 + url = connection.connection.url + else: + # aio_rmq 8 + url = connection.url self._attributes.update({ SpanAttributes.NET_PEER_NAME: url.host, SpanAttributes.NET_PEER_PORT: url.port diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py index ada7080192..601d1a445f 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -15,7 +15,8 @@ SERVER_URL = URL( f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/" ) -CONNECTION = Namespace(connection=Namespace(url=SERVER_URL)) +CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL)) +CONNECTION_8 = Namespace(connection=Namespace(url=SERVER_URL)) CHANNEL = Namespace(connection=CONNECTION, loop=None) MESSAGE = Namespace( properties=Namespace( diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index 80dfa3182b..35ddb6bf29 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -25,7 +25,8 @@ from .consts import ( CHANNEL, - CONNECTION, + CONNECTION_7, + CONNECTION_8, CORRELATION_ID, EXCHANGE_NAME, MESSAGE, @@ -37,7 +38,7 @@ ) -class TestInstrumentedExchange(TestCase): +class TestInstrumentedExchangeAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", @@ -54,7 +55,7 @@ def setUp(self): asyncio.set_event_loop(self.loop) def test_get_publish_span(self): - exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME) + exchange = Exchange(CONNECTION_7, CHANNEL, EXCHANGE_NAME) tracer = mock.MagicMock() PublishDecorator(tracer, exchange)._get_publish_span( MESSAGE, ROUTING_KEY @@ -66,7 +67,59 @@ def test_get_publish_span(self): ) def _test_publish(self, exchange_type: Type[Exchange]): - exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) + exchange = exchange_type(CONNECTION_7, CHANNEL, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + + def test_publish(self): + self._test_publish(Exchange) + + def test_robust_publish(self): + self._test_publish(RobustExchange) + + +class TestInstrumentedExchangeAioRmq8(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_TEMP_DESTINATION: True, + } + + def setUp(self): + self.tracer = get_tracer(__name__) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_publish_span(self): + exchange = Exchange(CONNECTION_8, CHANNEL, EXCHANGE_NAME) + tracer = mock.MagicMock() + PublishDecorator(tracer, exchange)._get_publish_span( + MESSAGE, ROUTING_KEY + ) + tracer.start_span.assert_called_once_with( + f"{EXCHANGE_NAME},{ROUTING_KEY} send", + kind=SpanKind.PRODUCER, + attributes=self.EXPECTED_ATTRIBUTES, + ) + + def _test_publish(self, exchange_type: Type[Exchange]): + exchange = exchange_type(CONNECTION_8, CHANNEL, EXCHANGE_NAME) with mock.patch.object( PublishDecorator, "_get_publish_span" ) as mock_get_publish_span: