Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opentelemetry] Add DD Agent and OTel Collector ingestion tests to OTel system tests #1016

Merged
merged 16 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ jobs:
env:
DD_API_KEY: ${{ secrets.DD_API_KEY }}
DD_APPLICATION_KEY: ${{ secrets.DD_APPLICATION_KEY }}
DD_APP_KEY: ${{ secrets.DD_APPLICATION_KEY }}
DD_API_KEY_2: ${{ secrets.DD_API_KEY_2 }}
DD_APP_KEY_2: ${{ secrets.DD_APP_KEY_2 }}
DD_API_KEY_3: ${{ secrets.DD_API_KEY_3 }}
DD_APP_KEY_3: ${{ secrets.DD_APP_KEY_3 }}
- name: Compress logs
if: always() && steps.build.outcome == 'success'
run: tar -czvf artifact.tar.gz $(ls | grep logs)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ docker==6.0.0

opentelemetry-proto==1.17.0
paramiko==3.1.0
dictdiffer==0.9.0
91 changes: 80 additions & 11 deletions tests/otel_tracing_e2e/_validator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
# Util functions to validate JSON trace data from OTel system tests

import json
import dictdiffer

# Validates that traces from the Agent, Collector and Backend intake OTLP ingestion paths are consistent
def validate_all_traces(
    traces_agent: list[dict],
    traces_intake: list[dict],
    traces_collector: list[dict],
    use_128_bits_trace_id: bool,
):
    """Validate the traces of every ingestion path, then cross-check them.

    Each list of traces is first validated on its own with ``validate_trace``
    (in the order agent, intake, collector), which returns the spans it
    extracted. The three results are then handed to
    ``validate_spans_from_all_paths`` so they can be compared with each other.

    Args:
        traces_agent: traces retrieved for the DD Agent path.
        traces_intake: traces retrieved for the backend OTLP intake path.
        traces_collector: traces retrieved for the OTel Collector path.
        use_128_bits_trace_id: forwarded unchanged to ``validate_trace``.
    """
    per_path_spans = [
        validate_trace(path_traces, use_128_bits_trace_id)
        for path_traces in (traces_agent, traces_intake, traces_collector)
    ]
    validate_spans_from_all_paths(*per_path_spans)

def validate_trace(traces: list, use_128_bits_trace_id: bool):

# Validates fields whose values are known upfront, for a single trace from one OTLP ingestion path
def validate_trace(traces: list[dict], use_128_bits_trace_id: bool) -> tuple:
server_span = None
message_span = None
for trace in traces:
Expand All @@ -21,6 +32,7 @@ def validate_trace(traces: list, use_128_bits_trace_id: bool):
validate_server_span(server_span)
validate_message_span(message_span)
validate_span_link(server_span, message_span)
return (server_span, message_span)


def validate_common_tags(span: dict, use_128_bits_trace_id: bool):
Expand All @@ -35,7 +47,6 @@ def validate_common_tags(span: dict, use_128_bits_trace_id: bool):
"deployment.environment": "system-tests",
"_dd.ingestion_reason": "otel",
"otel.status_code": "Unset",
"otel.user_agent": "OTel-OTLP-Exporter-Java/1.23.1",
"otel.library.name": "com.datadoghq.springbootnative",
}
assert expected_tags.items() <= span.items()
Expand All @@ -54,24 +65,82 @@ def validate_trace_id(span: dict, use_128_bits_trace_id: bool):


def validate_server_span(span: dict):
    """Assert that *span* carries the expected server-span fields.

    The span must contain the expected top-level ``name`` and ``resource``
    entries, and its ``meta`` dict must contain the expected HTTP route and
    method. Extra entries in either dict are allowed.

    Args:
        span: span dict with at least ``name``, ``resource`` and ``meta`` keys.

    Raises:
        AssertionError: if an expected entry is missing or has another value.
        KeyError: if *span* has no ``meta`` key.
    """
    expected_tags = {"name": "WebController.basic", "resource": "GET /"}
    expected_meta = {"http.route": "/", "http.method": "GET"}
    # dict item views are set-like: "<=" checks that every expected pair is present
    assert expected_tags.items() <= span.items()
    assert expected_meta.items() <= span["meta"].items()


