Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrumentation for asyncpg #814

Merged
merged 16 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ext/opentelemetry-ext-asyncpg/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ package_dir=
=src
packages=find_namespace:
install_requires =
opentelemetry-api == 0.9.dev0
opentelemetry-api == 0.9b0
HiveTraum marked this conversation as resolved.
Show resolved Hide resolved
asyncpg >= 0.12.0

HiveTraum marked this conversation as resolved.
Show resolved Hide resolved
HiveTraum marked this conversation as resolved.
Show resolved Hide resolved
[options.extras_require]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
.. code-block:: python

import asyncpg
import opentelemetry.ext.asyncpg
from opentelemetry.ext.asyncpg import AsyncPGInstrumentor

# You can optionally pass a custom TracerProvider to AsyncPGInstrumentor.instrument()
opentelemetry.ext.asyncpg.AsyncPGInstrumentor().instrument()
AsyncPGInstrumentor().instrument()
conn = await asyncpg.connect(user='user', password='password',
database='database', host='127.0.0.1')
values = await conn.fetch('''SELECT 42;''')
Expand All @@ -34,113 +34,108 @@
---
"""

import functools

from asyncpg import Connection, exceptions
import asyncpg
import wrapt
from asyncpg import exceptions

from opentelemetry import trace
from opentelemetry.ext.asyncpg.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCanonicalCode

_APPLIED = "_opentelemetry_ext_asyncpg_applied"
_APPLIED = "_opentelemetry_tracer"


def _exception_to_canonical_code(exc: Exception) -> StatusCanonicalCode:
if isinstance(exc, (exceptions.InterfaceError,),):
if isinstance(
exc, (exceptions.InterfaceError, exceptions.SyntaxOrAccessError),
):
return StatusCanonicalCode.INVALID_ARGUMENT
if isinstance(exc, exceptions.IdleInTransactionSessionTimeoutError):
return StatusCanonicalCode.DEADLINE_EXCEEDED
return StatusCanonicalCode.UNKNOWN


def _hydrate_span_from_args(span, *args, **__):
span.set_attribute("db.type", "sql")

if len(args) <= 0:
return span

connection = args[0]
if connection is not None:
params = getattr(connection, "_params", None)
if params is not None:
database_name = getattr(params, "database", None)
if database_name is not None:
span.set_attribute("db.instance", database_name)
def _hydrate_span_from_args(connection, query, parameters) -> dict:
span_attributes = {"db.type": "sql"}

database_user = getattr(params, "user", None)
if database_user is not None:
span.set_attribute("db.user", database_user)
params = getattr(connection, "_params", None)
span_attributes["db.instance"] = getattr(params, "database", None)
span_attributes["db.user"] = getattr(params, "user", None)

if len(args) > 1 and args[1] is not None:
span.set_attribute("db.statement", args[1])
if query is not None:
span_attributes["db.statement"] = query

if len(args) > 2 and args[2] is not None and len(args[2]) > 0:
span.set_attribute("db.statement.parameters", args[2])
if parameters is not None and len(parameters) > 0:
span_attributes["db.statement.parameters"] = str(parameters)

return span
return span_attributes


def _execute(wrapped, tracer_provider):
tracer = trace.get_tracer(__name__, "0.8", tracer_provider)
async def _do_execute(func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(instance, args[0], args[1:])
tracer = getattr(asyncpg, _APPLIED)

@functools.wraps(wrapped)
async def _method(*args, **kwargs):
exception = None

exception = None
with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

span = _hydrate_span_from_args(span, *args, **kwargs)

try:
result = await wrapped(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

if exception is not None:
raise exception.with_traceback(exception.__traceback__)

return result

setattr(_method, _APPLIED, True)
return _method
return result


class AsyncPGInstrumentor(BaseInstrumentor):
def instrument(self, **kwargs):
self._instrument(**kwargs)

def uninstrument(self, **kwargs):
self._uninstrument(**kwargs)

@staticmethod
def _instrument(**kwargs):
tracer_provider = kwargs.get("tracer_provider")

for method in ["_execute", "_executemany"]:
_original = getattr(Connection, method, None)
if hasattr(_original, _APPLIED) is False:
setattr(
Connection, method, _execute(_original, tracer_provider)
)

@staticmethod
def _uninstrument(**__):
for method in ["_execute", "_executemany"]:
_connection_method = getattr(Connection, method, None)
if _connection_method is not None and getattr(
_connection_method, _APPLIED, False
):
original = getattr(_connection_method, "__wrapped__", None)
if original is not None:
setattr(Connection, method, original)
def _instrument(self, **kwargs):
tracer_provider = kwargs.get(
"tracer_provider", trace.get_tracer_provider()
)
setattr(
asyncpg,
_APPLIED,
tracer_provider.get_tracer("asyncpg", __version__),
)
wrapt.wrap_function_wrapper(
"asyncpg.connection", "Connection.execute", _do_execute
)
wrapt.wrap_function_wrapper(
"asyncpg.connection", "Connection.executemany", _do_execute
)
wrapt.wrap_function_wrapper(
"asyncpg.connection", "Connection.fetch", _do_execute
)
wrapt.wrap_function_wrapper(
"asyncpg.connection", "Connection.fetchval", _do_execute
)
wrapt.wrap_function_wrapper(
"asyncpg.connection", "Connection.fetchrow", _do_execute
)

def _uninstrument(self, **__):
delattr(asyncpg, _APPLIED)
for method in [
"execute",
"executemany",
"fetch",
"fetchval",
"fetchrow",
]:
unwrap(asyncpg.Connection, method)
HiveTraum marked this conversation as resolved.
Show resolved Hide resolved
Loading