Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asgi middleware #1528

Merged
merged 6 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions docs/asgi-middleware.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
[[asgi-middleware]]
=== ASGI Middleware

experimental::[]

Incorporating Elastic APM into your ASGI-based project only requires a few easy
steps.

NOTE: Several ASGI frameworks are supported natively.
Please check <<supported-technologies,Supported Technologies>> for more information

[float]
[[asgi-installation]]
==== Installation

Install the Elastic APM agent using pip:

[source,bash]
----
$ pip install elastic-apm
----

or add `elastic-apm` to your project's `requirements.txt` file.


[float]
[[asgi-setup]]
==== Setup

To set up the agent, you need to initialize it with appropriate settings.

The settings are configured either via environment variables, or as
initialization arguments.

You can find a list of all available settings in the
<<configuration, Configuration>> page.

To set up the APM agent, wrap your ASGI app with the `ASGITracingMiddleware`:

[source,python]
----
from elasticapm.contrib.asgi import ASGITracingMiddleware

app = MyGenericASGIApp() # depending on framework

app = ASGITracingMiddleware(app)

----

Make sure to call <<api-set-transaction-name, `elasticapm.set_transaction_name()`>> with an appropriate transaction name in all your routes.

NOTE: Currently, the agent doesn't support automatic capturing of exceptions.
You can follow progress on this issue on https://github.com/elastic/apm-agent-python/issues/1548[Github].

[float]
[[supported-python-versions]]
==== Supported Python versions

A list of supported <<supported-python,Python>> versions can be found on our <<supported-technologies,Supported Technologies>> page.

NOTE: Elastic APM only supports `asyncio` when using Python 3.7+
226 changes: 226 additions & 0 deletions elasticapm/contrib/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# BSD 3-Clause License
#
# Copyright (c) 2022, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import functools
import urllib.parse
from typing import TYPE_CHECKING, Optional, Tuple, Union

if TYPE_CHECKING:
from asgiref.typing import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope, ASGISendEvent

import elasticapm
from elasticapm import Client, get_client, instrument
from elasticapm.conf import constants
from elasticapm.contrib.asyncio.traces import set_context
from elasticapm.utils import default_ports, encoding
from elasticapm.utils.disttracing import TraceParent