def validate_message_span(span: dict):
    """Assert that *span* carries the expected message-publish span fields.

    The span must contain the expected top-level ``name`` and ``resource``
    entries, and its ``meta`` dict must contain the expected messaging
    operation and system. Extra entries in either dict are allowed.

    Args:
        span: span dict with at least ``name``, ``resource`` and ``meta`` keys.

    Raises:
        AssertionError: if an expected entry is missing or has another value.
        KeyError: if *span* has no ``meta`` key.
    """
    expected_tags = {"name": "WebController.basic.publish", "resource": "publish"}
    expected_meta = {"messaging.operation": "publish", "messaging.system": "rabbitmq"}
    # dict item views are set-like: "<=" checks that every expected pair is present
    assert expected_tags.items() <= span.items()
    assert expected_meta.items() <= span["meta"].items()


def validate_span_link(server_span: dict, message_span: dict):
    """Assert that the server span links to the message span, when links exist.

    If ``server_span["meta"]`` has no ``_dd.span_links`` entry the check is
    skipped. Otherwise the entry is parsed as JSON and must be exactly one
    link whose trace id, span id and attributes match *message_span*.

    Args:
        server_span: span dict whose ``meta`` may hold ``_dd.span_links``.
        message_span: span dict providing ``span_id`` and
            ``meta["otel.trace_id"]`` for the expected link.

    Raises:
        AssertionError: if the parsed links differ from the expected link.
    """
    # TODO: enable check on span links once newer version of Agent is used in system tests
    # The guard must run before the key is read, otherwise a missing entry raises KeyError.
    if "_dd.span_links" not in server_span["meta"]:
        return

    actual = json.loads(server_span["meta"]["_dd.span_links"])
    expected = [
        {
            "trace_id": message_span["meta"]["otel.trace_id"],
            # span_id is an int in the span but a hex string in span links
            "span_id": f'{int(message_span["span_id"]):x}',
            "attributes": {"messaging.operation": "publish"},
        }
    ]
    assert actual == expected


# Validates fields whose values are not known upfront, by checking that they match across all 3 ingestion paths
def validate_spans_from_all_paths(spans_agent: tuple, spans_intake: tuple, spans_collector: tuple):
    """Check that the intake and collector spans match the agent spans.

    Each argument is indexed as ``[0]`` for the server span and ``[1]`` for
    the message span. The agent spans are the reference: both the intake and
    the collector spans are compared against them with
    ``validate_span_fields``.

    Args:
        spans_agent: spans obtained through the DD Agent path.
        spans_intake: spans obtained through the backend OTLP intake path.
        spans_collector: spans obtained through the OTel Collector path.
    """
    validate_span_fields(spans_agent[0], spans_intake[0], "Agent server span", "Intake server span")
    validate_span_fields(spans_agent[0], spans_collector[0], "Agent server span", "Collector server span")
    validate_span_fields(spans_agent[1], spans_intake[1], "Agent message span", "Intake message span")
    # The label must name the collector, otherwise a failure here would be blamed on the intake path.
    validate_span_fields(spans_agent[1], spans_collector[1], "Agent message span", "Collector message span")


def validate_span_fields(span1: dict, span2: dict, name1: str, name2: str):
    """Assert that two spans agree on timing, resource hash, meta and metrics.

    ``start``, ``end``, ``duration`` and ``resource_hash`` are compared
    directly, in that order; ``meta`` and ``metrics`` are delegated to
    ``validate_span_metas_metrics``.

    Args:
        span1: first span dict.
        span2: second span dict.
        name1: human-readable label of *span1*, used in failure messages.
        name2: human-readable label of *span2*, used in failure messages.

    Raises:
        AssertionError: on the first field that differs; the message names the
            field and both spans.
        KeyError: if a compared key is missing from either span.
    """
    for field in ("start", "end", "duration", "resource_hash"):
        value1, value2 = span1[field], span2[field]
        assert value1 == value2, f"Diff in {field} between {name1} and {name2}: {value1!r} != {value2!r}"
    validate_span_metas_metrics(span1["meta"], span2["meta"], span1["metrics"], span2["metrics"], name1, name2)


# Meta keys that are expected to differ between ingestion paths and are ignored when comparing.
KNOWN_UNMATCHED_METAS = [
    "otel.user_agent",
    "span.kind",
    "_dd.agent_version",
    "_dd.span_links",  # TODO: remove once Agent supports span links
    "_dd.agent_rare_sampler.enabled",
    "_dd.hostname",
    "_dd.compute_stats",
    "_dd.tracer_version",
    "_dd.p.dm",
    "_dd.agent_hostname",
]
# Metric keys that are expected to differ between ingestion paths and are ignored when comparing.
KNOWN_UNMATCHED_METRICS = [
    "_dd.agent_errors_sampler.target_tps",
    "_dd.agent_priority_sampler.target_tps",
    "_sampling_priority_rate_v1",
    "_dd.otlp_sr",
]


