Skip to content

Commit

Permalink
fix: Second attempt at fixing trace propagation in Celery 4.2+ (#831)
Browse files Browse the repository at this point in the history
Follow-up to #824 #825
  • Loading branch information
untitaker authored Sep 23, 2020
1 parent b0f2f41 commit 4bf4859
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 16 deletions.
20 changes: 12 additions & 8 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def sentry_build_tracer(name, task, *args, **kwargs):
# short-circuits to task.run if it thinks it's safe.
task.__call__ = _wrap_task_call(task, task.__call__)
task.run = _wrap_task_call(task, task.run)
task.apply_async = _wrap_apply_async(task, task.apply_async)

# `build_tracer` is apparently called for every task
# invocation. Can't wrap every celery task for every invocation
Expand All @@ -72,6 +71,10 @@ def sentry_build_tracer(name, task, *args, **kwargs):

trace.build_tracer = sentry_build_tracer

from celery.app.task import Task # type: ignore

Task.apply_async = _wrap_apply_async(Task.apply_async)

_patch_worker_exit()

# This logger logs every status of every task that ran on the worker.
Expand All @@ -85,30 +88,31 @@ def sentry_build_tracer(name, task, *args, **kwargs):
ignore_logger("celery.redirected")


def _wrap_apply_async(task, f):
# type: (Any, F) -> F
def _wrap_apply_async(f):
# type: (F) -> F
@wraps(f)
def apply_async(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
integration = hub.get_integration(CeleryIntegration)
if integration is not None and integration.propagate_traces:
with hub.start_span(op="celery.submit", description=task.name):
with hub.start_span(op="celery.submit", description=args[0].name):
with capture_internal_exceptions():
headers = dict(hub.iter_trace_propagation_headers())

if headers:
kwarg_headers = kwargs.setdefault("headers", {})
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
kwarg_headers = kwargs.get("headers") or {}
kwarg_headers.update(headers)

# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
#
# This is not reproducible outside of AMQP, therefore no
# tests!
kwarg_headers.setdefault("headers", {}).update(headers)
kwargs["headers"] = kwarg_headers

return f(*args, **kwargs)
else:
Expand Down
6 changes: 1 addition & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,7 @@ def append_envelope(envelope):
@pytest.fixture
def capture_events_forksafe(monkeypatch, capture_events, request):
def inner():
in_process_events = capture_events()

@request.addfinalizer
def _():
assert not in_process_events
capture_events()

events_r, events_w = os.pipe()
events_r = os.fdopen(events_r, "rb", 0)
Expand Down
16 changes: 13 additions & 3 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs):

# this backend requires capture_events_forksafe
celery.conf.worker_max_tasks_per_child = 1
celery.conf.worker_concurrency = 1
celery.conf.broker_url = "redis://127.0.0.1:6379"
celery.conf.result_backend = "redis://127.0.0.1:6379"
celery.conf.task_always_eager = False
Expand Down Expand Up @@ -297,7 +298,7 @@ def dummy_task(self):


@pytest.mark.forked
def test_redis_backend(init_celery, capture_events_forksafe, tmpdir):
def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, tmpdir):
celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)

events = capture_events_forksafe()
Expand All @@ -309,8 +310,9 @@ def dummy_task(self):
runs.append(1)
1 / 0

# Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
res = dummy_task.apply_async()
with start_transaction(name="submit_celery"):
# Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
res = dummy_task.apply_async()

with pytest.raises(Exception):
# Celery 4.1 raises a gibberish exception
Expand All @@ -319,6 +321,13 @@ def dummy_task(self):
# if this is nonempty, the worker never really forked
assert not runs

submit_transaction = events.read_event()
assert submit_transaction["type"] == "transaction"
assert submit_transaction["transaction"] == "submit_celery"
(span,) = submit_transaction["spans"]
assert span["op"] == "celery.submit"
assert span["description"] == "dummy_task"

event = events.read_event()
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"
Expand All @@ -327,6 +336,7 @@ def dummy_task(self):
assert (
transaction["contexts"]["trace"]["trace_id"]
== event["contexts"]["trace"]["trace_id"]
== submit_transaction["contexts"]["trace"]["trace_id"]
)

events.read_flush()
Expand Down

0 comments on commit 4bf4859

Please sign in to comment.