Skip to content

Commit

Permalink
Merge pull request #1016 from DataDog/yang.song/otlp-intake-test-2
Browse files Browse the repository at this point in the history
[opentelemetry] Add DD Agent and OTel Collector ingestion tests to OTel system tests
  • Loading branch information
songy23 authored May 5, 2023
2 parents ef07d2f + cbfc89f commit dcadd52
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 67 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ jobs:
env:
DD_API_KEY: ${{ secrets.DD_API_KEY }}
DD_APPLICATION_KEY: ${{ secrets.DD_APPLICATION_KEY }}
DD_APP_KEY: ${{ secrets.DD_APPLICATION_KEY }}
DD_API_KEY_2: ${{ secrets.DD_API_KEY_2 }}
DD_APP_KEY_2: ${{ secrets.DD_APP_KEY_2 }}
DD_API_KEY_3: ${{ secrets.DD_API_KEY_3 }}
DD_APP_KEY_3: ${{ secrets.DD_APP_KEY_3 }}
- name: Compress logs
if: always() && steps.build.outcome == 'success'
run: tar -czvf artifact.tar.gz $(ls | grep logs)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ docker==6.0.0

opentelemetry-proto==1.17.0
paramiko==3.1.0
dictdiffer==0.9.0
91 changes: 80 additions & 11 deletions tests/otel_tracing_e2e/_validator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
# Util functions to validate JSON trace data from OTel system tests

import json
import dictdiffer

# Validates traces from Agent, Collector and Backend intake OTLP ingestion paths are consistent
def validate_all_traces(
traces_agent: list[dict], traces_intake: list[dict], traces_collector: list[dict], use_128_bits_trace_id: bool
):
spans_agent = validate_trace(traces_agent, use_128_bits_trace_id)
spans_intake = validate_trace(traces_intake, use_128_bits_trace_id)
spans_collector = validate_trace(traces_collector, use_128_bits_trace_id)
validate_spans_from_all_paths(spans_agent, spans_intake, spans_collector)

def validate_trace(traces: list, use_128_bits_trace_id: bool):

# Validates fields that we know the values upfront for one single trace from an OTLP ingestion path
def validate_trace(traces: list[dict], use_128_bits_trace_id: bool) -> tuple:
server_span = None
message_span = None
for trace in traces:
Expand All @@ -21,6 +32,7 @@ def validate_trace(traces: list, use_128_bits_trace_id: bool):
validate_server_span(server_span)
validate_message_span(message_span)
validate_span_link(server_span, message_span)
return (server_span, message_span)


def validate_common_tags(span: dict, use_128_bits_trace_id: bool):
Expand All @@ -35,7 +47,6 @@ def validate_common_tags(span: dict, use_128_bits_trace_id: bool):
"deployment.environment": "system-tests",
"_dd.ingestion_reason": "otel",
"otel.status_code": "Unset",
"otel.user_agent": "OTel-OTLP-Exporter-Java/1.23.1",
"otel.library.name": "com.datadoghq.springbootnative",
}
assert expected_tags.items() <= span.items()
Expand All @@ -54,24 +65,82 @@ def validate_trace_id(span: dict, use_128_bits_trace_id: bool):


def validate_server_span(span: dict):
expected_tags = {"name": "WebController.home", "resource": "GET /"}
expected_tags = {"name": "WebController.basic", "resource": "GET /"}
expected_meta = {"http.route": "/", "http.method": "GET"}
assert expected_tags.items() <= span.items()
assert expected_meta.items() <= span["meta"].items()


def validate_message_span(span: dict):
expected_tags = {"name": "WebController.home.publish", "resource": "publish"}
expected_tags = {"name": "WebController.basic.publish", "resource": "publish"}
expected_meta = {"messaging.operation": "publish", "messaging.system": "rabbitmq"}
assert expected_tags.items() <= span.items()
assert expected_meta.items() <= span["meta"].items()


def validate_span_link(server_span: dict, message_span: dict):
span_links = json.loads(server_span["meta"]["_dd.span_links"])
assert len(span_links) == 1
span_link = span_links[0]
assert span_link["trace_id"] == message_span["meta"]["otel.trace_id"]
span_id_hex = f'{int(message_span["span_id"]):x}' # span_id is an int in span but a hex in span_links
assert span_link["span_id"] == span_id_hex
assert span_link["attributes"] == {"messaging.operation": "publish"}
# TODO: enable check on span links once newer version of Agent is used in system tests
if "_dd.span_links" not in server_span["meta"]:
return

