Skip to content

Commit

Permalink
Adding psycopg2 integration (open-telemetry#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
hectorhdzg authored and toumorokoshi committed Feb 17, 2020
1 parent ba919c2 commit 5463cef
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 60 deletions.
3 changes: 2 additions & 1 deletion ext/opentelemetry-ext-dbapi/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Usage
from opentelemetry.trace import tracer_source
from opentelemetry.ext.dbapi import trace_integration
trace.set_preferred_tracer_source_implementation(lambda T: TracerSource())
tracer = trace.tracer_source().get_tracer(__name__)
# Ex: mysql.connector
trace_integration(tracer_source(), mysql.connector, "connect", "mysql")
Expand Down
119 changes: 62 additions & 57 deletions ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
https://www.python.org/dev/peps/pep-0249/
"""

import functools
import logging
import typing

Expand Down Expand Up @@ -72,16 +73,13 @@ def wrap_connect(


class DatabaseApiIntegration:
# pylint: disable=unused-argument
def __init__(
self,
tracer: Tracer,
database_component: str,
database_type: str = "sql",
connection_attributes=None,
):
if tracer is None:
raise ValueError("The tracer is not provided.")
self.connection_attributes = connection_attributes
if self.connection_attributes is None:
self.connection_attributes = {
Expand All @@ -107,18 +105,40 @@ def wrapped_connection(
"""Add object proxy to connection object.
"""
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
traced_connection = TracedConnectionProxy(connection, self)
return traced_connection

def get_connection_attributes(self, connection):
# Populate span fields using connection
for key, value in self.connection_attributes.items():
attribute = getattr(connection, value, None)
# Allow attributes nested in connection object
attribute = functools.reduce(
lambda attribute, attribute_value: getattr(
attribute, attribute_value, None
),
value.split("."),
connection,
)
if attribute:
self.connection_props[key] = attribute
traced_connection = TracedConnection(connection, self)
return traced_connection
self.name = self.database_component
self.database = self.connection_props.get("database", "")
if self.database:
self.name += "." + self.database
user = self.connection_props.get("user")
if user is not None:
self.span_attributes["db.user"] = user
host = self.connection_props.get("host")
if host is not None:
self.span_attributes["net.peer.name"] = host
port = self.connection_props.get("port")
if port is not None:
self.span_attributes["net.peer.port"] = port


# pylint: disable=abstract-method
class TracedConnection(wrapt.ObjectProxy):

class TracedConnectionProxy(wrapt.ObjectProxy):
# pylint: disable=unused-argument
def __init__(
self,
Expand All @@ -130,62 +150,17 @@ def __init__(
wrapt.ObjectProxy.__init__(self, connection)
self._db_api_integration = db_api_integration

self._db_api_integration.name = (
self._db_api_integration.database_component
)
self._db_api_integration.database = self._db_api_integration.connection_props.get(
"database", ""
)
if self._db_api_integration.database:
self._db_api_integration.name += (
"." + self._db_api_integration.database
)
user = self._db_api_integration.connection_props.get("user")
if user is not None:
self._db_api_integration.span_attributes["db.user"] = user
host = self._db_api_integration.connection_props.get("host")
if host is not None:
self._db_api_integration.span_attributes["net.peer.name"] = host
port = self._db_api_integration.connection_props.get("port")
if port is not None:
self._db_api_integration.span_attributes["net.peer.port"] = port

def cursor(self, *args, **kwargs):
return TracedCursor(
return TracedCursorProxy(
self.__wrapped__.cursor(*args, **kwargs), self._db_api_integration
)


# pylint: disable=abstract-method
class TracedCursor(wrapt.ObjectProxy):

# pylint: disable=unused-argument
def __init__(
self,
cursor,
db_api_integration: DatabaseApiIntegration,
*args,
**kwargs
):
wrapt.ObjectProxy.__init__(self, cursor)
class TracedCursor:
def __init__(self, db_api_integration: DatabaseApiIntegration):
self._db_api_integration = db_api_integration

def execute(self, *args, **kwargs):
return self._traced_execution(
self.__wrapped__.execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return self._traced_execution(
self.__wrapped__.executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return self._traced_execution(
self.__wrapped__.callproc, *args, **kwargs
)

def _traced_execution(
def traced_execution(
self,
query_method: typing.Callable[..., any],
*args: typing.Tuple[any, any],
Expand Down Expand Up @@ -223,3 +198,33 @@ def _traced_execution(
except Exception as ex: # pylint: disable=broad-except
span.set_status(Status(StatusCanonicalCode.UNKNOWN, str(ex)))
raise ex


# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):

# pylint: disable=unused-argument
def __init__(
self,
cursor,
db_api_integration: DatabaseApiIntegration,
*args,
**kwargs
):
wrapt.ObjectProxy.__init__(self, cursor)
self._traced_cursor = TracedCursor(db_api_integration)

def execute(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
)
29 changes: 29 additions & 0 deletions ext/opentelemetry-ext-psycopg2/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
OpenTelemetry Psycopg integration
=================================

The integration with PostgreSQL supports the `Psycopg`_ library and is specified
to ``trace_integration`` using ``'PostgreSQL'``.

.. Psycopg: http://initd.org/psycopg/
Usage
-----

.. code:: python
import psycopg2
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerSource
from opentelemetry.trace.ext.psycopg2 import trace_integration
trace.set_preferred_tracer_source_implementation(lambda T: TracerSource())
tracer = trace.tracer_source().get_tracer(__name__)
trace_integration(tracer)
cnx = psycopg2.connect(database='Database')
cursor = cnx.cursor()
cursor.execute("INSERT INTO test (testField) VALUES (123)")
cursor.close()
cnx.close()
References
----------
* `OpenTelemetry Project <https://opentelemetry.io/>`_
47 changes: 47 additions & 0 deletions ext/opentelemetry-ext-psycopg2/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[metadata]
name = opentelemetry-ext-psycopg2
description = OpenTelemetry psycopg2 integration
long_description = file: README.rst
long_description_content_type = text/x-rst
author = OpenTelemetry Authors
author_email = [email protected]
url = https://github.com/open-telemetry/opentelemetry-python/ext/opentelemetry-ext-psycopg2
platforms = any
license = Apache-2.0
classifiers =
Development Status :: 3 - Alpha
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.4
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7

[options]
python_requires = >=3.4
package_dir=
=src
packages=find_namespace:
install_requires =
opentelemetry-api >= 0.4.dev0
psycopg2-binary >= 2.7.3.1
wrapt >= 1.0.0, < 2.0.0

[options.packages.find]
where = src
26 changes: 26 additions & 0 deletions ext/opentelemetry-ext-psycopg2/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import setuptools

BASE_DIR = os.path.dirname(__file__)
VERSION_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "ext", "psycopg2", "version.py"
)
PACKAGE_INFO = {}
with open(VERSION_FILENAME) as f:
exec(f.read(), PACKAGE_INFO)

setuptools.setup(version=PACKAGE_INFO["__version__"])
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The opentelemetry-ext-psycopg2 package allows tracing PostgreSQL queries made by the
Psycopg2 library.
"""

import logging
import typing

import psycopg2
import wrapt
from psycopg2.sql import Composable

from opentelemetry.ext.dbapi import DatabaseApiIntegration, TracedCursor
from opentelemetry.trace import Tracer

logger = logging.getLogger(__name__)

DATABASE_COMPONENT = "postgresql"
DATABASE_TYPE = "sql"


def trace_integration(tracer):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""

connection_attributes = {
"database": "info.dbname",
"port": "info.port",
"host": "info.host",
"user": "info.user",
}
db_integration = DatabaseApiIntegration(
tracer,
DATABASE_COMPONENT,
database_type=DATABASE_TYPE,
connection_attributes=connection_attributes,
)

# pylint: disable=unused-argument
def wrap_connect(
connect_func: typing.Callable[..., any],
instance: typing.Any,
args: typing.Tuple[any, any],
kwargs: typing.Dict[any, any],
):
connection = connect_func(*args, **kwargs)
db_integration.get_connection_attributes(connection)
connection.cursor_factory = PsycopgTraceCursor
return connection

try:
wrapt.wrap_function_wrapper(psycopg2, "connect", wrap_connect)
except Exception as ex: # pylint: disable=broad-except
logger.warning("Failed to integrate with pyscopg2. %s", str(ex))

class PsycopgTraceCursor(psycopg2.extensions.cursor):
def __init__(self, *args, **kwargs):
self._traced_cursor = TracedCursor(db_integration)
super(PsycopgTraceCursor, self).__init__(*args, **kwargs)

# pylint: disable=redefined-builtin
def execute(self, query, vars=None):
if isinstance(query, Composable):
query = query.as_string(self)
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).execute, query, vars
)

# pylint: disable=redefined-builtin
def executemany(self, query, vars):
if isinstance(query, Composable):
query = query.as_string(self)
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).executemany, query, vars
)

# pylint: disable=redefined-builtin
def callproc(self, procname, vars=None):
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).callproc, procname, vars
)
Loading

0 comments on commit 5463cef

Please sign in to comment.