From 4bf4859087f2018f072fc0be472b7a12b58563e9 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 23 Sep 2020 16:33:26 +0200 Subject: [PATCH] fix: Second attempt at fixing trace propagation in Celery 4.2+ (#831) Follow-up to #824 #825 --- sentry_sdk/integrations/celery.py | 20 ++++++++++++-------- tests/conftest.py | 6 +----- tests/integrations/celery/test_celery.py | 16 +++++++++++++--- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 1a11d4a745..2b51fe1f00 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -61,7 +61,6 @@ def sentry_build_tracer(name, task, *args, **kwargs): # short-circuits to task.run if it thinks it's safe. task.__call__ = _wrap_task_call(task, task.__call__) task.run = _wrap_task_call(task, task.run) - task.apply_async = _wrap_apply_async(task, task.apply_async) # `build_tracer` is apparently called for every task # invocation. Can't wrap every celery task for every invocation @@ -72,6 +71,10 @@ def sentry_build_tracer(name, task, *args, **kwargs): trace.build_tracer = sentry_build_tracer + from celery.app.task import Task # type: ignore + + Task.apply_async = _wrap_apply_async(Task.apply_async) + _patch_worker_exit() # This logger logs every status of every task that ran on the worker. @@ -85,19 +88,22 @@ def sentry_build_tracer(name, task, *args, **kwargs): ignore_logger("celery.redirected") -def _wrap_apply_async(task, f): - # type: (Any, F) -> F +def _wrap_apply_async(f): + # type: (F) -> F @wraps(f) def apply_async(*args, **kwargs): # type: (*Any, **Any) -> Any hub = Hub.current integration = hub.get_integration(CeleryIntegration) if integration is not None and integration.propagate_traces: - with hub.start_span(op="celery.submit", description=task.name): + with hub.start_span(op="celery.submit", description=args[0].name): with capture_internal_exceptions(): headers = dict(hub.iter_trace_propagation_headers()) + if headers: - kwarg_headers = kwargs.setdefault("headers", {}) + # Note: kwargs can contain headers=None, so no setdefault! + # Unsure which backend though. + kwarg_headers = kwargs.get("headers") or {} kwarg_headers.update(headers) # https://github.com/celery/celery/issues/4875 @@ -105,10 +111,8 @@ def apply_async(*args, **kwargs): # Need to setdefault the inner headers too since other # tracing tools (dd-trace-py) also employ this exact # workaround and we don't want to break them. - # - # This is not reproducible outside of AMQP, therefore no - # tests! kwarg_headers.setdefault("headers", {}).update(headers) + kwargs["headers"] = kwarg_headers return f(*args, **kwargs) else: diff --git a/tests/conftest.py b/tests/conftest.py index 0a17d135fc..1c368a5b14 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -235,11 +235,7 @@ def append_envelope(envelope): @pytest.fixture def capture_events_forksafe(monkeypatch, capture_events, request): def inner(): - in_process_events = capture_events() - - @request.addfinalizer - def _(): - assert not in_process_events + capture_events() events_r, events_w = os.pipe() events_r = os.fdopen(events_r, "rb", 0) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 13c7c4dd46..6ef50bc093 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -42,6 +42,7 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs): # this backend requires capture_events_forksafe celery.conf.worker_max_tasks_per_child = 1 + celery.conf.worker_concurrency = 1 celery.conf.broker_url = "redis://127.0.0.1:6379" celery.conf.result_backend = "redis://127.0.0.1:6379" celery.conf.task_always_eager = False @@ -297,7 +298,7 @@ def dummy_task(self): @pytest.mark.forked -def test_redis_backend(init_celery, capture_events_forksafe, tmpdir): +def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, tmpdir): celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True) events = capture_events_forksafe() @@ -309,8 +310,9 @@ def dummy_task(self): runs.append(1) 1 / 0 - # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes - res = dummy_task.apply_async() + with start_transaction(name="submit_celery"): + # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes + res = dummy_task.apply_async() with pytest.raises(Exception): # Celery 4.1 raises a gibberish exception @@ -319,6 +321,13 @@ def dummy_task(self): # if this is nonempty, the worker never really forked assert not runs + submit_transaction = events.read_event() + assert submit_transaction["type"] == "transaction" + assert submit_transaction["transaction"] == "submit_celery" + (span,) = submit_transaction["spans"] + assert span["op"] == "celery.submit" + assert span["description"] == "dummy_task" + event = events.read_event() (exception,) = event["exception"]["values"] assert exception["type"] == "ZeroDivisionError" @@ -327,6 +336,7 @@ def dummy_task(self): assert ( transaction["contexts"]["trace"]["trace_id"] == event["contexts"]["trace"]["trace_id"] + == submit_transaction["contexts"]["trace"]["trace_id"] ) events.read_flush()