def _without_keys(mapping: dict, excluded: list) -> dict:
    """Return a shallow copy of *mapping* without the keys listed in *excluded*."""
    return {key: value for key, value in mapping.items() if key not in excluded}


def validate_span_metas_metrics(meta1: dict, meta2: dict, metrics1: dict, metrics2: dict, name1: str, name2: str):
    """Assert that two spans have equal meta and metrics, ignoring known differences.

    Keys listed in ``KNOWN_UNMATCHED_METAS`` / ``KNOWN_UNMATCHED_METRICS`` are
    left out of the comparison. The comparison runs on filtered copies, so the
    dicts passed in are not modified.

    Args:
        meta1: meta dict of the first span.
        meta2: meta dict of the second span.
        metrics1: metrics dict of the first span.
        metrics2: metrics dict of the second span.
        name1: label of the first span, used in failure messages.
        name2: label of the second span, used in failure messages.

    Raises:
        AssertionError: if the remaining meta or metrics entries differ; the
            message contains the ``dictdiffer`` diff.
    """
    # Exclude fields that are expected to have different values for different ingestion paths
    meta1 = _without_keys(meta1, KNOWN_UNMATCHED_METAS)
    meta2 = _without_keys(meta2, KNOWN_UNMATCHED_METAS)
    metrics1 = _without_keys(metrics1, KNOWN_UNMATCHED_METRICS)
    metrics2 = _without_keys(metrics2, KNOWN_UNMATCHED_METRICS)

    # Other fields should match
    assert meta1 == meta2, f"Diff in metas between {name1} and {name2}: {list(dictdiffer.diff(meta1, meta2))}"
    assert (
        metrics1 == metrics2
    ), f"Diff in metrics between {name1} and {name2}: {list(dictdiffer.diff(metrics1, metrics2))}"
49 changes: 41 additions & 8 deletions tests/otel_tracing_e2e/test_e2e.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from _validator import validate_trace
import base64
import os
from _validator import validate_all_traces
from utils import context, weblog, interfaces, scenarios, irrelevant


Expand All @@ -7,19 +9,50 @@
class Test_OTel_E2E:
def setup_main(self):
self.use_128_bits_trace_id = False
self.r = weblog.get(path="/")
self.r = weblog.get(path="/basic")

def test_main(self):
otel_trace_ids = set(interfaces.open_telemetry.get_otel_trace_id(request=self.r))
assert len(otel_trace_ids) == 2
dd_trace_ids = [self._get_dd_trace_id(otel_trace_id) for otel_trace_id in otel_trace_ids]
traces = [
interfaces.backend.assert_otlp_trace_exist(request=self.r, dd_trace_id=dd_trace_id)

# The 1st account has traces sent by DD Agent
traces_agent = [
interfaces.backend.assert_otlp_trace_exist(
request=self.r,
dd_trace_id=dd_trace_id,
dd_api_key=os.environ["DD_API_KEY"],
dd_app_key=os.environ.get("DD_APP_KEY", os.environ.get("DD_APPLICATION_KEY")),
)
for dd_trace_id in dd_trace_ids
]

# The 2nd account has traces via the backend OTLP intake endpoint
traces_intake = [
interfaces.backend.assert_otlp_trace_exist(
request=self.r,
dd_trace_id=dd_trace_id,
dd_api_key=os.environ["DD_API_KEY_2"],
dd_app_key=os.environ["DD_APP_KEY_2"],
)
for dd_trace_id in dd_trace_ids
]
validate_trace(traces, self.use_128_bits_trace_id)

def _get_dd_trace_id(self, otel_trace_id=bytes) -> int:
# The 3rd account has traces sent by OTel Collector
traces_collector = [
interfaces.backend.assert_otlp_trace_exist(
request=self.r,
dd_trace_id=dd_trace_id,
dd_api_key=os.environ["DD_API_KEY_3"],
dd_app_key=os.environ["DD_APP_KEY_3"],
)
for dd_trace_id in dd_trace_ids
]

validate_all_traces(traces_agent, traces_intake, traces_collector, self.use_128_bits_trace_id)

def _get_dd_trace_id(self, otel_trace_id=str) -> int:
otel_trace_id_bytes = base64.b64decode(otel_trace_id)
if self.use_128_bits_trace_id:
return int.from_bytes(otel_trace_id, "big")
return int.from_bytes(otel_trace_id[8:], "big")
return int.from_bytes(otel_trace_id_bytes, "big")
return int.from_bytes(otel_trace_id_bytes[8:], "big")
37 changes: 24 additions & 13 deletions utils/_context/_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CassandraContainer,
RabbitMqContainer,
MySqlContainer,
OpenTelemetryCollectorContainer,
create_network,
)