def wrap_send(send, middleware):
@functools.wraps(send)
async def wrapped_send(message):
if message.get("type") == "http.response.start":
await set_context(lambda: middleware.get_data_from_response(message, constants.TRANSACTION), "response")
result = "HTTP {}xx".format(message["status"] // 100)
elasticapm.set_transaction_result(result, override=False)
await send(message)

return wrapped_send


class ASGITracingMiddleware:
__slots__ = ("_app", "client")

def __init__(self, app: "ASGIApplication", **options) -> None:
self._app = app
client = get_client()
if not client:
client = Client(**options)
self.client = client
if self.client.config.instrument and self.client.config.enabled:
instrument()

async def __call__(self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable") -> None:
if scope["type"] != "http":
await self._app(scope, receive, send)
return
send = wrap_send(send, self)
wrapped_receive = receive
url, url_dict = self.get_url(scope)
body = None
if not self.client.should_ignore_url(url):
self.client.begin_transaction(
transaction_type="request", trace_parent=TraceParent.from_headers(scope["headers"])
)
self.set_transaction_name(scope["method"], url)
if scope["method"] in constants.HTTP_WITH_BODY and self.client.config.capture_body != "off":
messages = []
more_body = True
while more_body:
message = await receive()
messages.append(message)
more_body = message.get("more_body", False)

body_raw = b"".join([message.get("body", b"") for message in messages])
body = str(body_raw, errors="ignore")

# Dispatch to the ASGI callable
async def wrapped_receive():
if messages:
return messages.pop(0)

# Once that's done we can just await any other messages.
return await receive()

await set_context(lambda: self.get_data_from_request(scope, constants.TRANSACTION, body), "request")

try:
await self._app(scope, wrapped_receive, send)
elasticapm.set_transaction_outcome(constants.OUTCOME.SUCCESS, override=False)
return
except Exception as exc:
self.client.capture_exception()
elasticapm.set_transaction_result("HTTP 5xx", override=False)
elasticapm.set_transaction_outcome(constants.OUTCOME.FAILURE, override=True)
elasticapm.set_context({"status_code": 500}, "response")
raise exc from None
finally:
self.client.end_transaction()

def get_headers(self, scope_or_message: Union["Scope", "ASGISendEvent"]) -> dict[str, str]:
headers = {}
for k, v in scope_or_message.get("headers", {}):
key = k.decode("latin1")
val = v.decode("latin1")
if key in headers:
headers[key] = f"{headers[key]}, {val}"
else:
headers[key] = val
return headers

def get_url(self, scope: "Scope", host: Optional[str] = None) -> Tuple[str, dict[str, str]]:
url_dict = {}
scheme = scope.get("scheme", "http")
server = scope.get("server", None)
path = scope.get("root_path", "") + scope.get("path", "")

url_dict["protocol"] = scheme + ":"

if host:
url = f"{scheme}://{host}{path}"
url_dict["hostname"] = host
elif server is not None:
host, port = server
url_dict["hostname"] = host
if port:
url_dict["port"] = port
default_port = default_ports.get(scheme, None)
if port != default_port:
url = f"{scheme}://{host}:{port}{path}"
else:
url = f"{scheme}://{host}{path}"
else:
url = path
qs = scope.get("query_string")
if qs:
query = "?" + urllib.parse.unquote(qs.decode("latin-1"))
url += query
url_dict["search"] = encoding.keyword_field(query)
url_dict["full"] = encoding.keyword_field(url)
return url, url_dict

def get_ip(self, scope: "Scope", headers: dict) -> Optional[str]:
x_forwarded_for = headers.get("x-forwarded-for")
remote_addr = headers.get("remote-addr")
ip: Optional[str] = None
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
elif remote_addr:
ip = remote_addr
elif scope.get("client"):
ip = scope.get("client")[0]
return ip

async def get_data_from_request(self, scope: "Scope", event_type: str, body: Optional[str]) -> dict:
"""Loads data from incoming request for APM capturing.

Args:
request (Request)
config (Config)
event_type (str)
body (str)

Returns:
dict
"""
headers = self.get_headers(scope)
result = {
"method": scope["method"],
"socket": {"remote_address": self.get_ip(scope, headers)},
"cookies": headers.pop("cookies", {}),
}
if self.client.config.capture_headers:
result["headers"] = headers
if body and self.client.config.capture_body in ("all", event_type):
result["body"] = body
url, url_dict = self.get_url(scope)
result["url"] = url_dict

return result

async def get_data_from_response(self, message: dict, event_type: str) -> dict:
"""Loads data from response for APM capturing.

Args:
message (dict)
config (Config)
event_type (str)

Returns:
dict
"""
result = {}

if "status" in message:
result["status_code"] = message["status"]

if self.client.config.capture_headers and "headers" in message:
headers = self.get_headers(message)
if headers:
result["headers"] = headers

return result

def set_transaction_name(self, method: str, url: str):
"""
Default implementation sets transaction name to "METHOD unknown route".
Subclasses may add framework specific naming.
"""
elasticapm.set_transaction_name(f"{method.upper()} unknown route")
2 changes: 2 additions & 0 deletions elasticapm/utils/disttracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ def merge_duplicate_headers(cls, headers, key):
# this works for all known WSGI implementations
if isinstance(headers, list):
return ",".join([item[1] for item in headers if item[0] == key])
elif not hasattr(headers, "get") and hasattr(headers, "__iter__"):
return ",".join([item[1] for item in headers if item[0] == key])
return headers.get(key)

def _parse_tracestate(self, tracestate) -> Dict[str, str]:
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ markers =
httpx
prometheus_client
sanic
asgi
jinja2
aiobotocore
kafka
Expand Down
29 changes: 29 additions & 0 deletions tests/contrib/asgi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# BSD 3-Clause License
#
# Copyright (c) 2022, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
61 changes: 61 additions & 0 deletions tests/contrib/asgi/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# BSD 3-Clause License
#
# Copyright (c) 2022, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio

from quart import Quart, Response, jsonify

from elasticapm import async_capture_span

app = Quart(__name__)


@app.route("/", methods=["GET", "POST"])
async def root():
async with async_capture_span("sleep"):
await asyncio.sleep(0.001)
return "OK"


@app.route("/foo")
async def foo():
resp = Response("foo")
resp.headers["foo"] = "bar"
return resp


@app.route("/boom")
async def boom():
assert False


@app.route("/body")
async def json():
return jsonify({"hello": "world"})
Loading