Skip to content

Commit

Permalink
Add tracing (#27)
Browse files Browse the repository at this point in the history
* Add tracing functions to RabbitMQ Publisher and subscriber
  • Loading branch information
t-persson authored Apr 18, 2024
1 parent 89e5f3d commit b580e4f
Show file tree
Hide file tree
Showing 18 changed files with 463 additions and 53 deletions.
4 changes: 1 addition & 3 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import inspect
import shutil

__location__ = os.path.join(
os.getcwd(), os.path.dirname(inspect.getfile(inspect.currentframe()))
)
__location__ = os.path.join(os.getcwd(), os.path.dirname(inspect.getfile(inspect.currentframe())))

# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
Expand Down
4 changes: 4 additions & 0 deletions pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[messages control]
disable=
fixme,
duplicate-code
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ eiffellib[rabbitmq]~=2.4
requests~=2.31
kubernetes~=26.1
pyyaml~=6.0
opentelemetry-api~=1.21
opentelemetry-semantic-conventions~=0.42b0
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ install_requires =
requests~=2.31
kubernetes~=26.1
pyyaml~=6.0
opentelemetry-api~=1.21
opentelemetry-semantic-conventions~=0.42b0

# Require a specific Python version, e.g. Python 2.7 or >= 3.4
python_requires = >=3.4
Expand Down
16 changes: 16 additions & 0 deletions src/etos_lib/eiffel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ETOS library eiffel helpers."""
117 changes: 117 additions & 0 deletions src/etos_lib/eiffel/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common functions for the eiffel helpers."""
from collections.abc import MutableMapping, Sequence
from typing import Iterable, Optional

from eiffellib.events.eiffel_base_event import EiffelBaseEvent
from opentelemetry.semconv.trace import MessagingOperationValues, SpanAttributes
from opentelemetry.trace.span import Span
from pika.channel import Channel
from pika.spec import BasicProperties

from ..lib.config import Config

PUBLISHER_TEMPLATE = "{SERVER_ADDRESS}:{SERVER_PORT},{RABBITMQ_VHOST},{RABBITMQ_EXCHANGE}"
CONSUMER_TEMPLATE = "{RABBITMQ_QUEUE_NAME}"
# pylint:disable=too-many-arguments


def add_span_attributes(
span: Span,
channel: Channel,
properties: BasicProperties,
routing_key: str,
operation: MessagingOperationValues,
destination_name: Optional[str] = None,
) -> None:
"""Add rabbitmq properties to a span.
Copied and modified from:
https://github.com/open-telemetry/opentelemetry-python-contrib/blob/main/instrumentation/opentelemetry-instrumentation-pika
"""
ssl = bool(Config().get("rabbitmq").get("ssl"))

span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq")
span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value)
span.set_attribute(SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, routing_key)

if destination_name is not None:
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_NAME, destination_name)
span.set_attribute("messaging.destination_publish.name", properties.type)
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_TEMPLATE, CONSUMER_TEMPLATE)
else:
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_NAME, properties.type)
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_TEMPLATE, PUBLISHER_TEMPLATE)

span.set_attribute(SpanAttributes.NETWORK_PROTOCOL_NAME, "amqps" if ssl else "amqp")
span.set_attribute(SpanAttributes.NETWORK_TYPE, "ipv4")
span.set_attribute(SpanAttributes.NETWORK_TRANSPORT, "tcp")

span.set_attribute(SpanAttributes.SERVER_ADDRESS, channel.connection.params.host)
span.set_attribute(SpanAttributes.SERVER_PORT, channel.connection.params.port)


def add_span_eiffel_attributes(span: Span, event: EiffelBaseEvent) -> None:
"""Add Eiffel properties to a span."""
span.set_attribute(SpanAttributes.EVENT_NAME, event.meta.type)
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, event.meta.event_id)


def _flatten(d: dict, parent_key: str = "", sep: str = ".") -> Iterable[str, str]:
"""Flatten a dictionary to be compatible with opentelemetry."""
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, MutableMapping):
yield from _flatten_dict(v, new_key, sep=sep).items()
elif isinstance(v, list):
for i, lv in enumerate(v):
if isinstance(lv, str):
yield new_key, v
break
if isinstance(lv, MutableMapping):
new_key = new_key + sep + str(i)
yield from _flatten_dict(lv, new_key, sep=sep).items()
else:
yield new_key, v


def _flatten_dict(d: MutableMapping, parent_key: str = "", sep: str = ".") -> dict:
"""Call flatten on a dictionary."""
return dict(_flatten(d, parent_key, sep))


