Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pika): adding support for channel.consume instrumentation #2397

Merged
merged 14 commits into from
Apr 21, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- `opentelemetry-instrumentation-pika` Instrumentation for `channel.consume()` (supported
only for global, non channel specific instrumentation)
([#2397](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2397)))


### Breaking changes

- Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ def consume_hook(span: Span, body: bytes, properties: BasicProperties):

PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook)

Consumer Instrumentation
------------------------
For consumer instrumentation, pika supports two consuming modes:

* Consumers using the `basic_consume` method which accepts a callback. This is supported for global instrumentation
(`PikaInstrumentor().instrument()`) as well channel specific instrumentation (`PikaInstrumentor().instrument_channel(channel)`)
* Consumers using the `consume` method which returns a generator over messages. This is supported for global
instrumentations only (`PikaInstrumentor().instrument()`)

API
---
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import wrapt
from packaging import version
from pika.adapters import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
from pika.adapters.blocking_connection import (
BlockingChannel,
_QueueConsumerGeneratorInfo,
)

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
Expand Down Expand Up @@ -191,6 +194,24 @@ def wrapper(wrapped, instance, args, kwargs):

wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper)

@staticmethod
def _decorate_queue_consumer_generator(
tracer_provider: Optional[TracerProvider],
consume_hook: utils.HookT = utils.dummy_callback,
) -> None:
tracer = trace.get_tracer(__name__, __version__, tracer_provider)

def wrapper(wrapped, instance, args, kwargs):
res = wrapped(*args, **kwargs)
instance.pending_events = utils.ReadyMessagesDequeProxy(
instance.pending_events, instance, tracer, consume_hook
)
return res

wrapt.wrap_function_wrapper(
_QueueConsumerGeneratorInfo, "__init__", wrapper
)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
publish_hook: utils.HookT = kwargs.get(
Expand All @@ -207,10 +228,15 @@ def _instrument(self, **kwargs: Dict[str, Any]) -> None:
consume_hook=consume_hook,
)

self._decorate_queue_consumer_generator(
tracer_provider, consume_hook=consume_hook
)

def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
if hasattr(self, "__opentelemetry_tracer_provider"):
delattr(self, "__opentelemetry_tracer_provider")
unwrap(BlockingConnection, "channel")
unwrap(_QueueConsumerGeneratorInfo, "__init__")

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from logging import getLogger
from typing import Any, Callable, List, Optional

from pika.adapters.blocking_connection import (
_ConsumerDeliveryEvt,
_QueueConsumerGeneratorInfo,
)
from pika.channel import Channel
from pika.spec import Basic, BasicProperties
from wrapt import ObjectProxy

