Skip to content

Commit

Permalink
Support aio_pika 8-style Connection class shape
Browse files Browse the repository at this point in the history
  • Loading branch information
phillipuniverse committed Jan 16, 2023
1 parent 647badc commit 9bd55b1
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 8 deletions.
4 changes: 2 additions & 2 deletions instrumentation/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

| Instrumentation | Supported Packages | Metrics support |
| --------------- | ------------------ | --------------- |
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0 | No
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No
| [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No
| [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No
Expand Down Expand Up @@ -42,4 +42,4 @@
| [opentelemetry-instrumentation-tortoiseorm](./opentelemetry-instrumentation-tortoiseorm) | tortoise-orm >= 0.17.0 | No
| [opentelemetry-instrumentation-urllib](./opentelemetry-instrumentation-urllib) | urllib | Yes
| [opentelemetry-instrumentation-urllib3](./opentelemetry-instrumentation-urllib3) | urllib3 >= 1.0.0, < 2.0.0 | Yes
| [opentelemetry-instrumentation-wsgi](./opentelemetry-instrumentation-wsgi) | wsgi | Yes
| [opentelemetry-instrumentation-wsgi](./opentelemetry-instrumentation-wsgi) | wsgi | Yes
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ def set_destination(self, destination: str):
self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination

def set_channel(self, channel: AbstractChannel):
url = channel.connection.connection.url
connection = channel.connection
if getattr(connection, "connection"):
# aio_rmq 7
url = connection.connection.url
else:
# aio_rmq 8
url = connection.url
self._attributes.update({
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
SERVER_URL = URL(
f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/"
)
CONNECTION = Namespace(connection=Namespace(url=SERVER_URL))
CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL))
CONNECTION_8 = Namespace(connection=Namespace(url=SERVER_URL))
CHANNEL = Namespace(connection=CONNECTION, loop=None)
MESSAGE = Namespace(
properties=Namespace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

from .consts import (
CHANNEL,
CONNECTION,
CONNECTION_7,
CONNECTION_8,
CORRELATION_ID,
EXCHANGE_NAME,
MESSAGE,
Expand All @@ -37,7 +38,7 @@
)


class TestInstrumentedExchange(TestCase):
class TestInstrumentedExchangeAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
Expand All @@ -54,7 +55,7 @@ def setUp(self):
asyncio.set_event_loop(self.loop)

def test_get_publish_span(self):
exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME)
exchange = Exchange(CONNECTION_7, CHANNEL, EXCHANGE_NAME)
tracer = mock.MagicMock()
PublishDecorator(tracer, exchange)._get_publish_span(
MESSAGE, ROUTING_KEY
Expand All @@ -66,7 +67,59 @@ def test_get_publish_span(self):
)

def _test_publish(self, exchange_type: Type[Exchange]):
exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME)
exchange = exchange_type(CONNECTION_7, CHANNEL, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()

def test_publish(self):
self._test_publish(Exchange)

def test_robust_publish(self):
self._test_publish(RobustExchange)


class TestInstrumentedExchangeAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
}

def setUp(self):
self.tracer = get_tracer(__name__)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def test_get_publish_span(self):
exchange = Exchange(CONNECTION_8, CHANNEL, EXCHANGE_NAME)
tracer = mock.MagicMock()
PublishDecorator(tracer, exchange)._get_publish_span(
MESSAGE, ROUTING_KEY
)
tracer.start_span.assert_called_once_with(
f"{EXCHANGE_NAME},{ROUTING_KEY} send",
kind=SpanKind.PRODUCER,
attributes=self.EXPECTED_ATTRIBUTES,
)

def _test_publish(self, exchange_type: Type[Exchange]):
exchange = exchange_type(CONNECTION_8, CHANNEL, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
Expand Down

0 comments on commit 9bd55b1

Please sign in to comment.