Expand Down Expand Up @@ -493,15 +494,27 @@ def get_junit_properties(self):
class OpenTelemetryScenario(_DockerScenario):
""" Scenario for testing opentelemetry"""

def __init__(self, name, weblog_env) -> None:
self._required_containers = []
def __init__(self, name) -> None:
super().__init__(name, use_proxy=True)

self.weblog_container = WeblogContainer(self.host_log_folder, environment=weblog_env)
self.agent_container = AgentContainer(host_log_folder=self.host_log_folder, use_proxy=True)
self.weblog_container = WeblogContainer(self.host_log_folder)
self.collector_container = OpenTelemetryCollectorContainer(self.host_log_folder)
self._required_containers.append(self.agent_container)
self._required_containers.append(self.weblog_container)
self._required_containers.append(self.collector_container)

def configure(self):
    """Configure the scenario and inject backend credentials into the containers.

    Runs the parent configuration, verifies the required environment
    variables, then sets ``DD_API_KEY`` and ``DD_SITE`` on the weblog
    container (key taken from ``DD_API_KEY_2``) and on the collector container
    (key taken from ``DD_API_KEY_3``). ``DD_SITE`` falls back to
    ``datad0g.com`` when it is not set in the environment.
    """
    super().configure()
    self._check_env_vars()

    site = os.environ.get("DD_SITE", "datad0g.com")
    key_source_by_container = (
        (self.weblog_container, "DD_API_KEY_2"),
        (self.collector_container, "DD_API_KEY_3"),
    )
    for container, key_variable in key_source_by_container:
        container.environment["DD_API_KEY"] = os.environ.get(key_variable)
        container.environment["DD_SITE"] = site

def _create_interface_folders(self):
for interface in ("open_telemetry", "backend"):
for interface in ("open_telemetry", "backend", "agent"):
self.create_log_subfolder(f"interfaces/{interface}")

def _start_interface_watchdog(self):
Expand Down Expand Up @@ -565,17 +578,18 @@ def _wait_interface(interface, session, timeout):

interface.wait(timeout)

def _check_env_vars(self):
for env in ["DD_API_KEY", "DD_APP_KEY", "DD_API_KEY_2", "DD_APP_KEY_2", "DD_API_KEY_3", "DD_APP_KEY_3"]:
if env not in os.environ:
raise Exception(f"Please set {env}, OTel E2E test requires 3 API keys and 3 APP keys")

@property
def library(self):
return LibraryVersion("open_telemetry", "0.0.0")

@property
def agent(self):
return LibraryVersion("agent", "0.0.0")

@property
def agent_version(self):
return self.agent.version
return self.agent_container.agent_version

@property
def weblog_variant(self):
Expand Down Expand Up @@ -841,10 +855,7 @@ class scenarios:
backend_interface_timeout=5,
)

otel_tracing_e2e = OpenTelemetryScenario(
"OTEL_TRACING_E2E",
weblog_env={"DD_API_KEY": os.environ.get("DD_API_KEY"), "DD_SITE": os.environ.get("DD_SITE"),},
)
otel_tracing_e2e = OpenTelemetryScenario("OTEL_TRACING_E2E")

library_conf_custom_headers_short = EndToEndScenario(
"LIBRARY_CONF_CUSTOM_HEADERS_SHORT", additional_trace_header_tags=("header-tag1", "header-tag2")
Expand Down
38 changes: 37 additions & 1 deletion utils/_context/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from docker.errors import APIError
from docker.models.containers import Container
import pytest
import requests

from utils._context.library_version import LibraryVersion, Version
from utils.tools import logger
Expand All @@ -28,7 +29,14 @@ class TestedContainer:

# https://docker-py.readthedocs.io/en/stable/containers.html
def __init__(
self, name, image_name, host_log_folder, environment=None, allow_old_container=False, healthcheck=None, **kwargs
self,
name,
image_name,
host_log_folder,
environment=None,
allow_old_container=False,
healthcheck=None,
**kwargs,
) -> None:
self.name = name
self.host_log_folder = host_log_folder
Expand Down Expand Up @@ -473,3 +481,31 @@ def __init__(self, host_log_folder) -> None:
host_log_folder=host_log_folder,
healthcheck={"test": "/healthcheck.sh", "retries": 60},
)