actual = json.loads(server_span["meta"]["_dd.span_links"])
expected = [
{
"trace_id": message_span["meta"]["otel.trace_id"],
"span_id": f'{int(message_span["span_id"]):x}',
"attributes": {"messaging.operation": "publish"},
}
]
assert actual == expected


# Validates fields that we don't know the values upfront for all 3 ingestion paths
def validate_spans_from_all_paths(spans_agent: tuple, spans_intake: tuple, spans_collector: tuple):
validate_span_fields(spans_agent[0], spans_intake[0], "Agent server span", "Intake server span")
validate_span_fields(spans_agent[0], spans_collector[0], "Agent server span", "Collector server span")
validate_span_fields(spans_agent[1], spans_intake[1], "Agent message span", "Intake message span")
validate_span_fields(spans_agent[1], spans_collector[1], "Agent message span", "Intake message span")


def validate_span_fields(span1: dict, span2: dict, name1: str, name2: str):
assert span1["start"] == span2["start"]
assert span1["end"] == span2["end"]
assert span1["duration"] == span2["duration"]
assert span1["resource_hash"] == span2["resource_hash"]
validate_span_metas_metrics(span1["meta"], span2["meta"], span1["metrics"], span2["metrics"], name1, name2)


KNOWN_UNMATCHED_METAS = [
"otel.user_agent",
"span.kind",
"_dd.agent_version",
"_dd.span_links", # TODO: remove once Agent supports span links
"_dd.agent_rare_sampler.enabled",
"_dd.hostname",
"_dd.compute_stats",
"_dd.tracer_version",
"_dd.p.dm",
"_dd.agent_hostname",
]
KNOWN_UNMATCHED_METRICS = [
"_dd.agent_errors_sampler.target_tps",
"_dd.agent_priority_sampler.target_tps",
"_sampling_priority_rate_v1",
"_dd.otlp_sr",
]


def validate_span_metas_metrics(meta1: dict, meta2: dict, metrics1: dict, metrics2: dict, name1: str, name2: str):
# Exclude fields that are expected to have different values for different ingestion paths
for known_unmatched_meta in KNOWN_UNMATCHED_METAS:
meta1.pop(known_unmatched_meta, None)
meta2.pop(known_unmatched_meta, None)
for known_unmatched_metric in KNOWN_UNMATCHED_METRICS:
metrics1.pop(known_unmatched_metric, None)
metrics2.pop(known_unmatched_metric, None)

# Other fields should match
assert meta1 == meta2, f"Diff in metas between {name1} and {name2}: {list(dictdiffer.diff(meta1, meta2))}"
assert (
metrics1 == metrics2
), f"Diff in metrics between {name1} and {name2}: {list(dictdiffer.diff(metrics1, metrics2))}"
49 changes: 41 additions & 8 deletions tests/otel_tracing_e2e/test_e2e.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from _validator import validate_trace
import base64
import os
from _validator import validate_all_traces
from utils import context, weblog, interfaces, scenarios, irrelevant


Expand All @@ -7,19 +9,50 @@
class Test_OTel_E2E:
def setup_main(self):
self.use_128_bits_trace_id = False
self.r = weblog.get(path="/")
self.r = weblog.get(path="/basic")

