diff --git a/ddtrace/debugging/_debugger.py b/ddtrace/debugging/_debugger.py index 5cd712cac1a..3c0947627d1 100644 --- a/ddtrace/debugging/_debugger.py +++ b/ddtrace/debugging/_debugger.py @@ -52,6 +52,7 @@ from ddtrace.debugging._signal.collector import SignalContext from ddtrace.debugging._signal.metric_sample import MetricSample from ddtrace.debugging._signal.model import Signal +from ddtrace.debugging._signal.model import SignalState from ddtrace.debugging._signal.snapshot import Snapshot from ddtrace.debugging._signal.tracing import DynamicSpan from ddtrace.debugging._signal.tracing import SpanDecoration @@ -322,6 +323,9 @@ def _dd_debugger_hook(self, probe: Probe) -> None: log.debug("[%s][P: %s] Debugger. Report signal %s", os.getpid(), os.getppid(), signal) self._collector.push(signal) + if signal.state is SignalState.DONE: + self._probe_registry.set_emitting(probe) + except Exception: log.error("Failed to execute probe hook", exc_info=True) @@ -421,6 +425,9 @@ def _(wrapped: FunctionType, args: Tuple[Any], kwargs: Dict[str, Any]) -> Any: for context in open_contexts: context.exit(retval, exc_info, end_time - start_time) + signal = context.signal + if signal.state is SignalState.DONE: + self._probe_registry.set_emitting(signal.probe) exc = exc_info[1] if exc is not None: diff --git a/ddtrace/debugging/_probe/registry.py b/ddtrace/debugging/_probe/registry.py index 75cfffaf48b..32e8f443ea6 100644 --- a/ddtrace/debugging/_probe/registry.py +++ b/ddtrace/debugging/_probe/registry.py @@ -3,6 +3,7 @@ from typing import Dict from typing import List from typing import Optional +from typing import cast from ddtrace.debugging._probe.model import Probe from ddtrace.debugging._probe.model import ProbeLocationMixin @@ -15,17 +16,27 @@ class ProbeRegistryEntry(object): - __slots__ = ("probe", "installed", "error_type", "message") + __slots__ = ( + "probe", + "installed", + "emitting", + "error_type", + "message", + ) def __init__(self, probe: Probe) -> None: self.probe = probe self.installed = False + self.emitting = False self.error_type: Optional[str] = None self.message: Optional[str] = None def set_installed(self) -> None: self.installed = True + def set_emitting(self) -> None: + self.emitting = True + def set_error(self, error_type: str, message: str) -> None: self.error_type = error_type self.message = message @@ -102,6 +113,14 @@ def set_installed(self, probe: Probe) -> None: self.logger.installed(probe) + def set_emitting(self, probe: Probe) -> None: + """Set the emitting flag for a probe.""" + with self._lock: + entry = cast(ProbeRegistryEntry, self[probe.probe_id]) + if not entry.emitting: + entry.set_emitting() + self.logger.emitting(probe) + def set_error(self, probe: Probe, error_type: str, message: str) -> None: """Set the error message for a probe.""" with self._lock: @@ -109,7 +128,9 @@ def set_error(self, probe: Probe, error_type: str, message: str) -> None: self.logger.error(probe, (error_type, message)) def _log_probe_status_unlocked(self, entry: ProbeRegistryEntry) -> None: - if entry.installed: + if entry.emitting: + self.logger.emitting(entry.probe) + elif entry.installed: self.logger.installed(entry.probe) elif entry.error_type: assert entry.message is not None, entry # nosec diff --git a/ddtrace/debugging/_probe/status.py b/ddtrace/debugging/_probe/status.py index 3a1dab59eb6..a12db52edbc 100644 --- a/ddtrace/debugging/_probe/status.py +++ b/ddtrace/debugging/_probe/status.py @@ -136,5 +136,12 @@ def installed(self, probe: Probe, message: t.Optional[str] = None) -> None: message or "Probe %s instrumented correctly" % probe.probe_id, ) + def emitting(self, probe: Probe, message: t.Optional[str] = None) -> None: + self._enqueue( + probe, + "EMITTING", + message or "Probe %s is emitting data" % probe.probe_id, + ) + def error(self, probe: Probe, error: t.Optional[ErrorInfo] = None) -> None: self._enqueue(probe, "ERROR", "Failed to instrument probe %s" % probe.probe_id, error) diff --git a/tests/debugging/exploration/debugger.py b/tests/debugging/exploration/debugger.py index 229ac4e64d1..b156b0ec97b 100644 --- a/tests/debugging/exploration/debugger.py +++ b/tests/debugging/exploration/debugger.py @@ -171,6 +171,9 @@ def received(self, *args, **kwargs): def installed(self, *args, **kwargs): pass + def emitting(self, *args, **kwargs): + pass + def error(self, *args, **kwargs): pass diff --git a/tests/debugging/test_debugger.py b/tests/debugging/test_debugger.py index 622c957b7a1..7b8e976a598 100644 --- a/tests/debugging/test_debugger.py +++ b/tests/debugging/test_debugger.py @@ -637,7 +637,7 @@ def test_debugger_line_probe_on_wrapped_function(stuff): assert snapshot.probe.probe_id == "line-probe-wrapped-method" -def test_probe_status_logging(remote_config_worker): +def test_probe_status_logging(remote_config_worker, stuff): assert remoteconfig_poller.status == ServiceStatus.STOPPED with rcm_endpoint(), debugger(diagnostics_interval=float("inf"), enabled=True) as d: @@ -647,6 +647,7 @@ def test_probe_status_logging(remote_config_worker): source_file="tests/submod/stuff.py", line=36, condition=None, + rate=float("inf"), ), create_snapshot_function_probe( probe_id="line-probe-error", @@ -656,18 +657,22 @@ def test_probe_status_logging(remote_config_worker): ), ) + # Call the function multiple times to ensure that we emit only once. + for _ in range(10): + stuff.Stuff().instancestuff(42) + logger = d.probe_status_logger def count_status(queue): return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) - logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1}) + logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1, "EMITTING": 1}) d.log_probe_status() - logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2}) + logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 2, "EMITTING": 2}) d.log_probe_status() - logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3}) + logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 3, "EMITTING": 3}) def test_probe_status_logging_reemit_on_modify(remote_config_worker):