def _links_to_dict(links: Sequence) -> MutableMapping:
"""Convert an Eiffel links structure to a dictionary."""
dict_links = {}
for link in links:
key = link["type"].lower()
if key in dict_links:
if not isinstance(dict_links[key], list):
dict_links[key] = [dict_links[key]]
dict_links[key].append(link["target"])
else:
dict_links[key] = link["target"]
return dict_links


def add_event(event: EiffelBaseEvent) -> dict:
"""Add event data to a dictionary."""
attributes = {}
event_json = event.json
event_json["links"] = _links_to_dict(event_json.pop("links"))
attributes.update(**_flatten_dict(event_json, parent_key="eiffel"))
return attributes
128 changes: 128 additions & 0 deletions src/etos_lib/eiffel/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Custom publishers for eiffellib."""
import logging
import time
from copy import deepcopy
from threading import current_thread

from eiffellib.events.eiffel_base_event import EiffelBaseEvent
from eiffellib.publishers.rabbitmq_publisher import RabbitMQPublisher
from opentelemetry import propagate, trace
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import SpanKind
from pika.spec import BasicProperties

from .common import add_event, add_span_attributes, add_span_eiffel_attributes

_LOG = logging.getLogger(__name__)


class TracingRabbitMQPublisher(RabbitMQPublisher):
"""Custom RabbitMQ publisher that propagates otel trace information to headers."""

def __init__(self, *args, **kwargs):
"""Get a tracer."""
# Must import this here, otherwise there would be a cyclic import problem.
# pylint:disable=cyclic-import,import-outside-toplevel
from etos_lib import __version__

super().__init__(*args, **kwargs)
self.tracer = trace.get_tracer(
__name__,
__version__,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
self.destination = f"{self.parameters.host},{self.parameters.virtual_host},{self.exchange}"

def send_event(self, event: EiffelBaseEvent, block: bool = True) -> None:
"""Validate and send an eiffel event to the rabbitmq server.
This method will set the source on all events if there is a source
added to the :obj:`RabbitMQPublisher`.
If the routing key is set to None in the :obj:`RabbitMQPublisher` this
method will use the routing key from the event that is being sent.
The event domainId will also be added to `meta.source` if it is set to
anything other than the default value. If there is no domainId
set on the event, then the domainId from the source in the
:obj:`RabbitMQPublisher` will be used in the routing key, with a default
value taken from the :obj:`eiffellib.events.eiffel_base_event.EiffelBaseEvent`.
:param event: Event to send.
:type event: :obj:`eiffellib.events.eiffel_base_event.EiffelBaseEvent`
:param block: Set to True in order to block for channel to become ready.
Default: True
:type block: bool
"""
if block:
self.wait_start()
while self._channel is None or not self._channel.is_open:
time.sleep(0.1)

properties = BasicProperties(
content_type="application/json", delivery_mode=2, headers={}, type=self.destination
)

source = deepcopy(self.source)
if self.routing_key is None and event.domain_id != EiffelBaseEvent.domain_id:
source = source or {}
source["domainId"] = event.domain_id
elif self.routing_key is None and source is not None:
# EiffelBaseEvent.domain_id will be the default value.
# By using that value instead of setting the default in this
# method there will only be one place to set the default (the events).
event.domain_id = source.get("domainId", EiffelBaseEvent.domain_id)
if source is not None:
event.meta.add("source", source)
event.validate()
routing_key = self.routing_key or event.routing_key

task_name = f"{self.exchange if self.exchange else routing_key} send"
span = self.tracer.start_span(
name=task_name,
kind=SpanKind.PRODUCER,
)
if span.is_recording():
add_span_attributes(
span,
self._channel,
properties,
routing_key,
MessagingOperationValues.PUBLISH,
)
add_span_eiffel_attributes(span, event)

_LOG.debug("[%s] Attempting to acquire 'send_event' lock", current_thread().name)
with self._lock, trace.use_span(span, end_on_exit=True) as _span:
_LOG.debug("[%s] 'send_event' Lock acquired", current_thread().name)
propagate.inject(properties.headers)
if properties.headers == {}: # Tracing is not enabled?
properties.headers = None
try:
self._channel.basic_publish(
self.exchange,
routing_key,
event.serialized,
properties,
)
_span.add_event("Published event", attributes=add_event(event))
except Exception as exception: # pylint:disable=broad-except
self._nacked_deliveries.append(event)
_span.record_exception(exception, escaped=True)
return
self._delivered += 1
self._deliveries[self._delivered] = event
_LOG.debug("[%s] 'send_event' Lock released", current_thread().name)
Loading

0 comments on commit b580e4f

Please sign in to comment.