class OpenTelemetryCollectorContainer(TestedContainer):
    """Container running the OpenTelemetry Collector (contrib image).

    The collector configuration is bind-mounted read-only from the repository,
    and port 13133 is published on the host so health can be polled over HTTP.
    """

    def __init__(self, host_log_folder) -> None:
        config_mount = {
            "./utils/build/docker/otelcol-config.yaml": {"bind": "/etc/otelcol-config.yml", "mode": "ro"},
        }
        published_ports = {"13133/tcp": ("0.0.0.0", 13133)}
        super().__init__(
            image_name="otel/opentelemetry-collector-contrib:latest",
            name="collector",
            command="--config=/etc/otelcol-config.yml",
            environment={},
            volumes=config_mount,
            host_log_folder=host_log_folder,
            ports=published_ports,
        )

    # Override wait_for_health because we cannot do docker exec for container opentelemetry-collector-contrib
    def wait_for_health(self):
        """Block until the collector answers HTTP 200 on localhost:13133.

        Sleeps 20 seconds up front, then makes up to 61 attempts with a
        one-second pause after each failed one. Ends the pytest session with
        return code 1 if no attempt succeeds.
        """
        time.sleep(20)  # It takes long for otel collector to start

        for attempt in range(61):
            if self._probe(attempt):
                return
            time.sleep(1)
        pytest.exit("localhost:13133 never answered to healthcheck request", 1)

    @staticmethod
    def _probe(attempt) -> bool:
        """Issue one health request; return True only on an HTTP 200 response."""
        try:
            response = requests.get("http://localhost:13133", timeout=1)
            logger.debug(f"Healthcheck #{attempt} on localhost:13133: {response}")
            if response.status_code == 200:
                return True
        except Exception as err:
            # Any failure is treated as "not healthy yet" and only logged
            logger.debug(f"Healthcheck #{attempt} on localhost:13133: {err}")
        return False
10 changes: 10 additions & 0 deletions utils/build/docker/agent.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ RUN echo '\
log_level: DEBUG\n\
apm_config:\n\
apm_non_local_traffic: true\n\
otlp_config:\n\
debug:\n\
verbosity: detailed\n\
receiver:\n\
protocols:\n\
http:\n\
endpoint: 0.0.0.0:4318\n\
traces:\n\
enabled: true\n\
span_name_as_resource_name: true\n\
' >> /etc/datadog-agent/datadog.yaml

# Proxy conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,29 @@ public static void main(String[] args) {
ResourceAttributes.SERVICE_NAME, "otel-system-tests-spring-boot",
ResourceAttributes.DEPLOYMENT_ENVIRONMENT, "system-tests"));

OtlpHttpSpanExporter intakeExporter = OtlpHttpSpanExporter.builder()
OtlpHttpSpanExporter agentExporter =
OtlpHttpSpanExporter.builder()
.setEndpoint("http://proxy:8126/v1/traces")
.addHeader("dd-protocol", "otlp")
.addHeader("dd-otlp-path", "agent")
.build();
OtlpHttpSpanExporter intakeExporter =
OtlpHttpSpanExporter.builder()
.setEndpoint("http://proxy:8126/api/v0.2/traces") // send to the proxy first
.addHeader("dd-protocol", "otlp")
.addHeader("dd-api-key", System.getenv("DD_API_KEY"))
.addHeader("dd-otlp-path", "intake")
.build();
OtlpHttpSpanExporter collectorExporter =
OtlpHttpSpanExporter.builder()
.setEndpoint("http://proxy:8126/v1/traces")
.addHeader("dd-protocol", "otlp")
.addHeader("dd-otlp-path", "collector")
.build();

SpanExporter loggingSpanExporter = OtlpJsonLoggingSpanExporter.create();

SpanExporter exporter = SpanExporter.composite(intakeExporter, loggingSpanExporter);
SpanExporter exporter = SpanExporter.composite(agentExporter, intakeExporter, collectorExporter, loggingSpanExporter);

SpanProcessor processor = BatchSpanProcessor.builder(exporter)
.setMaxExportBatchSize(1)
Expand Down
Loading