def test_main(self):
otel_trace_ids = set(interfaces.open_telemetry.get_otel_trace_id(request=self.r))
assert len(otel_trace_ids) == 2
dd_trace_ids = [self._get_dd_trace_id(otel_trace_id) for otel_trace_id in otel_trace_ids]
traces = [
interfaces.backend.assert_otlp_trace_exist(request=self.r, dd_trace_id=dd_trace_id)

# The 1st account has traces sent by DD Agent
traces_agent = [
interfaces.backend.assert_otlp_trace_exist(
request=self.r,
dd_trace_id=dd_trace_id,
dd_api_key=os.environ["DD_API_KEY"],
dd_app_key=os.environ.get("DD_APP_KEY", os.environ.get("DD_APPLICATION_KEY")),
)
for dd_trace_id in dd_trace_ids
]

# The 2nd account has traces via the backend OTLP intake endpoint
traces_intake = [
interfaces.backend.assert_otlp_trace_exist(
request=self.r,
dd_trace_id=dd_trace_id,
dd_api_key=os.environ["DD_API_KEY_2"],
dd_app_key=os.environ["DD_APP_KEY_2"],
)
for dd_trace_id in dd_trace_ids
]
validate_trace(traces, self.use_128_bits_trace_id)

def _get_dd_trace_id(self, otel_trace_id=bytes) -> int:
# The 3rd account has traces sent by OTel Collector
traces_collector = [
interfaces.backend.assert_otlp_trace_exist(
request=self.r,
dd_trace_id=dd_trace_id,
dd_api_key=os.environ["DD_API_KEY_3"],
dd_app_key=os.environ["DD_APP_KEY_3"],
)
for dd_trace_id in dd_trace_ids
]

validate_all_traces(traces_agent, traces_intake, traces_collector, self.use_128_bits_trace_id)

def _get_dd_trace_id(self, otel_trace_id=str) -> int:
otel_trace_id_bytes = base64.b64decode(otel_trace_id)
if self.use_128_bits_trace_id:
return int.from_bytes(otel_trace_id, "big")
return int.from_bytes(otel_trace_id[8:], "big")
return int.from_bytes(otel_trace_id_bytes, "big")
return int.from_bytes(otel_trace_id_bytes[8:], "big")
37 changes: 24 additions & 13 deletions utils/_context/_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CassandraContainer,
RabbitMqContainer,
MySqlContainer,
OpenTelemetryCollectorContainer,
create_network,
)

Expand Down Expand Up @@ -493,15 +494,27 @@ def get_junit_properties(self):
class OpenTelemetryScenario(_DockerScenario):
""" Scenario for testing opentelemetry"""

def __init__(self, name, weblog_env) -> None:
self._required_containers = []
def __init__(self, name) -> None:
super().__init__(name, use_proxy=True)

self.weblog_container = WeblogContainer(self.host_log_folder, environment=weblog_env)
self.agent_container = AgentContainer(host_log_folder=self.host_log_folder, use_proxy=True)
self.weblog_container = WeblogContainer(self.host_log_folder)
self.collector_container = OpenTelemetryCollectorContainer(self.host_log_folder)
self._required_containers.append(self.agent_container)
self._required_containers.append(self.weblog_container)
self._required_containers.append(self.collector_container)

def configure(self):
super().configure()
self._check_env_vars()
dd_site = os.environ.get("DD_SITE", "datad0g.com")
self.weblog_container.environment["DD_API_KEY"] = os.environ.get("DD_API_KEY_2")
self.weblog_container.environment["DD_SITE"] = dd_site
self.collector_container.environment["DD_API_KEY"] = os.environ.get("DD_API_KEY_3")
self.collector_container.environment["DD_SITE"] = dd_site

def _create_interface_folders(self):
for interface in ("open_telemetry", "backend"):
for interface in ("open_telemetry", "backend", "agent"):
self.create_log_subfolder(f"interfaces/{interface}")

def _start_interface_watchdog(self):
Expand Down Expand Up @@ -565,17 +578,18 @@ def _wait_interface(interface, session, timeout):

interface.wait(timeout)

def _check_env_vars(self):
for env in ["DD_API_KEY", "DD_APP_KEY", "DD_API_KEY_2", "DD_APP_KEY_2", "DD_API_KEY_3", "DD_APP_KEY_3"]:
if env not in os.environ:
raise Exception(f"Please set {env}, OTel E2E test requires 3 API keys and 3 APP keys")

@property
def library(self):
return LibraryVersion("open_telemetry", "0.0.0")

@property
def agent(self):
return LibraryVersion("agent", "0.0.0")

@property
def agent_version(self):
return self.agent.version
return self.agent_container.agent_version

@property
def weblog_variant(self):
Expand Down Expand Up @@ -841,10 +855,7 @@ class scenarios:
backend_interface_timeout=5,
)

otel_tracing_e2e = OpenTelemetryScenario(
"OTEL_TRACING_E2E",
weblog_env={"DD_API_KEY": os.environ.get("DD_API_KEY"), "DD_SITE": os.environ.get("DD_SITE"),},
)
otel_tracing_e2e = OpenTelemetryScenario("OTEL_TRACING_E2E")

