Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext/pymongo: Add instrumentor #612

Merged
merged 11 commits into from
Apr 27, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pymongo import MongoClient

from opentelemetry import trace as trace_api
from opentelemetry.ext.pymongo import trace_integration
from opentelemetry.ext.pymongo import PymongoInstrumentor
from opentelemetry.test.test_base import TestBase

MONGODB_HOST = os.getenv("MONGODB_HOST ", "localhost")
Expand All @@ -31,7 +31,7 @@ class TestFunctionalPymongo(TestBase):
def setUpClass(cls):
super().setUpClass()
cls._tracer = cls.tracer_provider.get_tracer(__name__)
trace_integration(cls.tracer_provider)
PymongoInstrumentor().instrument()
client = MongoClient(
MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
)
Expand Down Expand Up @@ -94,3 +94,23 @@ def test_delete(self):
with self._tracer.start_as_current_span("rootSpan"):
self._collection.delete_one({"name": "testName"})
self.validate_spans()

def test_uninstrument(self):
# check that integration is working
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.memory_exporter.clear()
self.assertEqual(len(spans), 1)

# uninstrument and check not new spans are created
PymongoInstrumentor().uninstrument()
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.memory_exporter.clear()
self.assertEqual(len(spans), 0)

# re-enable and check that it works again
PymongoInstrumentor().instrument()
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
3 changes: 3 additions & 0 deletions ext/opentelemetry-ext-pymongo/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Implement instrumentor interface ([#612](https://github.com/open-telemetry/opentelemetry-python/pull/612))
mauriciovasquezbernal marked this conversation as resolved.
Show resolved Hide resolved


## 0.4a0

Released 2020-02-21
Expand Down
5 changes: 5 additions & 0 deletions ext/opentelemetry-ext-pymongo/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ package_dir=
packages=find_namespace:
install_requires =
opentelemetry-api == 0.7.dev0
opentelemetry-auto-instrumentation == 0.7.dev0
pymongo ~= 3.1

[options.extras_require]
Expand All @@ -49,3 +50,7 @@ test =

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
pymongo = opentelemetry.ext.pymongo:PymongoInstrumentor
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# limitations under the License.

"""
The integration with MongoDB supports the `pymongo`_ library and is specified
to ``trace_integration`` using ``'pymongo'``.
The integration with MongoDB supports the `pymongo`_ library, it can be
enabled using the ``PymongoInstrumentor``.

.. _pymongo: https://pypi.org/project/pymongo

Expand All @@ -26,11 +26,11 @@
from pymongo import MongoClient
from opentelemetry import trace
from opentelemetry.trace import TracerProvider
from opentelemetry.trace.ext.pymongo import trace_integration
from opentelemetry.trace.ext.pymongo import PymongoInstrumentor

trace.set_tracer_provider(TracerProvider())

trace_integration()
PymongoInstrumentor().instrument()
client = MongoClient()
db = client["MongoDB_Database"]
collection = db["MongoDB_Collection"]
Expand All @@ -42,6 +42,8 @@

from pymongo import monitoring

from opentelemetry import trace
from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.ext.pymongo.version import __version__
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.trace.status import Status, StatusCanonicalCode
Expand All @@ -50,27 +52,16 @@
COMMAND_ATTRIBUTES = ["filter", "sort", "skip", "limit", "pipeline"]


def trace_integration(tracer_provider=None):
"""Integrate with pymongo to trace it using event listener.
https://api.mongodb.com/python/current/api/pymongo/monitoring.html

Args:
tracer_provider: The `TracerProvider` to use. If none is passed the
current configured one is used.
"""

tracer = get_tracer(__name__, __version__, tracer_provider)

monitoring.register(CommandTracer(tracer))


class CommandTracer(monitoring.CommandListener):
def __init__(self, tracer):
self._tracer = tracer
self._span_dict = {}
self.is_enabled = True

def started(self, event: monitoring.CommandStartedEvent):
""" Method to handle a pymongo CommandStartedEvent """
if not self.is_enabled:
return
command = event.command.get(event.command_name, "")
name = DATABASE_TYPE + "." + event.command_name
statement = event.command_name
Expand Down Expand Up @@ -103,38 +94,70 @@ def started(self, event: monitoring.CommandStartedEvent):
if span is not None:
span.set_status(Status(StatusCanonicalCode.INTERNAL, str(ex)))
span.end()
self._remove_span(event)
self._pop_span(event)

def succeeded(self, event: monitoring.CommandSucceededEvent):
""" Method to handle a pymongo CommandSucceededEvent """
span = self._get_span(event)
if span is not None:
span.set_attribute(
"db.mongo.duration_micros", event.duration_micros
)
span.set_status(Status(StatusCanonicalCode.OK, event.reply))
span.end()
self._remove_span(event)
if not self.is_enabled:
return
span = self._pop_span(event)
if span is None:
return
span.set_attribute("db.mongo.duration_micros", event.duration_micros)
span.set_status(Status(StatusCanonicalCode.OK, event.reply))
span.end()

def failed(self, event: monitoring.CommandFailedEvent):
""" Method to handle a pymongo CommandFailedEvent """
span = self._get_span(event)
if span is not None:
span.set_attribute(
"db.mongo.duration_micros", event.duration_micros
)
span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure))
span.end()
self._remove_span(event)

def _get_span(self, event):
return self._span_dict.get(_get_span_dict_key(event))
if not self.is_enabled:
return
span = self._pop_span(event)
if span is None:
return
span.set_attribute("db.mongo.duration_micros", event.duration_micros)
span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure))
span.end()

def _remove_span(self, event):
self._span_dict.pop(_get_span_dict_key(event))
def _pop_span(self, event):
return self._span_dict.pop(_get_span_dict_key(event), None)


def _get_span_dict_key(event):
if event.connection_id is not None:
return (event.request_id, event.connection_id)
return event.request_id


class PymongoInstrumentor(BaseInstrumentor):
_commandtracer_instance = None # type CommandTracer
# The instrumentation for PyMongo is based on the event listener interface
# https://api.mongodb.com/python/current/api/pymongo/monitoring.html.
# This interface only allows to register listeners and does not provide
# an unregister API. In order to provide a mechanishm to disable
# instrumentation an enabled flag is implemented in CommandTracer,
# it's checked in the different listeners.

def _instrument(self, **kwargs):
"""Integrate with pymongo to trace it using event listener.
https://api.mongodb.com/python/current/api/pymongo/monitoring.html

Args:
tracer_provider: The `TracerProvider` to use. If none is passed the
current configured one is used.
"""

tracer_provider = kwargs.get("tracer_provider")

# Create and register a CommandTracer only the first time
if self._commandtracer_instance is None:
tracer = get_tracer(__name__, __version__, tracer_provider)

self._commandtracer_instance = CommandTracer(tracer)
monitoring.register(self._commandtracer_instance)

# If already created, just enable it
self._commandtracer_instance.is_enabled = True

def _uninstrument(self, **kwargs):
if self._commandtracer_instance is not None:
self._commandtracer_instance.is_enabled = False
8 changes: 4 additions & 4 deletions ext/opentelemetry-ext-pymongo/tests/test_pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.ext.pymongo import CommandTracer, trace_integration
from opentelemetry.ext.pymongo import CommandTracer, PymongoInstrumentor
from opentelemetry.test.test_base import TestBase


Expand All @@ -24,13 +24,13 @@ def setUp(self):
super().setUp()
self.tracer = self.tracer_provider.get_tracer(__name__)

def test_trace_integration(self):
def test_pymongo_instrumentor(self):
mock_register = mock.Mock()
patch = mock.patch(
"pymongo.monitoring.register", side_effect=mock_register
)
with patch:
trace_integration(self.tracer_provider)
PymongoInstrumentor().instrument()

self.assertTrue(mock_register.called)

Expand All @@ -50,7 +50,7 @@ def test_started(self):
# the memory exporter can't be used here because the span isn't ended
# yet
# pylint: disable=protected-access
span = command_tracer._get_span(mock_event)
span = command_tracer._pop_span(mock_event)
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(span.name, "mongodb.command_name.find")
self.assertEqual(span.attributes["component"], "mongodb")
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ commands_pre =

prometheus: pip install {toxinidir}/ext/opentelemetry-ext-prometheus

pymongo: pip install {toxinidir}/opentelemetry-auto-instrumentation
pymongo: pip install {toxinidir}/ext/opentelemetry-ext-pymongo[test]

psycopg2: pip install {toxinidir}/ext/opentelemetry-ext-dbapi
Expand Down