Skip to content

Commit

Permalink
Asgi middleware (elastic#1528)
Browse files Browse the repository at this point in the history
* initial version of the ASGI middleware

exception tracking is still missing, as I haven't figured out
yet why the exception doesn't bubble up to our middleware

* added docs

* mark the ASGI middleware as experimental and link to GH issue
  • Loading branch information
beniwohli authored Oct 11, 2022
1 parent b5e44db commit 515df75
Show file tree
Hide file tree
Showing 10 changed files with 530 additions and 0 deletions.
61 changes: 61 additions & 0 deletions docs/asgi-middleware.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
[[asgi-middleware]]
=== ASGI Middleware

experimental::[]

Incorporating Elastic APM into your ASGI-based project only requires a few easy
steps.

NOTE: Several ASGI frameworks are supported natively.
Please check <<supported-technologies,Supported Technologies>> for more information

[float]
[[asgi-installation]]
==== Installation

Install the Elastic APM agent using pip:

[source,bash]
----
$ pip install elastic-apm
----

or add `elastic-apm` to your project's `requirements.txt` file.


[float]
[[asgi-setup]]
==== Setup

To set up the agent, you need to initialize it with appropriate settings.

The settings are configured either via environment variables, or as
initialization arguments.

You can find a list of all available settings in the
<<configuration, Configuration>> page.

To set up the APM agent, wrap your ASGI app with the `ASGITracingMiddleware`:

[source,python]
----
from elasticapm.contrib.asgi import ASGITracingMiddleware
app = MyGenericASGIApp() # depending on framework
app = ASGITracingMiddleware(app)
----

Make sure to call <<api-set-transaction-name, `elasticapm.set_transaction_name()`>> with an appropriate transaction name in all your routes.

NOTE: Currently, the agent doesn't support automatic capturing of exceptions.
You can follow progress on this issue on https://github.com/elastic/apm-agent-python/issues/1548[Github].

[float]
[[supported-python-versions]]
==== Supported Python versions

A list of supported <<supported-python,Python>> versions can be found on our <<supported-technologies,Supported Technologies>> page.

NOTE: Elastic APM only supports `asyncio` when using Python 3.7+
226 changes: 226 additions & 0 deletions elasticapm/contrib/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# BSD 3-Clause License
#
# Copyright (c) 2022, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import functools
import urllib.parse
from typing import TYPE_CHECKING, Optional, Tuple, Union

if TYPE_CHECKING:
from asgiref.typing import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope, ASGISendEvent

import elasticapm
from elasticapm import Client, get_client, instrument
from elasticapm.conf import constants
from elasticapm.contrib.asyncio.traces import set_context
from elasticapm.utils import default_ports, encoding
from elasticapm.utils.disttracing import TraceParent


def wrap_send(send, middleware):
@functools.wraps(send)
async def wrapped_send(message):
if message.get("type") == "http.response.start":
await set_context(lambda: middleware.get_data_from_response(message, constants.TRANSACTION), "response")
result = "HTTP {}xx".format(message["status"] // 100)
elasticapm.set_transaction_result(result, override=False)
await send(message)

return wrapped_send


class ASGITracingMiddleware:
__slots__ = ("_app", "client")

def __init__(self, app: "ASGIApplication", **options) -> None:
self._app = app
client = get_client()
if not client:
client = Client(**options)
self.client = client
if self.client.config.instrument and self.client.config.enabled:
instrument()

async def __call__(self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable") -> None:
if scope["type"] != "http":
await self._app(scope, receive, send)
return
send = wrap_send(send, self)
wrapped_receive = receive
url, url_dict = self.get_url(scope)
body = None
if not self.client.should_ignore_url(url):
self.client.begin_transaction(
transaction_type="request", trace_parent=TraceParent.from_headers(scope["headers"])
)
self.set_transaction_name(scope["method"], url)
if scope["method"] in constants.HTTP_WITH_BODY and self.client.config.capture_body != "off":
messages = []
more_body = True
while more_body:
message = await receive()
messages.append(message)
more_body = message.get("more_body", False)

body_raw = b"".join([message.get("body", b"") for message in messages])
body = str(body_raw, errors="ignore")

# Dispatch to the ASGI callable
async def wrapped_receive():
if messages:
return messages.pop(0)

# Once that's done we can just await any other messages.
return await receive()

await set_context(lambda: self.get_data_from_request(scope, constants.TRANSACTION, body), "request")

try:
await self._app(scope, wrapped_receive, send)
elasticapm.set_transaction_outcome(constants.OUTCOME.SUCCESS, override=False)
return
except Exception as exc:
self.client.capture_exception()
elasticapm.set_transaction_result("HTTP 5xx", override=False)
elasticapm.set_transaction_outcome(constants.OUTCOME.FAILURE, override=True)
elasticapm.set_context({"status_code": 500}, "response")
raise exc from None
finally:
self.client.end_transaction()

def get_headers(self, scope_or_message: Union["Scope", "ASGISendEvent"]) -> dict[str, str]:
headers = {}
for k, v in scope_or_message.get("headers", {}):
key = k.decode("latin1")
val = v.decode("latin1")
if key in headers:
headers[key] = f"{headers[key]}, {val}"
else:
headers[key] = val
return headers

def get_url(self, scope: "Scope", host: Optional[str] = None) -> Tuple[str, dict[str, str]]:
url_dict = {}
scheme = scope.get("scheme", "http")
server = scope.get("server", None)
path = scope.get("root_path", "") + scope.get("path", "")

url_dict["protocol"] = scheme + ":"

if host:
url = f"{scheme}://{host}{path}"
url_dict["hostname"] = host
elif server is not None:
host, port = server
url_dict["hostname"] = host
if port:
url_dict["port"] = port
default_port = default_ports.get(scheme, None)
if port != default_port:
url = f"{scheme}://{host}:{port}{path}"
else:
url = f"{scheme}://{host}{path}"
else:
url = path
qs = scope.get("query_string")
if qs:
query = "?" + urllib.parse.unquote(qs.decode("latin-1"))
url += query
url_dict["search"] = encoding.keyword_field(query)
url_dict["full"] = encoding.keyword_field(url)
return url, url_dict

def get_ip(self, scope: "Scope", headers: dict) -> Optional[str]:
x_forwarded_for = headers.get("x-forwarded-for")
remote_addr = headers.get("remote-addr")
ip: Optional[str] = None
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
elif remote_addr:
ip = remote_addr
elif scope.get("client"):
ip = scope.get("client")[0]
return ip

async def get_data_from_request(self, scope: "Scope", event_type: str, body: Optional[str]) -> dict:
"""Loads data from incoming request for APM capturing.
Args:
request (Request)
config (Config)
event_type (str)
body (str)
Returns:
dict
"""
headers = self.get_headers(scope)
result = {
"method": scope["method"],
"socket": {"remote_address": self.get_ip(scope, headers)},
"cookies": headers.pop("cookies", {}),
}
if self.client.config.capture_headers:
result["headers"] = headers
if body and self.client.config.capture_body in ("all", event_type):
result["body"] = body
url, url_dict = self.get_url(scope)
result["url"] = url_dict

return result

async def get_data_from_response(self, message: dict, event_type: str) -> dict:
"""Loads data from response for APM capturing.
Args:
message (dict)
config (Config)
event_type (str)
Returns:
dict
"""
result = {}

if "status" in message:
result["status_code"] = message["status"]

if self.client.config.capture_headers and "headers" in message:
headers = self.get_headers(message)
if headers:
result["headers"] = headers

return result

def set_transaction_name(self, method: str, url: str):
"""
Default implementation sets transaction name to "METHOD unknown route".
Subclasses may add framework specific naming.
"""
elasticapm.set_transaction_name(f"{method.upper()} unknown route")
2 changes: 2 additions & 0 deletions elasticapm/utils/disttracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ def merge_duplicate_headers(cls, headers, key):
# this works for all known WSGI implementations
if isinstance(headers, list):
return ",".join([item[1] for item in headers if item[0] == key])
elif not hasattr(headers, "get") and hasattr(headers, "__iter__"):
return ",".join([item[1] for item in headers if item[0] == key])
return headers.get(key)

def _parse_tracestate(self, tracestate) -> Dict[str, str]:
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ markers =
httpx
prometheus_client
sanic
asgi
jinja2
aiobotocore
kafka
Expand Down
29 changes: 29 additions & 0 deletions tests/contrib/asgi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# BSD 3-Clause License
#
# Copyright (c) 2022, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
61 changes: 61 additions & 0 deletions tests/contrib/asgi/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# BSD 3-Clause License
#
# Copyright (c) 2022, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio

from quart import Quart, Response, jsonify

from elasticapm import async_capture_span

app = Quart(__name__)


@app.route("/", methods=["GET", "POST"])
async def root():
async with async_capture_span("sleep"):
await asyncio.sleep(0.001)
return "OK"


@app.route("/foo")
async def foo():
resp = Response("foo")
resp.headers["foo"] = "bar"
return resp


@app.route("/boom")
async def boom():
assert False


@app.route("/body")
async def json():
return jsonify({"hello": "world"})
Loading

0 comments on commit 515df75

Please sign in to comment.