library_conf_custom_headers_short = EndToEndScenario(
"LIBRARY_CONF_CUSTOM_HEADERS_SHORT", additional_trace_header_tags=("header-tag1", "header-tag2")
Expand Down
38 changes: 37 additions & 1 deletion utils/_context/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from docker.errors import APIError
from docker.models.containers import Container
import pytest
import requests

from utils._context.library_version import LibraryVersion, Version
from utils.tools import logger
Expand All @@ -28,7 +29,14 @@ class TestedContainer:

# https://docker-py.readthedocs.io/en/stable/containers.html
def __init__(
self, name, image_name, host_log_folder, environment=None, allow_old_container=False, healthcheck=None, **kwargs
self,
name,
image_name,
host_log_folder,
environment=None,
allow_old_container=False,
healthcheck=None,
**kwargs,
) -> None:
self.name = name
self.host_log_folder = host_log_folder
Expand Down Expand Up @@ -473,3 +481,31 @@ def __init__(self, host_log_folder) -> None:
host_log_folder=host_log_folder,
healthcheck={"test": "/healthcheck.sh", "retries": 60},
)


class OpenTelemetryCollectorContainer(TestedContainer):
def __init__(self, host_log_folder) -> None:
super().__init__(
image_name="otel/opentelemetry-collector-contrib:latest",
name="collector",
command="--config=/etc/otelcol-config.yml",
environment={},
volumes={"./utils/build/docker/otelcol-config.yaml": {"bind": "/etc/otelcol-config.yml", "mode": "ro",}},
host_log_folder=host_log_folder,
ports={"13133/tcp": ("0.0.0.0", 13133)},
)

# Override wait_for_health because we cannot do docker exec for container opentelemetry-collector-contrib
def wait_for_health(self):
time.sleep(20) # It takes long for otel collector to start

for i in range(61):
try:
r = requests.get("http://localhost:13133", timeout=1)
logger.debug(f"Healthcheck #{i} on localhost:13133: {r}")
if r.status_code == 200:
return
except Exception as e:
logger.debug(f"Healthcheck #{i} on localhost:13133: {e}")
time.sleep(1)
pytest.exit("localhost:13133 never answered to healthcheck request", 1)
10 changes: 10 additions & 0 deletions utils/build/docker/agent.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ RUN echo '\
log_level: DEBUG\n\
apm_config:\n\
apm_non_local_traffic: true\n\
otlp_config:\n\
debug:\n\
verbosity: detailed\n\
receiver:\n\
protocols:\n\
http:\n\
endpoint: 0.0.0.0:4318\n\
traces:\n\
enabled: true\n\
span_name_as_resource_name: true\n\
' >> /etc/datadog-agent/datadog.yaml

# Proxy conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,29 @@ public static void main(String[] args) {
ResourceAttributes.SERVICE_NAME, "otel-system-tests-spring-boot",
ResourceAttributes.DEPLOYMENT_ENVIRONMENT, "system-tests"));

OtlpHttpSpanExporter intakeExporter = OtlpHttpSpanExporter.builder()
OtlpHttpSpanExporter agentExporter =
OtlpHttpSpanExporter.builder()
.setEndpoint("http://proxy:8126/v1/traces")
.addHeader("dd-protocol", "otlp")
.addHeader("dd-otlp-path", "agent")
.build();
OtlpHttpSpanExporter intakeExporter =
OtlpHttpSpanExporter.builder()
.setEndpoint("http://proxy:8126/api/v0.2/traces") // send to the proxy first
.addHeader("dd-protocol", "otlp")
.addHeader("dd-api-key", System.getenv("DD_API_KEY"))
.addHeader("dd-otlp-path", "intake")
.build();
OtlpHttpSpanExporter collectorExporter =
OtlpHttpSpanExporter.builder()
.setEndpoint("http://proxy:8126/v1/traces")
.addHeader("dd-protocol", "otlp")
.addHeader("dd-otlp-path", "collector")
.build();

SpanExporter loggingSpanExporter = OtlpJsonLoggingSpanExporter.create();

SpanExporter exporter = SpanExporter.composite(intakeExporter, loggingSpanExporter);
SpanExporter exporter = SpanExporter.composite(agentExporter, intakeExporter, collectorExporter, loggingSpanExporter);

SpanProcessor processor = BatchSpanProcessor.builder(exporter)
.setMaxExportBatchSize(1)
Expand Down
Loading

0 comments on commit dcadd52

Please sign in to comment.