Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(runtime_metrics): avoid tracking multiple service names #12460

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import TypeVar
from typing import Union
Expand Down Expand Up @@ -224,12 +223,6 @@ def __init__(
# globally set tags
self._tags = config.tags.copy()

# collection of services seen, used for runtime metrics tags
# a buffer for service info so we don't perpetually send the same things
self._services: Set[str] = set()
if config.service:
self._services.add(config.service)

# Runtime id used for associating data collected during runtime to
# traces
self._pid = getpid()
Expand Down Expand Up @@ -635,13 +628,6 @@ def _generate_diagnostic_logs(self):

def _child_after_fork(self):
self._pid = getpid()

# Assume that the services of the child are not necessarily a subset of those
# of the parent.
self._services = set()
if config.service:
self._services.add(config.service)

# Re-create the background writer thread
self._writer = self._writer.recreate()
self._span_processors, self._appsec_processor, self._deferred_processors = _default_span_processors_factory(
Expand Down Expand Up @@ -832,10 +818,6 @@ def _start_span(
if activate:
self.context_provider.activate(span)

# update set of services handled by tracer
if service and service not in self._services and self._is_span_internal(span):
self._services.add(service)

# Only call span processors if the tracer is enabled (even if APM opted out)
if self.enabled or asm_config._apm_opt_out:
for p in chain(self._span_processors, SpanProcessor.__processors__, self._deferred_processors):
Expand Down
35 changes: 16 additions & 19 deletions ddtrace/internal/runtime/runtime_metrics.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import itertools
import os
from typing import ClassVar # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401
from typing import Set # noqa:F401

import ddtrace
from ddtrace.internal import atexit
Expand Down Expand Up @@ -42,13 +42,16 @@ def __repr__(self):
)


class RuntimeTags(RuntimeCollectorsIterable):
class PlatformTags(RuntimeCollectorsIterable):
# DEV: `None` means to allow all tags generated by PlatformTagCollector and TracerTagCollector
ENABLED = None
COLLECTORS = [
PlatformTagCollector,
TracerTagCollector,
]
COLLECTORS = [PlatformTagCollector]


class TracerTags(RuntimeCollectorsIterable):
# DEV: `None` means to allow all tags generated by PlatformTagCollector and TracerTagCollector
ENABLED = None
COLLECTORS = [TracerTagCollector]


class RuntimeMetrics(RuntimeCollectorsIterable):
Expand Down Expand Up @@ -80,7 +83,7 @@ def __init__(self, interval=_get_interval_or_default(), tracer=ddtrace.tracer, d
)
self.tracer: Optional[ddtrace.trace.Tracer] = tracer
self._runtime_metrics: RuntimeMetrics = RuntimeMetrics()
self._services: Set[str] = set()
self._platform_tags: List[str] = self._format_tags(PlatformTags())

@classmethod
def disable(cls):
Expand Down Expand Up @@ -122,8 +125,6 @@ def enable(cls, flush_interval=None, tracer=None, dogstatsd_url=None):
flush_interval = _get_interval_or_default()
runtime_worker = cls(flush_interval, tracer, dogstatsd_url)
runtime_worker.start()
# force an immediate update constant tags
runtime_worker.update_runtime_tags()

forksafe.register(cls._restart)
atexit.register(cls.disable)
Expand All @@ -136,11 +137,10 @@ def enable(cls, flush_interval=None, tracer=None, dogstatsd_url=None):

def flush(self):
# type: () -> None
# The constant tags for the dogstatsd client needs to updated with any new
# service(s) that may have been added.
if self.tracer and self._services != self.tracer._services:
self._services = self.tracer._services
self.update_runtime_tags()
# Ensure runtime metrics have up-to-date tags (ex: service, env, version)
rumtime_tags = self._format_tags(TracerTags()) + self._platform_tags
log.debug("Updating constant tags %s", rumtime_tags)
self._dogstatsd_client.constant_tags = rumtime_tags

with self._dogstatsd_client:
for key, value in self._runtime_metrics:
Expand All @@ -152,12 +152,9 @@ def _stop_service(self):
# De-register span hook
super(RuntimeWorker, self)._stop_service()

def update_runtime_tags(self):
# type: () -> None
def _format_tags(self, tags: RuntimeCollectorsIterable) -> List[str]:
# DEV: ddstatsd expects tags in the form ['key1:value1', 'key2:value2', ...]
tags = ["{}:{}".format(k, v) for k, v in RuntimeTags()]
log.debug("Updating constant tags %s", tags)
self._dogstatsd_client.constant_tags = tags
return ["{}:{}".format(k, v) for k, v in tags]

periodic = flush
on_shutdown = flush
7 changes: 5 additions & 2 deletions ddtrace/internal/runtime/tag_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from ...constants import ENV_KEY
from ...constants import VERSION_KEY
from ..constants import DEFAULT_SERVICE_NAME
from .collector import ValueCollector
from .constants import LANG
from .constants import LANG_INTERPRETER
Expand All @@ -24,8 +25,10 @@ class TracerTagCollector(RuntimeTagCollector):
def collect_fn(self, keys):
ddtrace = self.modules.get("ddtrace")

# make sure to copy _services to avoid RuntimeError: Set changed size during iteration
tags = [(SERVICE, service) for service in list(ddtrace.tracer._services)]
service = DEFAULT_SERVICE_NAME
if ddtrace.config.service:
service = ddtrace.config.service
tags = [(SERVICE, service)]

# DEV: `DD_ENV`, `DD_VERSION`, and `DD_SERVICE` get picked up automatically by
# dogstatsd client, but someone might configure these via `ddtrace.config`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
runtime_metrics: Ensures only the global service name is tagged on runtime metrics, instead of every service name found in the process.
57 changes: 27 additions & 30 deletions tests/tracer/runtime/test_runtime_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from ddtrace.internal.runtime.constants import GC_COUNT_GEN0
from ddtrace.internal.runtime.constants import SERVICE
from ddtrace.internal.runtime.runtime_metrics import RuntimeMetrics
from ddtrace.internal.runtime.runtime_metrics import RuntimeTags
from ddtrace.internal.runtime.runtime_metrics import RuntimeWorker
from ddtrace.internal.runtime.runtime_metrics import TracerTags
from ddtrace.internal.service import ServiceStatus
from tests.utils import BaseTestCase
from tests.utils import TracerTestCase
Expand All @@ -37,95 +37,91 @@ class TestRuntimeTags(TracerTestCase):
def test_all_tags(self):
with self.override_global_tracer():
with self.trace("test", service="test"):
tags = set([k for (k, v) in RuntimeTags()])
tags = set([k for (k, v) in TracerTags()])
assert SERVICE in tags
# no env set by default
assert ENV not in tags

def test_one_tag(self):
with self.override_global_tracer():
with self.trace("test", service="test"):
tags = [k for (k, v) in RuntimeTags(enabled=[SERVICE])]
tags = [k for (k, v) in TracerTags(enabled=[SERVICE])]
self.assertEqual(set(tags), set([SERVICE]))

def test_env_tag(self):
def filter_only_env_tags(tags):
return [(k, v) for (k, v) in RuntimeTags() if k == "env"]
return [(k, v) for (k, v) in TracerTags() if k == "env"]

with self.override_global_tracer():
# first without env tag set in tracer
with self.trace("first-test", service="test"):
tags = filter_only_env_tags(RuntimeTags())
tags = filter_only_env_tags(TracerTags())
assert tags == []

# then with an env tag set
self.tracer.set_tags({"env": "tests.dog"})
with self.trace("second-test", service="test"):
tags = filter_only_env_tags(RuntimeTags())
tags = filter_only_env_tags(TracerTags())
assert tags == [("env", "tests.dog")]

# check whether updating env works
self.tracer.set_tags({"env": "staging.dog"})
with self.trace("third-test", service="test"):
tags = filter_only_env_tags(RuntimeTags())
tags = filter_only_env_tags(TracerTags())
assert tags == [("env", "staging.dog")]


@pytest.mark.subprocess(env={})
def test_runtime_tags_empty():
from ddtrace.internal.runtime.runtime_metrics import RuntimeTags
from ddtrace.internal.runtime.runtime_metrics import PlatformTags

tags = list(RuntimeTags())
assert len(tags) == 5
tags = list(PlatformTags())
assert len(tags) == 4

tags = dict(tags)
assert set(tags.keys()) == set(["lang", "lang_interpreter", "lang_version", "tracer_version", "service"])
assert set(tags.keys()) == set(["lang", "lang_interpreter", "lang_version", "tracer_version"])


@pytest.mark.subprocess(env={"DD_SERVICE": "my-service", "DD_ENV": "test-env", "DD_VERSION": "1.2.3"})
def test_runtime_tags_usm():
from ddtrace.internal.runtime.runtime_metrics import RuntimeTags
from ddtrace.internal.runtime.runtime_metrics import TracerTags

tags = list(RuntimeTags())
assert len(tags) == 7, tags
tags = list(TracerTags())
assert len(tags) == 3, tags

tags = dict(tags)
assert set(tags.keys()) == set(
["lang", "lang_interpreter", "lang_version", "tracer_version", "service", "version", "env"]
)
assert set(tags.keys()) == set(["service", "version", "env"])
assert tags["service"] == "my-service"
assert tags["env"] == "test-env"
assert tags["version"] == "1.2.3"


@pytest.mark.subprocess(env={"DD_TAGS": "version:1.2.3,custom:tag,test:key", "DD_VERSION": "4.5.6"})
def test_runtime_tags_dd_tags():
from ddtrace.internal.runtime.runtime_metrics import RuntimeTags
from ddtrace.internal.runtime.runtime_metrics import TracerTags

tags = list(RuntimeTags())
assert len(tags) == 8, tags
tags = list(TracerTags())
assert len(tags) == 4, tags

tags = dict(tags)
assert set(tags.keys()) == set(
["lang", "lang_interpreter", "lang_version", "tracer_version", "version", "custom", "test", "service"]
)
assert set(tags.keys()) == set(["version", "custom", "test", "service"])
assert tags["custom"] == "tag"
assert tags["test"] == "key"
assert tags["version"] == "4.5.6"


@pytest.mark.subprocess()
def test_runtime_tags_manual_tracer_tags():
from ddtrace.internal.runtime.runtime_metrics import RuntimeTags
from ddtrace.internal.runtime.runtime_metrics import TracerTags
from ddtrace.trace import tracer

tracer.set_tags({"manual": "tag"})

tags = list(RuntimeTags())
assert len(tags) == 6, tags
tags = list(TracerTags())
assert len(tags) == 2, tags

tags = dict(tags)
assert set(tags.keys()) == set(["lang", "lang_interpreter", "lang_version", "tracer_version", "manual", "service"])
assert set(tags.keys()) == set(["manual", "service"])
assert tags["manual"] == "tag"


Expand Down Expand Up @@ -177,9 +173,10 @@ def test_tracer_metrics(self):

# check to last set of metrics returned to confirm tags were set
for gauge in received[-1:]:
self.assertRegex(gauge, "service:parent")
self.assertRegex(gauge, "service:child")
self.assertNotRegex(gauge, "service:db")
self.assertRegex(gauge, "service:tests.tracer")
self.assertNotIn(gauge, "service:parent")
self.assertNotIn(gauge, "service:child")
self.assertNotIn(gauge, "service:db")
self.assertRegex(gauge, "env:tests.dog")
self.assertRegex(gauge, "lang_interpreter:CPython")
self.assertRegex(gauge, "lang_version:")
Expand Down
11 changes: 3 additions & 8 deletions tests/tracer/runtime/test_tag_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def test_tracer_tags_service_from_code():
import ddtrace
from ddtrace.internal.runtime import tag_collectors
from ddtrace.trace import TraceFilter
from tests.conftest import DEFAULT_DDTRACE_SUBPROCESS_TEST_SERVICE_NAME

class DropFilter(TraceFilter):
def process_trace(self, _):
Expand All @@ -102,10 +101,6 @@ def process_trace(self, _):
values = ttc.collect()

assert values is not None
assert set(values) == set(
[
("service", "new-service"),
("service", DEFAULT_DDTRACE_SUBPROCESS_TEST_SERVICE_NAME),
("service", "my-service"),
]
)
assert values == [
("service", "my-service"),
], values
20 changes: 0 additions & 20 deletions tests/tracer/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,26 +464,6 @@ def test_start_child_from_context(self):
_parent=None,
)

@run_in_subprocess()
def test_adding_services(self):
assert self.tracer._services == set(), self.tracer._services
with self.start_span("root", service="one") as root:
assert self.tracer._services == set(["one"]), self.tracer._services
with self.start_span("child", service="two", child_of=root):
pass
assert self.tracer._services == set(["one", "two"]), self.tracer._services

@run_in_subprocess(env_overrides=dict(DD_SERVICE_MAPPING="two:three"))
def test_adding_mapped_services(self):
assert self.tracer._services == set()
with self.start_span("root", service="one") as root:
assert self.tracer._services == set(["one"])

# service "two" gets remapped to "three"
with self.start_span("child", service="two", child_of=root):
pass
assert self.tracer._services == set(["one", "three"])

def test_tracer_set_user(self):
with self.trace("fake_span") as span:
set_user(
Expand Down
Loading