Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support aio_pika 8.x #1481

Merged
merged 5 commits into from
Feb 5, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
- `opentelemetry-instrumentation-aio-pika` Support `aio_pika` 8.x
([#1481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1481))

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

| Instrumentation | Supported Packages | Metrics support |
| --------------- | ------------------ | --------------- |
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 9.0.0 | No
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No
| [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No
| [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [

[project.optional-dependencies]
instruments = [
"aio_pika ~= 7.2.0",
"aio_pika >= 7.2.0, < 9.0.0",
]
test = [
"opentelemetry-instrumentation-aio-pika[instruments]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
from typing import Collection

_instruments: Collection[str] = ("aio_pika ~= 7.2.0",)
_instruments: Collection[str] = ("aio_pika >= 7.2.0, < 9.0.0",)
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
from opentelemetry.trace import Span, SpanKind, Tracer

_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'}
_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: "rabbitmq"}


class SpanBuilder:
Expand All @@ -49,18 +49,30 @@ def set_destination(self, destination: str):
self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination

def set_channel(self, channel: AbstractChannel):
url = channel.connection.connection.url
self._attributes.update({
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port
})
connection = channel.connection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice that this change shouldn't be necessary - the constructor of Exchange doesn't get the connection as its first argument anymore (mosquito/aio-pika@c2ee4be). If you take that into consideration, this change would not be necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nozik I'm not sure I'm following, could you be a bit more specific with what you would think this set_channel method needs to change to if not what I have here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, over in https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1605/files#diff-500514cf8be14e4864fdf49daa4c761c0b1ad0c358cec84b4e26a2c62c1c5dffR57.

That change you made there is incomplete. The change I made here is necessary for aiopika 8.x, the call here fails otherwise:

url = channel.connection.connection.url

In your PR, you are still using an aio_pika v7 mock that assumes a property path from channel.connection.connection.url. In aio_pika v8, the extra .connection is removed, and it's just channel.connection.url.

The change occurred actually in the same commit you linked, these lines

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phillipuniverse Notice here the changes to exchange.py - the Exchange class no longer receives AbstractConnection as its first constructor argument. However, in test_get_publish_span we're still sending wrongfully sending it as the first argument. If you just check the condition of the aio-pika version and adapt the creation of the Exchange (simply remove the first ctor argument for version >= 8.0.0), all the other code changes will become redundant. I hope this helps.

Copy link
Contributor Author

@phillipuniverse phillipuniverse Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 changes that needed to happen to support aio_pika 8:

  1. The change you are describing to the constructor in the test as aio_pika 8 changed the signature
  2. The change in this method to deal with the changed shape of AbstractConnection

There aren’t good integration tests for aio_pika yet in this repo but I observed the problem in this method in my integration environment after patching the instrumentation version check to indiscriminately apply to aio_pika 8

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi there,
I'm also looking for support of aio_pika 8 so I wonder, when optimistically this PR could be merged and released? Thank you in advance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This instrumentation is supported and maintained by community members. Maintainers don't have any expertise in this instrumentation to judge the changes. The approval from the component owner gets it merged immediately and will be part of the next scheduled release.

if getattr(connection, "connection", None):
# aio_rmq 7
url = connection.connection.url
else:
# aio_rmq 8
url = connection.url
self._attributes.update(
{
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port,
}
)

def set_message(self, message: AbstractMessage):
properties = message.properties
if properties.message_id:
self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = properties.message_id
self._attributes[
SpanAttributes.MESSAGING_MESSAGE_ID
] = properties.message_id
if properties.correlation_id:
self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id
self._attributes[
SpanAttributes.MESSAGING_CONVERSATION_ID
] = properties.correlation_id

def build(self) -> Optional[Span]:
if not is_instrumentation_enabled():
Expand All @@ -69,9 +81,11 @@ def build(self) -> Optional[Span]:
self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value
else:
self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True
span = self._tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes)
span = self._tracer.start_span(
self._generate_span_name(), kind=self._kind, attributes=self._attributes
)
return span

def _generate_span_name(self) -> str:
operation_value = self._operation.value if self._operation else 'send'
return f'{self._destination} {operation_value}'
operation_value = self._operation.value if self._operation else "send"
return f"{self._destination} {operation_value}"
Comment on lines -76 to +91
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how this was passing the linter before. This (and other related changes) are the result of me:

  1. Modifying tox.ini to remove the --check-only flag to eachdist.py
  2. Run tox -e lint

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
SERVER_URL = URL(
f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/"
)
CONNECTION = Namespace(connection=Namespace(url=SERVER_URL))
CHANNEL = Namespace(connection=CONNECTION, loop=None)
CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL))
CONNECTION_8 = Namespace(url=SERVER_URL)
Comment on lines +18 to +19
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This namespace was a change between 7 and 8

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nozik this is the namespace change I was describing above

CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None)
CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None)
Comment on lines +18 to +21
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a great situation to completely mock this. Probably the correct evolution is to add a docker test that ensures that we can really publish and consume with both aio_pika 7 and 8. I can add that if required, but it's a little more involved.

