diff --git a/docs/asgi-middleware.asciidoc b/docs/asgi-middleware.asciidoc new file mode 100644 index 000000000..75607d8bc --- /dev/null +++ b/docs/asgi-middleware.asciidoc @@ -0,0 +1,61 @@ +[[asgi-middleware]] +=== ASGI Middleware + +experimental::[] + +Incorporating Elastic APM into your ASGI-based project only requires a few easy +steps. + +NOTE: Several ASGI frameworks are supported natively. +Please check <> for more information + +[float] +[[asgi-installation]] +==== Installation + +Install the Elastic APM agent using pip: + +[source,bash] +---- +$ pip install elastic-apm +---- + +or add `elastic-apm` to your project's `requirements.txt` file. + + +[float] +[[asgi-setup]] +==== Setup + +To set up the agent, you need to initialize it with appropriate settings. + +The settings are configured either via environment variables, or as +initialization arguments. + +You can find a list of all available settings in the +<> page. + +To set up the APM agent, wrap your ASGI app with the `ASGITracingMiddleware`: + +[source,python] +---- +from elasticapm.contrib.asgi import ASGITracingMiddleware + +app = MyGenericASGIApp() # depending on framework + +app = ASGITracingMiddleware(app) + +---- + +Make sure to call <> with an appropriate transaction name in all your routes. + +NOTE: Currently, the agent doesn't support automatic capturing of exceptions. +You can follow progress on this issue on https://github.com/elastic/apm-agent-python/issues/1548[Github]. + +[float] +[[supported-python-versions]] +==== Supported Python versions + +A list of supported <> versions can be found on our <> page. + +NOTE: Elastic APM only supports `asyncio` when using Python 3.7+ diff --git a/elasticapm/contrib/asgi.py b/elasticapm/contrib/asgi.py new file mode 100644 index 000000000..6b970a67c --- /dev/null +++ b/elasticapm/contrib/asgi.py @@ -0,0 +1,226 @@ +# BSD 3-Clause License +# +# Copyright (c) 2022, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import functools +import urllib.parse +from typing import TYPE_CHECKING, Optional, Tuple, Union + +if TYPE_CHECKING: + from asgiref.typing import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope, ASGISendEvent + +import elasticapm +from elasticapm import Client, get_client, instrument +from elasticapm.conf import constants +from elasticapm.contrib.asyncio.traces import set_context +from elasticapm.utils import default_ports, encoding +from elasticapm.utils.disttracing import TraceParent + + +def wrap_send(send, middleware): + @functools.wraps(send) + async def wrapped_send(message): + if message.get("type") == "http.response.start": + await set_context(lambda: middleware.get_data_from_response(message, constants.TRANSACTION), "response") + result = "HTTP {}xx".format(message["status"] // 100) + elasticapm.set_transaction_result(result, override=False) + await send(message) + + return wrapped_send + + +class ASGITracingMiddleware: + __slots__ = ("_app", "client") + + def __init__(self, app: "ASGIApplication", **options) -> None: + self._app = app + client = get_client() + if not client: + client = Client(**options) + self.client = client + if self.client.config.instrument and self.client.config.enabled: + instrument() + + async def __call__(self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable") -> None: + if scope["type"] != "http": + await self._app(scope, receive, send) + return + send = wrap_send(send, self) + wrapped_receive = receive + url, url_dict = self.get_url(scope) + body = None + if not self.client.should_ignore_url(url): + self.client.begin_transaction( + transaction_type="request", trace_parent=TraceParent.from_headers(scope["headers"]) + ) + self.set_transaction_name(scope["method"], url) + if scope["method"] in constants.HTTP_WITH_BODY and self.client.config.capture_body != "off": + messages = [] + more_body = True + while more_body: + message = await receive() + messages.append(message) + more_body = message.get("more_body", False) + + body_raw = b"".join([message.get("body", b"") for message in messages]) + body = str(body_raw, errors="ignore") + + # Dispatch to the ASGI callable + async def wrapped_receive(): + if messages: + return messages.pop(0) + + # Once that's done we can just await any other messages. + return await receive() + + await set_context(lambda: self.get_data_from_request(scope, constants.TRANSACTION, body), "request") + + try: + await self._app(scope, wrapped_receive, send) + elasticapm.set_transaction_outcome(constants.OUTCOME.SUCCESS, override=False) + return + except Exception as exc: + self.client.capture_exception() + elasticapm.set_transaction_result("HTTP 5xx", override=False) + elasticapm.set_transaction_outcome(constants.OUTCOME.FAILURE, override=True) + elasticapm.set_context({"status_code": 500}, "response") + raise exc from None + finally: + self.client.end_transaction() + + def get_headers(self, scope_or_message: Union["Scope", "ASGISendEvent"]) -> dict[str, str]: + headers = {} + for k, v in scope_or_message.get("headers", {}): + key = k.decode("latin1") + val = v.decode("latin1") + if key in headers: + headers[key] = f"{headers[key]}, {val}" + else: + headers[key] = val + return headers + + def get_url(self, scope: "Scope", host: Optional[str] = None) -> Tuple[str, dict[str, str]]: + url_dict = {} + scheme = scope.get("scheme", "http") + server = scope.get("server", None) + path = scope.get("root_path", "") + scope.get("path", "") + + url_dict["protocol"] = scheme + ":" + + if host: + url = f"{scheme}://{host}{path}" + url_dict["hostname"] = host + elif server is not None: + host, port = server + url_dict["hostname"] = host + if port: + url_dict["port"] = port + default_port = default_ports.get(scheme, None) + if port != default_port: + url = f"{scheme}://{host}:{port}{path}" + else: + url = f"{scheme}://{host}{path}" + else: + url = path + qs = scope.get("query_string") + if qs: + query = "?" + urllib.parse.unquote(qs.decode("latin-1")) + url += query + url_dict["search"] = encoding.keyword_field(query) + url_dict["full"] = encoding.keyword_field(url) + return url, url_dict + + def get_ip(self, scope: "Scope", headers: dict) -> Optional[str]: + x_forwarded_for = headers.get("x-forwarded-for") + remote_addr = headers.get("remote-addr") + ip: Optional[str] = None + if x_forwarded_for: + ip = x_forwarded_for.split(",")[0] + elif remote_addr: + ip = remote_addr + elif scope.get("client"): + ip = scope.get("client")[0] + return ip + + async def get_data_from_request(self, scope: "Scope", event_type: str, body: Optional[str]) -> dict: + """Loads data from incoming request for APM capturing. + + Args: + request (Request) + config (Config) + event_type (str) + body (str) + + Returns: + dict + """ + headers = self.get_headers(scope) + result = { + "method": scope["method"], + "socket": {"remote_address": self.get_ip(scope, headers)}, + "cookies": headers.pop("cookies", {}), + } + if self.client.config.capture_headers: + result["headers"] = headers + if body and self.client.config.capture_body in ("all", event_type): + result["body"] = body + url, url_dict = self.get_url(scope) + result["url"] = url_dict + + return result + + async def get_data_from_response(self, message: dict, event_type: str) -> dict: + """Loads data from response for APM capturing. + + Args: + message (dict) + config (Config) + event_type (str) + + Returns: + dict + """ + result = {} + + if "status" in message: + result["status_code"] = message["status"] + + if self.client.config.capture_headers and "headers" in message: + headers = self.get_headers(message) + if headers: + result["headers"] = headers + + return result + + def set_transaction_name(self, method: str, url: str): + """ + Default implementation sets transaction name to "METHOD unknown route". + Subclasses may add framework specific naming. + """ + elasticapm.set_transaction_name(f"{method.upper()} unknown route") diff --git a/elasticapm/utils/disttracing.py b/elasticapm/utils/disttracing.py index 1372b17af..63dd4996a 100644 --- a/elasticapm/utils/disttracing.py +++ b/elasticapm/utils/disttracing.py @@ -188,6 +188,8 @@ def merge_duplicate_headers(cls, headers, key): # this works for all known WSGI implementations if isinstance(headers, list): return ",".join([item[1] for item in headers if item[0] == key]) + elif not hasattr(headers, "get") and hasattr(headers, "__iter__"): + return ",".join([item[1] for item in headers if item[0] == key]) return headers.get(key) def _parse_tracestate(self, tracestate) -> Dict[str, str]: diff --git a/setup.cfg b/setup.cfg index 539d6a46f..2a70d8eb7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -160,6 +160,7 @@ markers = httpx prometheus_client sanic + asgi jinja2 aiobotocore kafka diff --git a/tests/contrib/asgi/__init__.py b/tests/contrib/asgi/__init__.py new file mode 100644 index 000000000..286063a32 --- /dev/null +++ b/tests/contrib/asgi/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2022, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/tests/contrib/asgi/app.py b/tests/contrib/asgi/app.py new file mode 100644 index 000000000..bd73caddf --- /dev/null +++ b/tests/contrib/asgi/app.py @@ -0,0 +1,61 @@ +# BSD 3-Clause License +# +# Copyright (c) 2022, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import asyncio + +from quart import Quart, Response, jsonify + +from elasticapm import async_capture_span + +app = Quart(__name__) + + +@app.route("/", methods=["GET", "POST"]) +async def root(): + async with async_capture_span("sleep"): + await asyncio.sleep(0.001) + return "OK" + + +@app.route("/foo") +async def foo(): + resp = Response("foo") + resp.headers["foo"] = "bar" + return resp + + +@app.route("/boom") +async def boom(): + assert False + + +@app.route("/body") +async def json(): + return jsonify({"hello": "world"}) diff --git a/tests/contrib/asgi/asgi_tests.py b/tests/contrib/asgi/asgi_tests.py new file mode 100644 index 000000000..824a23b68 --- /dev/null +++ b/tests/contrib/asgi/asgi_tests.py @@ -0,0 +1,139 @@ +# BSD 3-Clause License +# +# Copyright (c) 2022, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest # isort:skip + +async_asgi_testclient = pytest.importorskip("async_asgi_testclient") # isort:skip + +from elasticapm.conf import constants +from elasticapm.contrib.asgi import ASGITracingMiddleware +from tests.contrib.asgi.app import app + +pytestmark = pytest.mark.asgi + + +@pytest.fixture(scope="function") +def instrumented_app(elasticapm_client): + return ASGITracingMiddleware(app) + + +@pytest.mark.asyncio +async def test_transaction_span(instrumented_app, elasticapm_client): + async with async_asgi_testclient.TestClient(instrumented_app) as client: + resp = await client.get("/") + assert resp.status_code == 200 + assert resp.text == "OK" + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 + assert len(elasticapm_client.events[constants.SPAN]) == 1 + transaction = elasticapm_client.events[constants.TRANSACTION][0] + span = elasticapm_client.events[constants.SPAN][0] + assert transaction["name"] == "GET unknown route" + assert transaction["result"] == "HTTP 2xx" + assert transaction["outcome"] == "success" + assert transaction["context"]["request"]["url"]["full"] == "/" + assert transaction["context"]["response"]["status_code"] == 200 + + assert span["name"] == "sleep" + assert span["outcome"] == "success" + assert span["sync"] == False + + +@pytest.mark.asyncio +async def test_transaction_ignore_url(instrumented_app, elasticapm_client): + elasticapm_client.config.update("1", transaction_ignore_urls="/foo*") + + async with async_asgi_testclient.TestClient(instrumented_app) as client: + resp = await client.get("/foo") + assert resp.status_code == 200 + assert resp.text == "foo" + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 0 + + +@pytest.mark.asyncio +async def test_transaction_headers(instrumented_app, elasticapm_client): + elasticapm_client.config.update("1", capture_headers="true") + + async with async_asgi_testclient.TestClient(instrumented_app) as client: + resp = await client.get("/foo", headers={"baz": "bazzinga"}) + assert resp.status_code == 200 + assert resp.text == "foo" + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 + transaction = elasticapm_client.events[constants.TRANSACTION][0] + assert transaction["context"]["request"]["headers"]["baz"] == "bazzinga" + assert transaction["context"]["response"]["headers"]["foo"] == "bar" + + elasticapm_client.config.update("1", capture_headers="false") + + async with async_asgi_testclient.TestClient(instrumented_app) as client: + resp = await client.get("/foo", headers={"baz": "bazzinga"}) + assert resp.status_code == 200 + assert resp.text == "foo" + + transaction = elasticapm_client.events[constants.TRANSACTION][1] + assert "headers" not in transaction["context"]["request"] + assert "headers" not in transaction["context"]["response"] + + +@pytest.mark.asyncio +async def test_transaction_body(instrumented_app, elasticapm_client): + elasticapm_client.config.update("1", capture_body="transactions") + + async with async_asgi_testclient.TestClient(instrumented_app) as client: + resp = await client.post("/", data="foo") + assert resp.status_code == 200 + assert resp.text == "OK" + + transaction = elasticapm_client.events[constants.TRANSACTION][0] + assert "body" in transaction["context"]["request"] + assert transaction["context"]["request"]["body"] == "foo" + + +# for some reason, exceptions don't seem to bubble up to our middleware with the two ASGI +# frameworks I tested (Quart, Sanic) + +# @pytest.mark.asyncio +# async def test_transaction_exception(instrumented_app, elasticapm_client): +# async with async_asgi_testclient.TestClient(instrumented_app) as client: +# resp = await client.get("/boom") +# assert resp.status_code == 500 +# +# assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 +# +# transaction = elasticapm_client.events[constants.TRANSACTION][0] +# assert transaction["name"] == "GET unknown route" +# assert transaction["result"] == "HTTP 5xx" +# # assert transaction["outcome"] == "failure" +# +# assert len(elasticapm_client.events[constants.ERROR]) == 1 +# error = elasticapm_client.events[constants.ERROR][0] +# pass diff --git a/tests/requirements/reqs-asgi-2.txt b/tests/requirements/reqs-asgi-2.txt new file mode 100644 index 000000000..eecc89d9a --- /dev/null +++ b/tests/requirements/reqs-asgi-2.txt @@ -0,0 +1,6 @@ +quart==0.6.13 +MarkupSafe<2.1 +jinja2==3.0.3 +async-asgi-testclient +asgiref +-r reqs-base.txt diff --git a/tests/requirements/reqs-asgi-newest.txt b/tests/requirements/reqs-asgi-newest.txt new file mode 100644 index 000000000..db55a6ebe --- /dev/null +++ b/tests/requirements/reqs-asgi-newest.txt @@ -0,0 +1,4 @@ +quart +async-asgi-testclient +asgiref +-r reqs-base.txt diff --git a/tests/scripts/envs/asgi.sh b/tests/scripts/envs/asgi.sh new file mode 100644 index 000000000..813f9e8b9 --- /dev/null +++ b/tests/scripts/envs/asgi.sh @@ -0,0 +1 @@ +export PYTEST_MARKER="-m asgi"