from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
Expand Down Expand Up @@ -128,7 +133,7 @@ def decorated_function(

def _get_span(
tracer: Tracer,
channel: Channel,
channel: Optional[Channel],
properties: BasicProperties,
task_name: str,
destination: str,
Expand Down Expand Up @@ -157,7 +162,7 @@ def _generate_span_name(

def _enrich_span(
span: Span,
channel: Channel,
channel: Optional[Channel],
properties: BasicProperties,
task_destination: str,
operation: Optional[MessagingOperationValues] = None,
Expand All @@ -176,6 +181,8 @@ def _enrich_span(
span.set_attribute(
SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id
)
if not channel:
return
if not hasattr(channel.connection, "params"):
span.set_attribute(
SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host
Expand All @@ -190,3 +197,75 @@ def _enrich_span(
span.set_attribute(
SpanAttributes.NET_PEER_PORT, channel.connection.params.port
)


# pylint:disable=abstract-method
class ReadyMessagesDequeProxy(ObjectProxy):
def __init__(
self,
wrapped,
queue_consumer_generator: _QueueConsumerGeneratorInfo,
tracer: Optional[Tracer],
consume_hook: HookT = dummy_callback,
):
super().__init__(wrapped)
self._self_active_token = None
self._self_tracer = tracer
self._self_consume_hook = consume_hook
self._self_queue_consumer_generator = queue_consumer_generator

def popleft(self, *args, **kwargs):
try:
# end active context if exists
if self._self_active_token:
context.detach(self._self_active_token)
except Exception as inst_exception: # pylint: disable=W0703
_LOG.exception(inst_exception)

evt = self.__wrapped__.popleft(*args, **kwargs)

try:
# If a new message was received, create a span and set as active context
if isinstance(evt, _ConsumerDeliveryEvt):
method = evt.method
properties = evt.properties
if not properties:
properties = BasicProperties(headers={})
if properties.headers is None:
properties.headers = {}
ctx = propagate.extract(
properties.headers, getter=_pika_getter
)
if not ctx:
ctx = context.get_current()
message_ctx_token = context.attach(ctx)
span = _get_span(
self._self_tracer,
None,
properties,
destination=method.exchange
if method.exchange
else method.routing_key,
span_kind=SpanKind.CONSUMER,
task_name=self._self_queue_consumer_generator.consumer_tag,
operation=MessagingOperationValues.RECEIVE,
)
try:
context.detach(message_ctx_token)
self._self_active_token = context.attach(
trace.set_span_in_context(span)
)
self._self_consume_hook(span, evt.body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
finally:
# We must end the span here, because the next place we can hook
# is not the end of the user code, but only when the next message
# arrives. we still set this span's context as the active context
# so spans created by user code that handles this message will be
# children of this one.
span.end()
except Exception as inst_exception: # pylint: disable=W0703
_LOG.exception(inst_exception)

return evt
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
from unittest import TestCase, mock

from pika.adapters import BlockingConnection
from pika.adapters.blocking_connection import _QueueConsumerGeneratorInfo
from pika.channel import Channel
from wrapt import BoundFunctionWrapper

from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.instrumentation.pika.pika_instrumentor import (
_consumer_callback_attribute_name,
)
from opentelemetry.instrumentation.pika.utils import dummy_callback
from opentelemetry.instrumentation.pika.utils import (
ReadyMessagesDequeProxy,
dummy_callback,
)
from opentelemetry.trace import Tracer


Expand All @@ -40,13 +44,23 @@ def test_instrument_api(self) -> None:
self.assertTrue(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
self.assertTrue(
isinstance(
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
)
)
assert hasattr(
instrumentation, "__opentelemetry_tracer_provider"
), "Tracer not stored for the object!"
instrumentation.uninstrument(channel=self.channel)
instrumentation.uninstrument()
self.assertFalse(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
self.assertFalse(
isinstance(
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
)
)

@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
Expand All @@ -57,7 +71,7 @@ def test_instrument_api(self) -> None:
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_blocking_channel_consumers"
)
def test_instrument(
def test_instrument_channel(
self,
instrument_blocking_channel_consumers: mock.MagicMock,
instrument_basic_consume: mock.MagicMock,
Expand Down Expand Up @@ -110,6 +124,23 @@ def test_instrument_basic_publish(
self.channel.basic_publish, decorate_basic_publish.return_value
)

def test_instrument_queue_consumer_generator(self) -> None:
instrumentation = PikaInstrumentor()
instrumentation.instrument()
generator_info = _QueueConsumerGeneratorInfo(
params=("queue", False, False), consumer_tag="tag"
)
self.assertTrue(
isinstance(generator_info.pending_events, ReadyMessagesDequeProxy)
)
instrumentation.uninstrument()
generator_info = _QueueConsumerGeneratorInfo(
params=("queue", False, False), consumer_tag="tag"
)
self.assertFalse(
isinstance(generator_info.pending_events, ReadyMessagesDequeProxy)
)

def test_uninstrument_channel_functions(self) -> None:
original_function = self.channel.basic_publish
self.channel.basic_publish = mock.MagicMock()
Expand Down
Loading
Loading