MESSAGE = Namespace(
properties=Namespace(
message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from unittest import TestCase, mock
from unittest import TestCase, mock, skipIf

from aio_pika import Queue
from aio_pika import Queue, version_info

from opentelemetry.instrumentation.aio_pika.callback_decorator import (
CallbackDecorator,
Expand All @@ -23,7 +23,8 @@
from opentelemetry.trace import SpanKind, get_tracer

from .consts import (
CHANNEL,
CHANNEL_7,
CHANNEL_8,
CORRELATION_ID,
EXCHANGE_NAME,
MESSAGE,
Expand All @@ -35,7 +36,8 @@
)


class TestInstrumentedQueue(TestCase):
@skipIf(version_info >= (8, 0), "Only for aio_pika 7")
class TestInstrumentedQueueAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
Expand All @@ -52,7 +54,7 @@ def setUp(self):
asyncio.set_event_loop(self.loop)

def test_get_callback_span(self):
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
tracer = mock.MagicMock()
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
tracer.start_span.assert_called_once_with(
Expand All @@ -62,7 +64,47 @@ def test_get_callback_span(self):
)

def test_decorate_callback(self):
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
callback = mock.MagicMock(return_value=asyncio.sleep(0))
with mock.patch.object(
CallbackDecorator, "_get_span"
) as mocked_get_callback_span:
callback_decorator = CallbackDecorator(self.tracer, queue)
decorated_callback = callback_decorator.decorate(callback)
self.loop.run_until_complete(decorated_callback(MESSAGE))
mocked_get_callback_span.assert_called_once()
callback.assert_called_once_with(MESSAGE)


@skipIf(version_info <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedQueueAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION: "receive",
}

def setUp(self):
self.tracer = get_tracer(__name__)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def test_get_callback_span(self):
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
tracer = mock.MagicMock()
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
tracer.start_span.assert_called_once_with(
f"{EXCHANGE_NAME} receive",
kind=SpanKind.CONSUMER,
attributes=self.EXPECTED_ATTRIBUTES,
)

def test_decorate_callback(self):
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
callback = mock.MagicMock(return_value=asyncio.sleep(0))
with mock.patch.object(
CallbackDecorator, "_get_span"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# limitations under the License.
import asyncio
from typing import Type
from unittest import TestCase, mock
from unittest import TestCase, mock, skipIf

from aio_pika import Exchange, RobustExchange
from aio_pika import Exchange, RobustExchange, version_info

from opentelemetry.instrumentation.aio_pika.publish_decorator import (
PublishDecorator,
Expand All @@ -24,8 +24,10 @@
from opentelemetry.trace import SpanKind, get_tracer

from .consts import (
CHANNEL,
CONNECTION,
CHANNEL_7,
CHANNEL_8,
CONNECTION_7,
CONNECTION_8,
CORRELATION_ID,
EXCHANGE_NAME,
MESSAGE,
Expand All @@ -37,7 +39,8 @@
)


class TestInstrumentedExchange(TestCase):
@skipIf(version_info >= (8, 0), "Only for aio_pika 7")
class TestInstrumentedExchangeAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
Expand All @@ -54,7 +57,7 @@ def setUp(self):
asyncio.set_event_loop(self.loop)

def test_get_publish_span(self):
exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME)
exchange = Exchange(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
tracer = mock.MagicMock()
PublishDecorator(tracer, exchange)._get_publish_span(
MESSAGE, ROUTING_KEY
Expand All @@ -66,7 +69,60 @@ def test_get_publish_span(self):
)

def _test_publish(self, exchange_type: Type[Exchange]):
exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME)
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()

def test_publish(self):
self._test_publish(Exchange)

def test_robust_publish(self):
self._test_publish(RobustExchange)


@skipIf(version_info <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedExchangeAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
}

def setUp(self):
self.tracer = get_tracer(__name__)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def test_get_publish_span(self):
exchange = Exchange(CHANNEL_8, EXCHANGE_NAME)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Exchange constructor was a change between 7 and 8, and this is also what prompted me to decorate with skipIf.

tracer = mock.MagicMock()
PublishDecorator(tracer, exchange)._get_publish_span(
MESSAGE, ROUTING_KEY
)
tracer.start_span.assert_called_once_with(
f"{EXCHANGE_NAME},{ROUTING_KEY} send",
kind=SpanKind.PRODUCER,
attributes=self.EXPECTED_ATTRIBUTES,
)

def _test_publish(self, exchange_type: Type[Exchange]):
exchange = exchange_type(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ class TestBuilder(TestCase):
def test_build(self):
builder = SpanBuilder(get_tracer(__name__))
builder.set_as_consumer()
builder.set_destination('destination')
builder.set_destination("destination")
span = builder.build()
self.assertTrue(isinstance(span, Span))
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

libraries = {
"aio_pika": {
"library": "aio_pika ~= 7.2.0",
"library": "aio_pika >= 7.2.0, < 9.0.0",
"instrumentation": "opentelemetry-instrumentation-aio-pika==0.37b0.dev",
},
"aiohttp": {
Expand Down
9 changes: 9 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ envlist =
py3{7,8,9,10,11}-test-instrumentation-pika{0,1}
pypy3-test-instrumentation-pika{0,1}

; opentelemetry-instrumentation-aio-pika
py3{7,8,9,10,11}-test-instrumentation-aio-pika{7,8}
pypy3-test-instrumentation-aio-pika{7,8}

; opentelemetry-instrumentation-kafka-python
py3{7,8,9,10,11}-test-instrumentation-kafka-python
pypy3-test-instrumentation-kafka-python
Expand Down Expand Up @@ -250,6 +254,8 @@ deps =
sqlalchemy14: sqlalchemy~=1.4
pika0: pika>=0.12.0,<1.0.0
pika1: pika>=1.0.0
aio-pika7: aio_pika~=7.2.0
aio-pika8: aio_pika>=8.0.0,<9.0.0
pymemcache135: pymemcache ==1.3.5
pymemcache200: pymemcache >2.0.0,<3.0.0
pymemcache300: pymemcache >3.0.0,<3.4.2
Expand Down Expand Up @@ -296,6 +302,7 @@ changedir =
test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests
test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests
test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests
test-instrumentation-aio-pika{7,8}: instrumentation/opentelemetry-instrumentation-aio-pika/tests
test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests
test-instrumentation-pymemcache{135,200,300,342}: instrumentation/opentelemetry-instrumentation-pymemcache/tests
test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests
Expand Down Expand Up @@ -337,6 +344,8 @@ commands_pre =

pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]

aio-pika{7,8}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika[test]

kafka-python: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python[test]

confluent-kafka: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-confluent-kafka[test]
Expand Down