From 4e6bb752d25dce9c499a74009c2a747a3828bac7 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 13:35:31 +0300 Subject: [PATCH 01/11] feat(pika): adding support for channel.consume instrumentation --- .../instrumentation/pika/pika_instrumentor.py | 28 ++++++- .../instrumentation/pika/utils.py | 84 ++++++++++++++++++- 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 56c78a85c3..76261c89ce 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -20,7 +20,10 @@ import wrapt from packaging import version from pika.adapters import BlockingConnection -from pika.adapters.blocking_connection import BlockingChannel +from pika.adapters.blocking_connection import ( + BlockingChannel, + _QueueConsumerGeneratorInfo, +) from opentelemetry import trace from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -191,6 +194,24 @@ def wrapper(wrapped, instance, args, kwargs): wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper) + @staticmethod + def _decorate_queue_consumer_generator( + tracer_provider: Optional[TracerProvider], + consume_hook: utils.HookT = utils.dummy_callback, + ) -> None: + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + def wrapper(wrapped, instance, args, kwargs): + res = wrapped(*args, **kwargs) + instance.pending_events = utils.ReadyMessagesDequeProxy( + instance.pending_events, instance, tracer, consume_hook + ) + return res + + wrapt.wrap_function_wrapper( + _QueueConsumerGeneratorInfo, "__init__", wrapper + ) + def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) publish_hook: utils.HookT = kwargs.get( @@ -207,10 +228,15 @@ def _instrument(self, **kwargs: Dict[str, Any]) -> None: consume_hook=consume_hook, ) + self._decorate_queue_consumer_generator( + tracer_provider, consume_hook=consume_hook + ) + def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: if hasattr(self, "__opentelemetry_tracer_provider"): delattr(self, "__opentelemetry_tracer_provider") unwrap(BlockingConnection, "channel") + unwrap(_QueueConsumerGeneratorInfo, "__init__") def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 6dab4fdfa9..5415c905de 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -1,8 +1,13 @@ from logging import getLogger from typing import Any, Callable, List, Optional +from pika.adapters.blocking_connection import ( + _ConsumerDeliveryEvt, + _QueueConsumerGeneratorInfo, +) from pika.channel import Channel from pika.spec import Basic, BasicProperties +from wrapt import ObjectProxy from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.utils import is_instrumentation_enabled @@ -128,7 +133,7 @@ def decorated_function( def _get_span( tracer: Tracer, - channel: Channel, + channel: Optional[Channel], properties: BasicProperties, task_name: str, destination: str, @@ -157,7 +162,7 @@ def _generate_span_name( def _enrich_span( span: Span, - channel: Channel, + channel: Optional[Channel], properties: BasicProperties, task_destination: str, operation: Optional[MessagingOperationValues] = None, @@ -176,6 +181,8 @@ def _enrich_span( span.set_attribute( SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id ) + if not channel: + return if not hasattr(channel.connection, "params"): span.set_attribute( SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host @@ -190,3 +197,76 @@ def _enrich_span( span.set_attribute( SpanAttributes.NET_PEER_PORT, channel.connection.params.port ) + + +class ReadyMessagesDequeProxy(ObjectProxy): + def __init__( + self, + wrapped, + queue_consumer_generator: _QueueConsumerGeneratorInfo, + tracer: Optional[Tracer], + consume_hook: HookT = dummy_callback, + ): + super().__init__(wrapped) + self._self_active_span: Optional[Span] = None + self._self_active_token = None + self._self_tracer = tracer + self._self_consume_hook = consume_hook + self._self_queue_consumer_generator = queue_consumer_generator + + def popleft(self, *args, **kwargs): + try: + # end active context if exists + if self._self_active_token: + context.detach(self._self_active_token) + except Exception as inst_exception: # pylint: disable=W0703 + _LOG.exception(inst_exception) + evt = self.__wrapped__.popleft(*args, **kwargs) + try: + # If a new message was received, create a span and set as active + if type(evt) is _ConsumerDeliveryEvt: + # start span + method = evt.method + properties = evt.properties + if not properties: + properties = BasicProperties(headers={}) + if properties.headers is None: + properties.headers = {} + ctx = propagate.extract( + properties.headers, getter=_pika_getter + ) + if not ctx: + ctx = context.get_current() + message_ctx_token = context.attach(ctx) + self._self_active_span = _get_span( + self._self_tracer, + None, + properties, + destination=method.exchange + if method.exchange + else method.routing_key, + span_kind=SpanKind.CONSUMER, + task_name=self._self_queue_consumer_generator.consumer_tag, + operation=MessagingOperationValues.RECEIVE, + ) + context.detach(message_ctx_token) + self._self_active_token = context.attach( + trace.set_span_in_context(self._self_active_span) + ) + try: + self._self_consume_hook( + self._self_active_span, evt.body, properties + ) + except Exception as hook_exception: # pylint: disable=W0703 + _LOG.exception(hook_exception) + + # We must end the span here, because the next place we can hook + # is not the end of the user code, but only when the next message + # arrives. we still set this span's context as the active context + # so user code that handles this message will co child-spans of + # this one. + self._self_active_span.end() + except Exception as inst_exception: # pylint: disable=W0703 + _LOG.exception(inst_exception) + finally: + return evt From f9ee608cd80e296a0c22f33e8a4300c3dfdcb2b7 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 15:02:24 +0300 Subject: [PATCH 02/11] updated changelog --- CHANGELOG.md | 4 ++++ .../src/opentelemetry/instrumentation/pika/utils.py | 4 ++-- .../tests/test_pika_instrumentation.py | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c361881fd..24e971e009 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased +### Added + +- `opentelemetry-instrumentation-pika` Instrumentation for `channel.consume()` function + ([#2397](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2397))) ### Breaking changes diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 5415c905de..8d62be62d9 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -268,5 +268,5 @@ def popleft(self, *args, **kwargs): self._self_active_span.end() except Exception as inst_exception: # pylint: disable=W0703 _LOG.exception(inst_exception) - finally: - return evt + + return evt diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 6e154c04f9..799f43f0a6 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -57,11 +57,15 @@ def test_instrument_api(self) -> None: @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_blocking_channel_consumers" ) + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_queue_consumer_generator" + ) def test_instrument( self, instrument_blocking_channel_consumers: mock.MagicMock, instrument_basic_consume: mock.MagicMock, instrument_channel_functions: mock.MagicMock, + instrument_queue_consumer_generator: mock.MagicMock, ): PikaInstrumentor.instrument_channel(channel=self.channel) assert hasattr( @@ -70,6 +74,7 @@ def test_instrument( instrument_blocking_channel_consumers.assert_called_once() instrument_basic_consume.assert_called_once() instrument_channel_functions.assert_called_once() + instrument_queue_consumer_generator.assert_called_once() @mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback") def test_instrument_consumers( From 81bdcec4f5cbd01ef84bcba0627a38083e0e23a9 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 16:37:05 +0300 Subject: [PATCH 03/11] wip tests --- .../tests/test_pika_instrumentation.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 799f43f0a6..6e085da5b2 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -14,6 +14,7 @@ from unittest import TestCase, mock from pika.adapters import BlockingConnection +from pika.adapters.blocking_connection import _QueueConsumerGeneratorInfo from pika.channel import Channel from wrapt import BoundFunctionWrapper @@ -40,6 +41,9 @@ def test_instrument_api(self) -> None: self.assertTrue( isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) + self.assertTrue( + isinstance(_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper) + ) assert hasattr( instrumentation, "__opentelemetry_tracer_provider" ), "Tracer not stored for the object!" @@ -57,15 +61,11 @@ def test_instrument_api(self) -> None: @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_blocking_channel_consumers" ) - @mock.patch( - "opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_queue_consumer_generator" - ) - def test_instrument( + def test_instrument_channel( self, instrument_blocking_channel_consumers: mock.MagicMock, instrument_basic_consume: mock.MagicMock, instrument_channel_functions: mock.MagicMock, - instrument_queue_consumer_generator: mock.MagicMock, ): PikaInstrumentor.instrument_channel(channel=self.channel) assert hasattr( @@ -74,7 +74,6 @@ def test_instrument( instrument_blocking_channel_consumers.assert_called_once() instrument_basic_consume.assert_called_once() instrument_channel_functions.assert_called_once() - instrument_queue_consumer_generator.assert_called_once() @mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback") def test_instrument_consumers( From 59d3b1b3b73f97b37c43088795e9b180e4734cb9 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 16:50:37 +0300 Subject: [PATCH 04/11] updating docs --- CHANGELOG.md | 3 ++- .../src/opentelemetry/instrumentation/pika/__init__.py | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24e971e009..4ad55d7292 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added -- `opentelemetry-instrumentation-pika` Instrumentation for `channel.consume()` function +- `opentelemetry-instrumentation-pika` Instrumentation for `channel.consume()` (supported + only for global, non channel specific instrumentation) ([#2397](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2397))) ### Breaking changes diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index c745462cf3..aafcabf389 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -77,6 +77,15 @@ def consume_hook(span: Span, body: bytes, properties: BasicProperties): PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook) +Consumer Instrumentation +----- +For consumer instrumentation, pika supports two consuming modes: + +* Consumers using the `basic_consume` method which accepts a callback. This is supported for global instrumentation + (`PikaInstrumentor().instrument()`) as well channel specific instrumentation (`PikaInstrumentor().instrument_channel(channel)`) +* Consumers using the `consume` method which returns a generator over messages. This is supported for global + instrumentations only (`PikaInstrumentor().instrument()`) + API --- """ From 578197b69958e47f49156afbc7e2ad4cfe6b54d0 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 17:01:42 +0300 Subject: [PATCH 05/11] more tests --- .../tests/test_pika_instrumentation.py | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 6e085da5b2..ad519c4a35 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -22,7 +22,10 @@ from opentelemetry.instrumentation.pika.pika_instrumentor import ( _consumer_callback_attribute_name, ) -from opentelemetry.instrumentation.pika.utils import dummy_callback +from opentelemetry.instrumentation.pika.utils import ( + ReadyMessagesDequeProxy, + dummy_callback, +) from opentelemetry.trace import Tracer @@ -42,15 +45,22 @@ def test_instrument_api(self) -> None: isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) self.assertTrue( - isinstance(_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper) + isinstance( + _QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper + ) ) assert hasattr( instrumentation, "__opentelemetry_tracer_provider" ), "Tracer not stored for the object!" - instrumentation.uninstrument(channel=self.channel) + instrumentation.uninstrument() self.assertFalse( isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) + self.assertFalse( + isinstance( + _QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper + ) + ) @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" @@ -114,6 +124,23 @@ def test_instrument_basic_publish( self.channel.basic_publish, decorate_basic_publish.return_value ) + def test_instrument_queue_consumer_generator(self) -> None: + instrumentation = PikaInstrumentor() + instrumentation.instrument() + generator_info = _QueueConsumerGeneratorInfo( + params=("queue", False, False), consumer_tag="tag" + ) + self.assertTrue( + isinstance(generator_info.pending_events, ReadyMessagesDequeProxy) + ) + instrumentation.uninstrument() + generator_info = _QueueConsumerGeneratorInfo( + params=("queue", False, False), consumer_tag="tag" + ) + self.assertFalse( + isinstance(generator_info.pending_events, ReadyMessagesDequeProxy) + ) + def test_uninstrument_channel_functions(self) -> None: original_function = self.channel.basic_publish self.channel.basic_publish = mock.MagicMock() From 497190e16651ed90674c9cb6a79ae378a81bec70 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 17:06:41 +0300 Subject: [PATCH 06/11] removing span member on object proxy --- .../opentelemetry/instrumentation/pika/utils.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 8d62be62d9..16be2e5d5a 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -208,7 +208,6 @@ def __init__( consume_hook: HookT = dummy_callback, ): super().__init__(wrapped) - self._self_active_span: Optional[Span] = None self._self_active_token = None self._self_tracer = tracer self._self_consume_hook = consume_hook @@ -221,11 +220,12 @@ def popleft(self, *args, **kwargs): context.detach(self._self_active_token) except Exception as inst_exception: # pylint: disable=W0703 _LOG.exception(inst_exception) + evt = self.__wrapped__.popleft(*args, **kwargs) + try: - # If a new message was received, create a span and set as active + # If a new message was received, create a span and set as active context if type(evt) is _ConsumerDeliveryEvt: - # start span method = evt.method properties = evt.properties if not properties: @@ -238,7 +238,7 @@ def popleft(self, *args, **kwargs): if not ctx: ctx = context.get_current() message_ctx_token = context.attach(ctx) - self._self_active_span = _get_span( + span = _get_span( self._self_tracer, None, properties, @@ -251,12 +251,10 @@ def popleft(self, *args, **kwargs): ) context.detach(message_ctx_token) self._self_active_token = context.attach( - trace.set_span_in_context(self._self_active_span) + trace.set_span_in_context(span) ) try: - self._self_consume_hook( - self._self_active_span, evt.body, properties - ) + self._self_consume_hook(span, evt.body, properties) except Exception as hook_exception: # pylint: disable=W0703 _LOG.exception(hook_exception) @@ -265,7 +263,7 @@ def popleft(self, *args, **kwargs): # arrives. we still set this span's context as the active context # so user code that handles this message will co child-spans of # this one. - self._self_active_span.end() + span.end() except Exception as inst_exception: # pylint: disable=W0703 _LOG.exception(inst_exception) From f198f77ce4aea6b375fa07cda6335df7f5e74e17 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 17:53:33 +0300 Subject: [PATCH 07/11] adding test for ReadyMessagesDequeProxy --- .../tests/test_utils.py | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index ed33593389..c09511ee11 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -11,8 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import collections from unittest import TestCase, mock +from pika.adapters.blocking_connection import ( + _ConsumerCancellationEvt, + _ConsumerDeliveryEvt, + _QueueConsumerGeneratorInfo, +) from pika.channel import Channel from pika.spec import Basic, BasicProperties @@ -448,3 +454,114 @@ def test_decorate_basic_publish_when_span_is_not_recording( exchange_name, routing_key, mock_body, properties, False ) self.assertEqual(retval, callback.return_value) + + @mock.patch("opentelemetry.instrumentation.pika.utils._get_span") + @mock.patch("opentelemetry.propagate.extract") + @mock.patch("opentelemetry.context.detach") + @mock.patch("opentelemetry.context.attach") + @mock.patch("opentelemetry.context.get_current") + def test_decorate_deque_proxy( + self, + context_get_current: mock.MagicMock, + context_attach: mock.MagicMock, + context_detach: mock.MagicMock, + extract: mock.MagicMock, + get_span: mock.MagicMock, + ) -> None: + returned_span = mock.MagicMock() + get_span.return_value = returned_span + consume_hook = mock.MagicMock() + mock_task_name = "mock_task_name" + tracer = mock.MagicMock() + generator_info = mock.MagicMock( + spec=_QueueConsumerGeneratorInfo, + pending_events=mock.MagicMock(spec=collections.deque), + consumer_tag=mock_task_name, + ) + method = mock.MagicMock(spec=Basic.Deliver) + method.exchange = "test_exchange" + properties = mock.MagicMock() + mock_body = b"mock_body" + evt = _ConsumerDeliveryEvt(method, properties, mock_body) + generator_info.pending_events.popleft.return_value = evt + proxy = utils.ReadyMessagesDequeProxy( + generator_info.pending_events, generator_info, tracer, consume_hook + ) + + # First call (no detach cleanup) + res = proxy.popleft() + self.assertEqual(res, evt) + generator_info.pending_events.popleft.assert_called_once() + extract.assert_called_once_with( + properties.headers, getter=utils._pika_getter + ) + context_get_current.assert_called_once() + self.assertEqual(context_attach.call_count, 2) + self.assertEqual(context_detach.call_count, 1) + get_span.assert_called_once_with( + tracer, + None, + properties, + destination=method.exchange, + span_kind=SpanKind.CONSUMER, + task_name=mock_task_name, + operation=MessagingOperationValues.RECEIVE, + ) + consume_hook.assert_called_once() + returned_span.end.assert_called_once() + + generator_info.pending_events.reset_mock() + extract.reset_mock() + context_get_current.reset_mock() + get_span.reset_mock() + context_attach.reset_mock() + context_detach.reset_mock() + returned_span.end.reset_mock() + consume_hook.reset_mock() + + # Second call (has detach cleanup) + res = proxy.popleft() + self.assertEqual(res, evt) + generator_info.pending_events.popleft.assert_called_once() + extract.assert_called_once_with( + properties.headers, getter=utils._pika_getter + ) + context_get_current.assert_called_once() + self.assertEqual(context_attach.call_count, 2) + self.assertEqual(context_detach.call_count, 2) + get_span.assert_called_once_with( + tracer, + None, + properties, + destination=method.exchange, + span_kind=SpanKind.CONSUMER, + task_name=mock_task_name, + operation=MessagingOperationValues.RECEIVE, + ) + consume_hook.assert_called_once() + returned_span.end.assert_called_once() + generator_info.pending_events.reset_mock() + + extract.reset_mock() + context_get_current.reset_mock() + get_span.reset_mock() + context_attach.reset_mock() + context_detach.reset_mock() + returned_span.end.reset_mock() + consume_hook.reset_mock() + + # Third call (cancellation event) + evt = _ConsumerCancellationEvt("") + generator_info.pending_events.popleft.return_value = evt + + res = proxy.popleft() + + self.assertEqual(res, evt) + generator_info.pending_events.popleft.assert_called_once() + extract.assert_not_called() + context_get_current.not_called() + context_detach.assert_called_once() + context_attach.assert_not_called() + get_span.assert_not_called() + consume_hook.assert_not_called() + returned_span.end.assert_not_called() From 08299da32c86e109fd39bb1e68a8ff6c281193f1 Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 18:11:53 +0300 Subject: [PATCH 08/11] adding tests --- .../src/opentelemetry/instrumentation/pika/utils.py | 3 ++- .../tests/test_utils.py | 11 +++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 16be2e5d5a..c4e7b2e1e8 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -199,6 +199,7 @@ def _enrich_span( ) +# pylint:disable=abstract-method class ReadyMessagesDequeProxy(ObjectProxy): def __init__( self, @@ -225,7 +226,7 @@ def popleft(self, *args, **kwargs): try: # If a new message was received, create a span and set as active context - if type(evt) is _ConsumerDeliveryEvt: + if isinstance(evt, _ConsumerDeliveryEvt): method = evt.method properties = evt.properties if not properties: diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index c09511ee11..d651ea64c9 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -455,6 +455,7 @@ def test_decorate_basic_publish_when_span_is_not_recording( ) self.assertEqual(retval, callback.return_value) + # pylint: disable=too-many-statements @mock.patch("opentelemetry.instrumentation.pika.utils._get_span") @mock.patch("opentelemetry.propagate.extract") @mock.patch("opentelemetry.context.detach") @@ -471,18 +472,16 @@ def test_decorate_deque_proxy( returned_span = mock.MagicMock() get_span.return_value = returned_span consume_hook = mock.MagicMock() - mock_task_name = "mock_task_name" tracer = mock.MagicMock() generator_info = mock.MagicMock( spec=_QueueConsumerGeneratorInfo, pending_events=mock.MagicMock(spec=collections.deque), - consumer_tag=mock_task_name, + consumer_tag="mock_task_name", ) method = mock.MagicMock(spec=Basic.Deliver) method.exchange = "test_exchange" properties = mock.MagicMock() - mock_body = b"mock_body" - evt = _ConsumerDeliveryEvt(method, properties, mock_body) + evt = _ConsumerDeliveryEvt(method, properties, b"mock_body") generator_info.pending_events.popleft.return_value = evt proxy = utils.ReadyMessagesDequeProxy( generator_info.pending_events, generator_info, tracer, consume_hook @@ -504,7 +503,7 @@ def test_decorate_deque_proxy( properties, destination=method.exchange, span_kind=SpanKind.CONSUMER, - task_name=mock_task_name, + task_name=generator_info.consumer_tag, operation=MessagingOperationValues.RECEIVE, ) consume_hook.assert_called_once() @@ -535,7 +534,7 @@ def test_decorate_deque_proxy( properties, destination=method.exchange, span_kind=SpanKind.CONSUMER, - task_name=mock_task_name, + task_name=generator_info.consumer_tag, operation=MessagingOperationValues.RECEIVE, ) consume_hook.assert_called_once() From 25b52dd4789a9f9ecb685fee4e7d58674528aa5f Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 18:14:37 +0300 Subject: [PATCH 09/11] better comment on span.end() --- .../src/opentelemetry/instrumentation/pika/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index c4e7b2e1e8..f28e30ff7c 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -262,8 +262,8 @@ def popleft(self, *args, **kwargs): # We must end the span here, because the next place we can hook # is not the end of the user code, but only when the next message # arrives. we still set this span's context as the active context - # so user code that handles this message will co child-spans of - # this one. + # so spans created by user code that handles this message will be + # children of this one. span.end() except Exception as inst_exception: # pylint: disable=W0703 _LOG.exception(inst_exception) From 82863576987e75de2cb36b9174b62647af80c85c Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 18:27:48 +0300 Subject: [PATCH 10/11] fixing docs --- .../src/opentelemetry/instrumentation/pika/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index aafcabf389..d9cec06525 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -78,7 +78,7 @@ def consume_hook(span: Span, body: bytes, properties: BasicProperties): PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook) Consumer Instrumentation ------ +------------------------ For consumer instrumentation, pika supports two consuming modes: * Consumers using the `basic_consume` method which accepts a callback. This is supported for global instrumentation From 2c631ebbc6968c13fc7bf59c9d24e401ccc5a71d Mon Sep 17 00:00:00 2001 From: Gal Date: Mon, 8 Apr 2024 19:02:42 +0300 Subject: [PATCH 11/11] ending span even on exceptions --- .../instrumentation/pika/utils.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index f28e30ff7c..5afa5d9ee6 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -250,21 +250,21 @@ def popleft(self, *args, **kwargs): task_name=self._self_queue_consumer_generator.consumer_tag, operation=MessagingOperationValues.RECEIVE, ) - context.detach(message_ctx_token) - self._self_active_token = context.attach( - trace.set_span_in_context(span) - ) try: + context.detach(message_ctx_token) + self._self_active_token = context.attach( + trace.set_span_in_context(span) + ) self._self_consume_hook(span, evt.body, properties) except Exception as hook_exception: # pylint: disable=W0703 _LOG.exception(hook_exception) - - # We must end the span here, because the next place we can hook - # is not the end of the user code, but only when the next message - # arrives. we still set this span's context as the active context - # so spans created by user code that handles this message will be - # children of this one. - span.end() + finally: + # We must end the span here, because the next place we can hook + # is not the end of the user code, but only when the next message + # arrives. we still set this span's context as the active context + # so spans created by user code that handles this message will be + # children of this one. + span.end() except Exception as inst_exception: # pylint: disable=W0703 _LOG.exception(inst_exception)