diff --git a/.pylintrc b/.pylintrc index 01c96ea3953..5f9463df7d5 100644 --- a/.pylintrc +++ b/.pylintrc @@ -226,7 +226,7 @@ dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ # Argument names that match this expression will be ignored. Default to name # with leading underscore. -ignored-argument-names=_.*|^ignored_|^unused_ +ignored-argument-names=_.*|^ignored_|^unused_|^kwargs|^args # Tells whether we should check for unused import in __init__ files. init-import=no diff --git a/dev-requirements.txt b/dev-requirements.txt index 8ea405d92ab..3808b02143a 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -8,4 +8,4 @@ sphinx-rtd-theme~=0.4 sphinx-autodoc-typehints~=1.10.2 pytest!=5.2.3 pytest-cov>=2.8 -readme-renderer~=24.0 \ No newline at end of file +readme-renderer~=24.0 diff --git a/docs-requirements.txt b/docs-requirements.txt index 10ccf1b21c3..84f80b30e59 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -21,6 +21,7 @@ redis>=2.6 sqlalchemy>=1.0 thrift>=0.10.0 wrapt>=1.0.0,<2.0.0 +celery>=4.0 psutil~=5.7.0 boto~=2.0 google-cloud-trace >=0.23.0 diff --git a/docs/ext/celery/celery.rst b/docs/ext/celery/celery.rst new file mode 100644 index 00000000000..125233e006f --- /dev/null +++ b/docs/ext/celery/celery.rst @@ -0,0 +1,7 @@ +OpenTelemetry Celery Instrumentation +==================================== + +.. automodule:: opentelemetry.ext.celery + :members: + :undoc-members: + :show-inheritance: diff --git a/ext/opentelemetry-ext-celery/CHANGELOG.md b/ext/opentelemetry-ext-celery/CHANGELOG.md new file mode 100644 index 00000000000..dd6bf18c9d3 --- /dev/null +++ b/ext/opentelemetry-ext-celery/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## Unreleased + +- Add instrumentation for Celery ([#780](https://github.com/open-telemetry/opentelemetry-python/pull/780)) diff --git a/ext/opentelemetry-ext-celery/LICENSE b/ext/opentelemetry-ext-celery/LICENSE new file mode 100644 index 00000000000..261eeb9e9f8 --- /dev/null +++ b/ext/opentelemetry-ext-celery/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/ext/opentelemetry-ext-celery/MANIFEST.in b/ext/opentelemetry-ext-celery/MANIFEST.in new file mode 100644 index 00000000000..aed3e33273b --- /dev/null +++ b/ext/opentelemetry-ext-celery/MANIFEST.in @@ -0,0 +1,9 @@ +graft src +graft tests +global-exclude *.pyc +global-exclude *.pyo +global-exclude __pycache__/* +include CHANGELOG.md +include MANIFEST.in +include README.rst +include LICENSE diff --git a/ext/opentelemetry-ext-celery/README.rst b/ext/opentelemetry-ext-celery/README.rst new file mode 100644 index 00000000000..feabf6809fc --- /dev/null +++ b/ext/opentelemetry-ext-celery/README.rst @@ -0,0 +1,50 @@ +OpenTelemetry Celery Instrumentation +==================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-celery.svg + :target: https://pypi.org/project/opentelemetry-ext-celery/ + +Instrumentation for Celery. + + +Installation +------------ + +:: + + pip install opentelemetry-ext-celery + +Usage +----- + +* Start broker backend + +:: + docker run -p 5672:5672 rabbitmq + + +* Run instrumented task + +.. code-block:: python + + from opentelemetry.ext.celery import CeleryInstrumentor + + CeleryInstrumentor().instrument() + + from celery import Celery + + app = Celery("tasks", broker="amqp://localhost") + + @app.task + def add(x, y): + return x + y + + add.delay(42, 50) + +References +---------- +* `OpenTelemetry Celery Instrumentation `_ +* `OpenTelemetry Project `_ + diff --git a/ext/opentelemetry-ext-celery/setup.cfg b/ext/opentelemetry-ext-celery/setup.cfg new file mode 100644 index 00000000000..ecb42b7fb74 --- /dev/null +++ b/ext/opentelemetry-ext-celery/setup.cfg @@ -0,0 +1,56 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-ext-celery +description = OpenTelemetry Celery Instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/tree/master/ext/opentelemetry-ext-celery +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.5 +package_dir= + =src +packages=find_namespace: +install_requires = + opentelemetry-api == 0.10.dev0 + opentelemetry-instrumentation == 0.10.dev0 + celery ~= 4.0 + +[options.extras_require] +test = + pytest + opentelemetry-test == 0.10.dev0 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + celery = opentelemetry.ext.celery:CeleryInstrumentor diff --git a/ext/opentelemetry-ext-celery/setup.py b/ext/opentelemetry-ext-celery/setup.py new file mode 100644 index 00000000000..40d1d7aaba9 --- /dev/null +++ b/ext/opentelemetry-ext-celery/setup.py @@ -0,0 +1,26 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "ext", "celery", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py new file mode 100644 index 00000000000..9ce31f34f65 --- /dev/null +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -0,0 +1,223 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Instrument `celery`_ to trace Celery applications. + +.. _celery: https://pypi.org/project/celery/ + +Usage +----- + +* Start broker backend + +.. code:: + + docker run -p 5672:5672 rabbitmq + + +* Run instrumented task + +.. code:: python + + from opentelemetry.ext.celery import CeleryInstrumentor + + CeleryInstrumentor().instrument() + + from celery import Celery + + app = Celery("tasks", broker="amqp://localhost") + + @app.task + def add(x, y): + return x + y + + add.delay(42, 50) + +API +--- +""" + +import logging +import signal + +from celery import signals # pylint: disable=no-name-in-module + +from opentelemetry import trace +from opentelemetry.ext.celery import utils +from opentelemetry.ext.celery.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.trace.status import Status, StatusCanonicalCode + +logger = logging.getLogger(__name__) + +# Task operations +_TASK_TAG_KEY = "celery.action" +_TASK_APPLY_ASYNC = "apply_async" +_TASK_RUN = "run" + +_TASK_RETRY_REASON_KEY = "celery.retry.reason" +_TASK_REVOKED_REASON_KEY = "celery.revoked.reason" +_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal" +_TASK_NAME_KEY = "celery.task_name" +_MESSAGE_ID_ATTRIBUTE_NAME = "messaging.message_id" + + +class CeleryInstrumentor(BaseInstrumentor): + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + + # pylint: disable=attribute-defined-outside-init + self._tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + signals.task_prerun.connect(self._trace_prerun, weak=False) + signals.task_postrun.connect(self._trace_postrun, weak=False) + signals.before_task_publish.connect( + self._trace_before_publish, weak=False + ) + signals.after_task_publish.connect( + self._trace_after_publish, weak=False + ) + signals.task_failure.connect(self._trace_failure, weak=False) + signals.task_retry.connect(self._trace_retry, weak=False) + + def _uninstrument(self, **kwargs): + signals.task_prerun.disconnect(self._trace_prerun) + signals.task_postrun.disconnect(self._trace_postrun) + signals.before_task_publish.disconnect(self._trace_before_publish) + signals.after_task_publish.disconnect(self._trace_after_publish) + signals.task_failure.disconnect(self._trace_failure) + signals.task_retry.disconnect(self._trace_retry) + + def _trace_prerun(self, *args, **kwargs): + task = utils.retrieve_task(kwargs) + task_id = utils.retrieve_task_id(kwargs) + + if task is None or task_id is None: + return + + logger.debug("prerun signal start task_id=%s", task_id) + + span = self._tracer.start_span(task.name, kind=trace.SpanKind.CONSUMER) + + activation = self._tracer.use_span(span, end_on_exit=True) + activation.__enter__() + utils.attach_span(task, task_id, (span, activation)) + + @staticmethod + def _trace_postrun(*args, **kwargs): + task = utils.retrieve_task(kwargs) + task_id = utils.retrieve_task_id(kwargs) + + if task is None or task_id is None: + return + + logger.debug("postrun signal task_id=%s", task_id) + + # retrieve and finish the Span + span, activation = utils.retrieve_span(task, task_id) + if span is None: + logger.warning("no existing span found for task_id=%s", task_id) + return + + # request context tags + span.set_attribute(_TASK_TAG_KEY, _TASK_RUN) + utils.set_attributes_from_context(span, kwargs) + utils.set_attributes_from_context(span, task.request) + span.set_attribute(_TASK_NAME_KEY, task.name) + + activation.__exit__(None, None, None) + utils.detach_span(task, task_id) + + def _trace_before_publish(self, *args, **kwargs): + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id_from_message(kwargs) + + if task is None or task_id is None: + return + + span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER) + + # apply some attributes here because most of the data is not available + span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) + span.set_attribute(_MESSAGE_ID_ATTRIBUTE_NAME, task_id) + span.set_attribute(_TASK_NAME_KEY, task.name) + utils.set_attributes_from_context(span, kwargs) + + activation = self._tracer.use_span(span, end_on_exit=True) + activation.__enter__() + utils.attach_span(task, task_id, (span, activation), is_publish=True) + + @staticmethod + def _trace_after_publish(*args, **kwargs): + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id_from_message(kwargs) + + if task is None or task_id is None: + return + + # retrieve and finish the Span + _, activation = utils.retrieve_span(task, task_id, is_publish=True) + if activation is None: + logger.warning("no existing span found for task_id=%s", task_id) + return + + activation.__exit__(None, None, None) + utils.detach_span(task, task_id, is_publish=True) + + @staticmethod + def _trace_failure(*args, **kwargs): + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id(kwargs) + + if task is None or task_id is None: + return + + # retrieve and pass exception info to activation + span, _ = utils.retrieve_span(task, task_id) + if span is None: + return + + status_kwargs = {"canonical_code": StatusCanonicalCode.UNKNOWN} + + ex = kwargs.get("einfo") + + if ( + hasattr(task, "throws") + and ex is not None + and isinstance(ex.exception, task.throws) + ): + return + + if ex is not None: + status_kwargs["description"] = str(ex) + + span.set_status(Status(**status_kwargs)) + + @staticmethod + def _trace_retry(*args, **kwargs): + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id_from_request(kwargs) + reason = utils.retrieve_reason(kwargs) + + if task is None or task_id is None or reason is None: + return + + span, _ = utils.retrieve_span(task, task_id) + if span is None: + return + + # Add retry reason metadata to span + # Use `str(reason)` instead of `reason.message` in case we get + # something that isn't an `Exception` + span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason)) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py new file mode 100644 index 00000000000..60fe52f04ef --- /dev/null +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -0,0 +1,219 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from celery import registry # pylint: disable=no-name-in-module + +logger = logging.getLogger(__name__) + +# Celery Context key +CTX_KEY = "__otel_task_span" + +# Celery Context attributes +CELERY_CONTEXT_ATTRIBUTES = ( + "compression", + "correlation_id", + "countdown", + "delivery_info", + "declare", + "eta", + "exchange", + "expires", + "hostname", + "id", + "priority", + "queue", + "reply_to", + "retries", + "routing_key", + "serializer", + "timelimit", + "origin", + "state", +) + + +# pylint:disable=too-many-branches +def set_attributes_from_context(span, context): + """Helper to extract meta values from a Celery Context""" + for key in CELERY_CONTEXT_ATTRIBUTES: + value = context.get(key) + + # Skip this key if it is not set + if value is None or value == "": + continue + + # Skip `timelimit` if it is not set (it's default/unset value is a + # tuple or a list of `None` values + if key == "timelimit" and value in [(None, None), [None, None]]: + continue + + # Skip `retries` if it's value is `0` + if key == "retries" and value == 0: + continue + + attribute_name = None + + # Celery 4.0 uses `origin` instead of `hostname`; this change preserves + # the same name for the tag despite Celery version + if key == "origin": + key = "hostname" + + elif key == "delivery_info": + # Get also destination from this + routing_key = value.get("routing_key") + if routing_key is not None: + span.set_attribute("messaging.destination", routing_key) + value = str(value) + + elif key == "id": + attribute_name = "messaging.message_id" + + elif key == "correlation_id": + attribute_name = "messaging.conversation_id" + + elif key == "routing_key": + attribute_name = "messaging.destination" + + # according to https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types + elif key == "declare": + attribute_name = "messaging.destination_kind" + for declare in value: + if declare.exchange.type == "direct": + value = "queue" + break + if declare.exchange.type == "topic": + value = "topic" + break + + # set attribute name if not set specially for a key + if attribute_name is None: + attribute_name = "celery.{}".format(key) + + span.set_attribute(attribute_name, value) + + +def attach_span(task, task_id, span, is_publish=False): + """Helper to propagate a `Span` for the given `Task` instance. This + function uses a `dict` that stores the Span using the + `(task_id, is_publish)` as a key. This is useful when information must be + propagated from one Celery signal to another. + + We use (task_id, is_publish) for the key to ensure that publishing a + task from within another task does not cause any conflicts. + + This mostly happens when either a task fails and a retry policy is in place, + or when a task is manually retries (e.g. `task.retry()`), we end up trying + to publish a task with the same id as the task currently running. + + Previously publishing the new task would overwrite the existing `celery.run` span + in the `dict` causing that span to be forgotten and never finished + NOTE: We cannot test for this well yet, because we do not run a celery worker, + and cannot run `task.apply_async()` + """ + span_dict = getattr(task, CTX_KEY, None) + if span_dict is None: + span_dict = dict() + setattr(task, CTX_KEY, span_dict) + + span_dict[(task_id, is_publish)] = span + + +def detach_span(task, task_id, is_publish=False): + """Helper to remove a `Span` in a Celery task when it's propagated. + This function handles tasks where the `Span` is not attached. + """ + span_dict = getattr(task, CTX_KEY, None) + if span_dict is None: + return + + # See note in `attach_span` for key info + span_dict.pop((task_id, is_publish), (None, None)) + + +def retrieve_span(task, task_id, is_publish=False): + """Helper to retrieve an active `Span` stored in a `Task` + instance + """ + span_dict = getattr(task, CTX_KEY, None) + if span_dict is None: + return (None, None) + + # See note in `attach_span` for key info + return span_dict.get((task_id, is_publish), (None, None)) + + +def retrieve_task(kwargs): + task = kwargs.get("task") + if task is None: + logger.debug("Unable to retrieve task from signal arguments") + return task + + +def retrieve_task_from_sender(kwargs): + sender = kwargs.get("sender") + if sender is None: + logger.debug("Unable to retrieve the sender from signal arguments") + + # before and after publish signals sender is the task name + # for retry and failure signals sender is the task object + if isinstance(sender, str): + sender = registry.tasks.get(sender) + if sender is None: + logger.debug("Unable to retrieve the task from sender=%s", sender) + + return sender + + +def retrieve_task_id(kwargs): + task_id = kwargs.get("task_id") + if task_id is None: + logger.debug("Unable to retrieve task_id from signal arguments") + return task_id + + +def retrieve_task_id_from_request(kwargs): + # retry signal does not include task_id as argument so use request argument + request = kwargs.get("request") + if request is None: + logger.debug("Unable to retrieve the request from signal arguments") + + task_id = getattr(request, "id") + if task_id is None: + logger.debug("Unable to retrieve the task_id from the request") + + return task_id + + +def retrieve_task_id_from_message(kwargs): + """Helper to retrieve the `Task` identifier from the message `body`. + This helper supports Protocol Version 1 and 2. The Protocol is well + detailed in the official documentation: + http://docs.celeryproject.org/en/latest/internals/protocol.html + """ + headers = kwargs.get("headers") + body = kwargs.get("body") + if headers is not None and len(headers) > 0: + # Protocol Version 2 (default from Celery 4.0) + return headers.get("id") + # Protocol Version 1 + return body.get("id") + + +def retrieve_reason(kwargs): + reason = kwargs.get("reason") + if not reason: + logger.debug("Unable to retrieve the retry reason") + return reason diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py new file mode 100644 index 00000000000..6d4fefa599e --- /dev/null +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.10.dev0" diff --git a/ext/opentelemetry-ext-celery/tests/__init__.py b/ext/opentelemetry-ext-celery/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ext/opentelemetry-ext-celery/tests/test_utils.py b/ext/opentelemetry-ext-celery/tests/test_utils.py new file mode 100644 index 00000000000..b5e8163def9 --- /dev/null +++ b/ext/opentelemetry-ext-celery/tests/test_utils.py @@ -0,0 +1,208 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest import mock + +from celery import Celery + +from opentelemetry import trace as trace_api +from opentelemetry.ext.celery import utils +from opentelemetry.sdk import trace + + +class TestUtils(unittest.TestCase): + def setUp(self): + self.app = Celery("celery.test_app") + + def test_set_attributes_from_context(self): + # it should extract only relevant keys + context = { + "correlation_id": "44b7f305", + "delivery_info": {"eager": True}, + "eta": "soon", + "expires": "later", + "hostname": "localhost", + "id": "44b7f305", + "reply_to": "44b7f305", + "retries": 4, + "timelimit": ("now", "later"), + "custom_meta": "custom_value", + "routing_key": "celery", + } + + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.set_attributes_from_context(span, context) + + self.assertEqual( + span.attributes.get("messaging.message_id"), "44b7f305" + ) + self.assertEqual( + span.attributes.get("messaging.conversation_id"), "44b7f305" + ) + self.assertEqual( + span.attributes.get("messaging.destination"), "celery" + ) + + self.assertEqual( + span.attributes["celery.delivery_info"], str({"eager": True}) + ) + self.assertEqual(span.attributes.get("celery.eta"), "soon") + self.assertEqual(span.attributes.get("celery.expires"), "later") + self.assertEqual(span.attributes.get("celery.hostname"), "localhost") + + self.assertEqual(span.attributes.get("celery.reply_to"), "44b7f305") + self.assertEqual(span.attributes.get("celery.retries"), 4) + self.assertEqual( + span.attributes.get("celery.timelimit"), ("now", "later") + ) + self.assertNotIn("custom_meta", span.attributes) + + def test_set_attributes_from_context_empty_keys(self): + # it should not extract empty keys + context = { + "correlation_id": None, + "exchange": "", + "timelimit": (None, None), + "retries": 0, + } + + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.set_attributes_from_context(span, context) + + self.assertEqual(len(span.attributes), 0) + # edge case: `timelimit` can also be a list of None values + context = { + "timelimit": [None, None], + } + + utils.set_attributes_from_context(span, context) + + self.assertEqual(len(span.attributes), 0) + + def test_span_propagation(self): + # ensure spans getter and setter works properly + @self.app.task + def fn_task(): + return 42 + + # propagate and retrieve a Span + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.attach_span(fn_task, task_id, span) + span_after = utils.retrieve_span(fn_task, task_id) + self.assertIs(span, span_after) + + def test_span_delete(self): + # ensure the helper removes properly a propagated Span + @self.app.task + def fn_task(): + return 42 + + # propagate a Span + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.attach_span(fn_task, task_id, span) + # delete the Span + utils.detach_span(fn_task, task_id) + self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None)) + + def test_span_delete_empty(self): + # ensure detach_span doesn't raise an exception if span is not present + @self.app.task + def fn_task(): + return 42 + + # delete the Span + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + try: + utils.detach_span(fn_task, task_id) + self.assertEqual( + utils.retrieve_span(fn_task, task_id), (None, None) + ) + except Exception as ex: # pylint: disable=broad-except + self.fail("Exception was raised: %s" % ex) + + def test_task_id_from_protocol_v1(self): + # ensures a `task_id` is properly returned when Protocol v1 is used. + # `context` is an example of an emitted Signal with Protocol v1 + context = { + "body": { + "expires": None, + "utc": True, + "args": ["user"], + "chord": None, + "callbacks": None, + "errbacks": None, + "taskset": None, + "id": "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7", + "retries": 0, + "task": "tests.contrib.celery.test_integration.fn_task_parameters", + "timelimit": (None, None), + "eta": None, + "kwargs": {"force_logout": True}, + }, + "sender": "tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": "celery", + "routing_key": "celery", + "retry_policy": None, + "headers": {}, + "properties": {}, + } + + task_id = utils.retrieve_task_id_from_message(context) + self.assertEqual(task_id, "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7") + + def test_task_id_from_protocol_v2(self): + # ensures a `task_id` is properly returned when Protocol v2 is used. + # `context` is an example of an emitted Signal with Protocol v2 + context = { + "body": ( + ["user"], + {"force_logout": True}, + { + u"chord": None, + u"callbacks": None, + u"errbacks": None, + u"chain": None, + }, + ), + "sender": u"tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": u"", + "routing_key": u"celery", + "retry_policy": None, + "headers": { + u"origin": u"gen83744@hostname", + u"root_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + u"expires": None, + u"shadow": None, + u"id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + u"kwargsrepr": u"{'force_logout': True}", + u"lang": u"py", + u"retries": 0, + u"task": u"tests.contrib.celery.test_integration.fn_task_parameters", + u"group": None, + u"timelimit": [None, None], + u"parent_id": None, + u"argsrepr": u"['user']", + u"eta": None, + }, + "properties": { + u"reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca", + u"correlation_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + }, + } + + task_id = utils.retrieve_task_id_from_message(context) + self.assertEqual(task_id, "7e917b83-4018-431d-9832-73a28e1fb6c0") diff --git a/ext/opentelemetry-ext-docker-tests/tests/celery/conftest.py b/ext/opentelemetry-ext-docker-tests/tests/celery/conftest.py new file mode 100644 index 00000000000..0e6976382e7 --- /dev/null +++ b/ext/opentelemetry-ext-docker-tests/tests/celery/conftest.py @@ -0,0 +1,92 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from functools import wraps + +import pytest + +from opentelemetry import trace as trace_api +from opentelemetry.ext.celery import CeleryInstrumentor +from opentelemetry.sdk.trace import TracerProvider, export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379")) +REDIS_URL = "redis://{host}:{port}".format(host=REDIS_HOST, port=REDIS_PORT) +BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0) +BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1) + + +@pytest.fixture(scope="session") +def celery_config(): + return {"broker_url": BROKER_URL, "result_backend": BACKEND_URL} + + +@pytest.fixture +def celery_worker_parameters(): + return { + # See https://github.com/celery/celery/issues/3642#issuecomment-457773294 + "perform_ping_check": False, + } + + +@pytest.fixture(autouse=True) +def patch_celery_app(celery_app, celery_worker): + """Patch task decorator on app fixture to reload worker""" + # See https://github.com/celery/celery/issues/3642 + def wrap_task(fn): + @wraps(fn) + def wrapper(*args, **kwargs): + result = fn(*args, **kwargs) + celery_worker.reload() + return result + + return wrapper + + celery_app.task = wrap_task(celery_app.task) + + +@pytest.fixture(autouse=True) +def instrument(tracer_provider, memory_exporter): + CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + memory_exporter.clear() + + yield + + CeleryInstrumentor().uninstrument() + + +@pytest.fixture(scope="session") +def tracer_provider(memory_exporter): + original_tracer_provider = trace_api.get_tracer_provider() + + tracer_provider = TracerProvider() + + span_processor = export.SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + + trace_api.set_tracer_provider(tracer_provider) + + yield tracer_provider + + trace_api.set_tracer_provider(original_tracer_provider) + + +@pytest.fixture(scope="session") +def memory_exporter(): + memory_exporter = InMemorySpanExporter() + return memory_exporter diff --git a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py new file mode 100644 index 00000000000..570a12b7588 --- /dev/null +++ b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py @@ -0,0 +1,523 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import time + +import celery +import pytest +from celery import signals +from celery.exceptions import Retry + +import opentelemetry.ext.celery +from opentelemetry import trace as trace_api +from opentelemetry.ext.celery import CeleryInstrumentor +from opentelemetry.sdk import resources +from opentelemetry.sdk.trace import TracerProvider, export +from opentelemetry.trace.status import StatusCanonicalCode + + +class MyException(Exception): + pass + + +def test_instrumentation_info(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + result = fn_task.apply_async() + assert result.get() == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert ( + async_span.instrumentation_info.name + == opentelemetry.ext.celery.__name__ + ) + assert ( + async_span.instrumentation_info.version + == opentelemetry.ext.celery.__version__ + ) + assert ( + run_span.instrumentation_info.name == opentelemetry.ext.celery.__name__ + ) + assert ( + run_span.instrumentation_info.version + == opentelemetry.ext.celery.__version__ + ) + + +def test_fn_task_run(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + t = fn_task.run() + assert t == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 0 + + +def test_fn_task_call(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + t = fn_task() + assert t == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 0 + + +def test_fn_task_apply(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + t = fn_task.apply() + assert t.successful() is True + assert t.result == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.fn_task" + assert span.attributes.get("messaging.message_id") == t.task_id + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + + +def test_fn_task_apply_bind(celery_app, memory_exporter): + @celery_app.task(bind=True) + def fn_task(self): + return self + + t = fn_task.apply() + assert t.successful() is True + assert "fn_task" in t.result.name + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.fn_task" + assert span.attributes.get("messaging.message_id") == t.task_id + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + + +def test_fn_task_apply_async(celery_app, memory_exporter): + @celery_app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + result = fn_task_parameters.apply_async( + args=["user"], kwargs={"force_logout": True} + ) + assert result.get(timeout=10) == ["user", True] + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert run_span.context.trace_id != async_span.context.trace_id + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.fn_task_parameters" + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") == result.task_id + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.fn_task_parameters" + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + +def test_concurrent_delays(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + results = [fn_task.delay() for _ in range(100)] + + for result in results: + assert result.get(timeout=1) == 42 + + spans = memory_exporter.get_finished_spans() + + assert len(spans) == 200 + + +def test_fn_task_delay(celery_app, memory_exporter): + @celery_app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + result = fn_task_parameters.delay("user", force_logout=True) + assert result.get(timeout=10) == ["user", True] + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert run_span.context.trace_id != async_span.context.trace_id + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.fn_task_parameters" + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") == result.task_id + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.fn_task_parameters" + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + +def test_fn_exception(celery_app, memory_exporter): + @celery_app.task + def fn_exception(): + raise Exception("Task class is failing") + + result = fn_exception.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is False + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN + assert span.attributes.get("messaging.message_id") == result.task_id + assert "Task class is failing" in span.status.description + + +def test_fn_exception_expected(celery_app, memory_exporter): + @celery_app.task(throws=(MyException,)) + def fn_exception(): + raise MyException("Task class is failing") + + result = fn_exception.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_fn_retry_exception(celery_app, memory_exporter): + @celery_app.task + def fn_exception(): + raise Retry("Task class is being retried") + + result = fn_exception.apply() + + assert result.failed() is False + assert "Task class is being retried" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "RETRY" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_class_task(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + def run(self): + return 42 + + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.successful() is True + assert result.result == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.BaseTask" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.BaseTask" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_class_task_exception(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + def run(self): + raise Exception("Task class is failing") + + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is False + assert span.name == "test_celery_functional.BaseTask" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.BaseTask" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN + assert span.attributes.get("messaging.message_id") == result.task_id + assert "Task class is failing" in span.status.description + + +def test_class_task_exception_excepted(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + throws = (MyException,) + + def run(self): + raise MyException("Task class is failing") + + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.BaseTask" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_shared_task(celery_app, memory_exporter): + """Ensure Django Shared Task are supported""" + + @celery.shared_task + def add(x, y): + return x + y + + result = add.apply([2, 2]) + assert result.result == 4 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.add" + assert ( + span.attributes.get("celery.task_name") == "test_celery_functional.add" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_apply_async_previous_style_tasks( + celery_app, celery_worker, memory_exporter +): + """Ensures apply_async is properly patched if Celery 1.0 style tasks are + used even in newer versions. This should extend support to previous versions + of Celery.""" + + class CelerySuperClass(celery.task.Task): + abstract = True + + @classmethod + def apply_async(cls, args=None, kwargs=None, **kwargs_): + return super(CelerySuperClass, cls).apply_async( + args=args, kwargs=kwargs, **kwargs_ + ) + + def run(self, *args, **kwargs): + if "stop" in kwargs: + # avoid call loop + return + CelerySubClass.apply_async(args=[], kwargs={"stop": True}).get( + timeout=10 + ) + + class CelerySubClass(CelerySuperClass): + pass + + celery_worker.reload() + + task = CelerySubClass() + result = task.apply() + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 3 + + async_span, async_run_span, run_span = spans + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.CelerySubClass" + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + + assert async_run_span.status.is_ok is True + assert async_run_span.name == "test_celery_functional.CelerySubClass" + assert ( + async_run_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert async_run_span.attributes.get("celery.action") == "run" + assert async_run_span.attributes.get("celery.state") == "SUCCESS" + assert ( + async_run_span.attributes.get("messaging.message_id") != result.task_id + ) + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.CelerySubClass" + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") != result.task_id + assert async_span.attributes.get( + "messaging.message_id" + ) == async_run_span.attributes.get("messaging.message_id") + + +def test_custom_tracer_provider(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + resource = resources.Resource.create({}) + tracer_provider = TracerProvider(resource=resource) + span_processor = export.SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + + trace_api.set_tracer_provider(tracer_provider) + + CeleryInstrumentor().uninstrument() + CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + + fn_task.delay() + + spans_list = memory_exporter.get_finished_spans() + assert len(spans_list) == 1 + + span = spans_list[0] + assert span.resource == resource diff --git a/tox.ini b/tox.ini index 875f63fc3f2..e182be2e005 100644 --- a/tox.ini +++ b/tox.ini @@ -143,6 +143,10 @@ envlist = py3{4,5,6,7,8}-test-ext-redis pypy3-test-ext-redis + ; opentelemetry-ext-celery + py3{5,6,7,8}-test-ext-celery + pypy3-test-ext-celery + ; opentelemetry-ext-system-metrics py3{4,5,6,7,8}-test-ext-system-metrics ; ext-system-metrics intentionally excluded from pypy3 @@ -211,6 +215,7 @@ changedir = test-opentracing-shim: ext/opentelemetry-ext-opentracing-shim/tests test-ext-sqlalchemy: ext/opentelemetry-ext-sqlalchemy/tests test-ext-redis: ext/opentelemetry-ext-redis/tests + test-ext-celery: ext/opentelemetry-ext-celery/tests test-ext-system-metrics: ext/opentelemetry-ext-system-metrics/tests commands_pre = @@ -227,6 +232,8 @@ commands_pre = getting-started: pip install -e {toxinidir}/opentelemetry-instrumentation -e {toxinidir}/ext/opentelemetry-ext-requests -e {toxinidir}/ext/opentelemetry-ext-wsgi -e {toxinidir}/ext/opentelemetry-ext-flask + celery: pip install {toxinidir}/ext/opentelemetry-ext-celery[test] + grpc: pip install {toxinidir}/ext/opentelemetry-ext-grpc[test] wsgi,flask,django,asgi,pyramid,starlette: pip install {toxinidir}/tests/util @@ -374,6 +381,7 @@ deps = psycopg2-binary ~= 2.8.4 sqlalchemy ~= 1.3.16 redis ~= 3.3.11 + celery ~= 4.0, != 4.4.4 changedir = ext/opentelemetry-ext-docker-tests/tests @@ -383,6 +391,7 @@ commands_pre = -e {toxinidir}/opentelemetry-sdk \ -e {toxinidir}/opentelemetry-instrumentation \ -e {toxinidir}/tests/util \ + -e {toxinidir}/ext/opentelemetry-ext-celery \ -e {toxinidir}/ext/opentelemetry-ext-dbapi \ -e {toxinidir}/ext/opentelemetry-ext-mysql \ -e {toxinidir}/ext/opentelemetry-ext-psycopg2 \