From f66b76a162736675390884f881d5484e61983de8 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Thu, 16 Mar 2023 16:44:39 -0400 Subject: [PATCH 01/15] [opentelemetry] Add OTLP intake E2E system tests --- requirements.txt | 6 + tests/otel_tracing_e2e/_validator.py | 93 ++++++ tests/otel_tracing_e2e/test_e2e.py | 83 +++++ utils/_context/_scenarios.py | 31 +- utils/build/build.sh | 3 +- .../java_otel/spring-boot-native.Dockerfile | 29 ++ .../java_otel/spring-boot-native/pom.xml | 94 ++++++ .../com/datadoghq/springbootnative/App.java | 71 +++++ .../springbootnative/WebController.java | 287 ++++++++++++++++++ 9 files changed, 691 insertions(+), 6 deletions(-) create mode 100644 tests/otel_tracing_e2e/_validator.py create mode 100644 tests/otel_tracing_e2e/test_e2e.py create mode 100644 utils/build/docker/java_otel/spring-boot-native.Dockerfile create mode 100644 utils/build/docker/java_otel/spring-boot-native/pom.xml create mode 100644 utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java create mode 100644 utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java diff --git a/requirements.txt b/requirements.txt index 3236e84fdb..634f0fb681 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,9 @@ rfc3339-validator==0.1.4 matplotlib docker==6.0.0 + +opentelemetry-api==1.16.0 +opentelemetry-exporter-otlp-proto-http==1.16.0 +opentelemetry-sdk==1.16.0 +opentelemetry-propagator-b3==1.16.0 +opentelemetry-semantic-conventions==0.37b0 diff --git a/tests/otel_tracing_e2e/_validator.py b/tests/otel_tracing_e2e/_validator.py new file mode 100644 index 0000000000..f7198457a5 --- /dev/null +++ b/tests/otel_tracing_e2e/_validator.py @@ -0,0 +1,93 @@ +import json + +# Util functions to validate JSON trace data from OTel system tests + + +def validate_trace(trace_data, dd_trace_id, root_span_id, otel_trace_id): + assert root_span_id == int(trace_data["root_id"]) + spans = trace_data["spans"] + assert len(spans) == 3 + root_span = None + server_span = None + message_span = None + for item in spans.items(): + span_id = item[0] + span = item[1] + validate_common_tags(span, dd_trace_id, otel_trace_id) + if int(span_id) == root_span_id: + root_span = span + elif span["type"] == "web": + server_span = span + elif span["type"] == "custom": + message_span = span + else: + raise Exception("Unexpected span ", span) + validate_root_span(root_span, server_span["span_id"], message_span["span_id"]) + validate_server_span(server_span, root_span_id) + validate_message_span(message_span, root_span_id) + # TODO: validate span links once the format is fixed in intake + # validate_span_link(server_span, message_span, otel_trace_id) + + +def validate_common_tags(span, dd_trace_id, otel_trace_id): + assert int(span["trace_id"]) == dd_trace_id + assert int(span["meta"]["otel.trace_id"], base=16) == otel_trace_id + assert span["env"] == "system-tests" + assert span["meta"]["env"] == "system-tests" + assert span["meta"]["deployment.environment"] == "system-tests" + assert span["meta"]["_dd.ingestion_reason"] == "otel" + assert span["ingestion_reason"] == "otel" + assert span["meta"]["otel.status_code"] == "STATUS_CODE_UNSET" + + +def validate_root_span(span, server_span_id, message_span_id): + assert span["parent_id"] == "0" + assert span["type"] == "http" + assert span["service"] == "system-tests-runner" + assert span["name"] == "runner.get" + assert span["resource"] == "GET" + assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Python/1.16.0" + assert span["meta"]["otel.library.name"] == "system-tests-runner" + assert span["meta"]["telemetry.sdk.name"] == "opentelemetry" + assert span["meta"]["telemetry.sdk.language"] == "python" + assert span["meta"]["telemetry.sdk.version"] == "1.16.0" + assert span["meta"]["http.status_code"] == "200" + assert span["meta"]["http.host"] == "weblog" + assert span["meta"]["http.url"] == "http://weblog:7777" + assert span["meta"]["http.method"] == "GET" + assert len(span["children_ids"]) == 2 + assert server_span_id in span["children_ids"] + assert message_span_id in span["children_ids"] + + +def validate_server_span(span, root_span_id): + assert int(span["parent_id"]) == root_span_id + assert span["type"] == "web" + assert span["service"] == "otel-system-tests-spring-boot" + assert span["name"] == "WebController.home" + assert span["resource"] == "GET /" + assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Java/1.23.1" + assert span["meta"]["otel.library.name"] == "com.datadoghq.springbootnative" + assert span["meta"]["http.route"] == "/" + assert span["meta"]["http.method"] == "GET" + + +def validate_message_span(span, root_span_id): + assert int(span["parent_id"]) == root_span_id + assert span["type"] == "custom" + assert span["service"] == "otel-system-tests-spring-boot" + assert span["name"] == "WebController.home.publish" + assert span["resource"] == "publish" + assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Java/1.23.1" + assert span["meta"]["otel.library.name"] == "com.datadoghq.springbootnative" + assert span["meta"]["messaging.operation"] == "publish" + assert span["meta"]["messaging.system"] == "rabbitmq" + + +def validate_span_link(server_span, message_span, otel_trace_id): + span_links = json.load(server_span["meta"]["_dd.span_links"]) + assert len(span_links) == 1 + span_link = span_links[0] + assert int(span_link["trace_id"], base=16) == otel_trace_id + assert span_link["span_id"] == message_span["span_id"] + assert span_link["attributes"] == {"messaging.operation": "publish"} diff --git a/tests/otel_tracing_e2e/test_e2e.py b/tests/otel_tracing_e2e/test_e2e.py new file mode 100644 index 0000000000..b2335103fb --- /dev/null +++ b/tests/otel_tracing_e2e/test_e2e.py @@ -0,0 +1,83 @@ +import json +import os + +from opentelemetry import trace, propagate +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, Resource, SERVICE_NAME +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.propagators.b3 import B3MultiFormat, B3SingleFormat +from _validator import validate_trace +from utils import context, weblog, interfaces, scenarios, missing_feature + + +class E2ETestBase: + def setup_main(self): + self.setup_opentelemetry() + self.use_128_bits_trace_id = False + with self.tracer.start_as_current_span(name="runner.get", kind=trace.SpanKind.CLIENT) as span: + span.set_attribute(SpanAttributes.HTTP_METHOD, "GET") + span.set_attribute(SpanAttributes.HTTP_URL, "http://weblog:7777") + headers = {} + propagate.get_global_textmap().inject(headers) + self.r = weblog.get(path="/", headers=headers) + span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, self.r.status_code) + self.trace_id = span.get_span_context().trace_id + self.root_span_id = span.get_span_context().span_id + + def setup_opentelemetry(self): + resource = Resource.create( + attributes={SERVICE_NAME: "system-tests-runner", DEPLOYMENT_ENVIRONMENT: "system-tests"} + ) + dd_site = os.environ.get("DD_SITE") + if dd_site is None or dd_site == "": + dd_site = "datad0g.com" + exporter = OTLPSpanExporter( + endpoint=f"https://trace.agent.{dd_site}/api/v0.2/traces", + headers={"dd-protocol": "otlp", "dd-api-key": os.environ.get("DD_API_KEY"),}, + ) + processor = BatchSpanProcessor(span_exporter=exporter, max_export_batch_size=1) + trace.set_tracer_provider(TracerProvider(resource=resource, active_span_processor=processor)) + self.tracer = trace.get_tracer("system-tests-runner") + + def test_main(self): + response = interfaces.backend._wait_for_trace( + rid=self.r, trace_id=self._get_dd_trace_id(), retries=5, sleep_interval_multiplier=2.0 + ) + trace_data = json.loads(response["response"]["content"])["trace"] + validate_trace(trace_data, self._get_dd_trace_id(), self.root_span_id, self.trace_id) + + def _get_dd_trace_id(self): + if self.use_128_bits_trace_id: + return self.trace_id + trace_id_bytes = self.trace_id.to_bytes(16, "big") + return int.from_bytes(trace_id_bytes[8:], "big") + + +@scenarios.otel_tracing_e2e_w3c +@missing_feature( + context.library != "java_otel", reason="OTel tests only support OTel instrumented applications at the moment.", +) +class Test_E2E_W3C(E2ETestBase): + pass # Use the default propagator TraceContextTextMapPropagator (W3C propagator) + + +@scenarios.otel_tracing_e2e_b3 +@missing_feature( + context.library != "java_otel", reason="OTel tests only support OTel instrumented applications at the moment.", +) +class Test_E2E_B3(E2ETestBase): + def setup_main(self): + propagate.set_global_textmap(B3SingleFormat()) + E2ETestBase.setup_main(self) + + +@scenarios.otel_tracing_e2e_b3_multi +@missing_feature( + context.library != "java_otel", reason="OTel tests only support OTel instrumented applications at the moment.", +) +class Test_E2E_B3_Multi(E2ETestBase): + def setup_main(self): + propagate.set_global_textmap(B3MultiFormat()) + E2ETestBase.setup_main(self) diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index 180b46c2e4..8858c5ad92 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -197,11 +197,10 @@ def _get_warmups(self): for container in self._required_containers: warmups.append(container.start) - warmups += [ - self.agent_container.start, - self.weblog_container.start, - EndToEndScenario._wait_for_app_readiness, - ] + warmups += [self.agent_container.start, self.weblog_container.start] + # TODO: Support Agent ingestion path in OTel tests + if self.weblog_container.library != "java_otel": + warmups += [EndToEndScenario._wait_for_app_readiness] return warmups @@ -465,6 +464,28 @@ class scenarios: backend_interface_timeout=5, ) + # OpenTelemetry tracing end-to-end scenarios + otel_tracing_e2e_w3c = EndToEndScenario( + "OTEL_TRACING_E2E_W3C", + weblog_env={"DD_API_KEY": os.environ.get("DD_API_KEY"), "DD_SITE": os.environ.get("DD_SITE"),}, + ) + otel_tracing_e2e_b3 = EndToEndScenario( + "OTEL_TRACING_E2E_B3", + weblog_env={ + "DD_API_KEY": os.environ.get("DD_API_KEY"), + "DD_SITE": os.environ.get("DD_SITE"), + "OTEL_PROPAGATORS": "b3", + }, + ) + otel_tracing_e2e_b3_multi = EndToEndScenario( + "OTEL_TRACING_E2E_B3_MULTI", + weblog_env={ + "DD_API_KEY": os.environ.get("DD_API_KEY"), + "DD_SITE": os.environ.get("DD_SITE"), + "OTEL_PROPAGATORS": "b3multi", + }, + ) + library_conf_custom_headers_short = EndToEndScenario( "LIBRARY_CONF_CUSTOM_HEADERS_SHORT", additional_trace_header_tags=("header-tag1", "header-tag2") ) diff --git a/utils/build/build.sh b/utils/build/build.sh index cb75ff5b6a..150ba69406 100755 --- a/utils/build/build.sh +++ b/utils/build/build.sh @@ -27,6 +27,7 @@ readonly DEFAULT_python=flask-poc readonly DEFAULT_ruby=rails70 readonly DEFAULT_golang=net-http readonly DEFAULT_java=spring-boot +readonly DEFAULT_java_otel=spring-boot-native readonly DEFAULT_php=apache-mod-8.0 readonly DEFAULT_dotnet=poc readonly DEFAULT_cpp=nginx @@ -213,7 +214,7 @@ COMMAND=build while [[ "$#" -gt 0 ]]; do case $1 in - cpp|dotnet|golang|java|nodejs|php|python|ruby) TEST_LIBRARY="$1";; + cpp|dotnet|golang|java|java_otel|nodejs|php|python|ruby) TEST_LIBRARY="$1";; -l|--library) TEST_LIBRARY="$2"; shift ;; -i|--images) BUILD_IMAGES="$2"; shift ;; -w|--weblog-variant) WEBLOG_VARIANT="$2"; shift ;; diff --git a/utils/build/docker/java_otel/spring-boot-native.Dockerfile b/utils/build/docker/java_otel/spring-boot-native.Dockerfile new file mode 100644 index 0000000000..514a872424 --- /dev/null +++ b/utils/build/docker/java_otel/spring-boot-native.Dockerfile @@ -0,0 +1,29 @@ +FROM openjdk:17-buster + +# Install required bsdtar +RUN apt-get update && \ + apt-get install -y libarchive-tools + + +# Install maven +RUN curl https://archive.apache.org/dist/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz --output /opt/maven.tar.gz && \ + tar xzvf /opt/maven.tar.gz --directory /opt && \ + rm /opt/maven.tar.gz + +WORKDIR /app + +# Copy application sources and cache dependencies +COPY ./utils/build/docker/java_otel/spring-boot-native/pom.xml . +COPY ./utils/build/docker/java_otel/spring-boot-native/src ./src + +# Compile application +RUN /opt/apache-maven-3.8.6/bin/mvn clean package + +# Set up required args +RUN echo "1.23.1" > SYSTEM_TESTS_LIBRARY_VERSION +RUN echo "1.0.0" > SYSTEM_TESTS_LIBDDWAF_VERSION +RUN echo "1.0.0" > SYSTEM_TESTS_APPSEC_EVENT_RULES_VERSION + +RUN echo "#!/bin/bash\njava -jar target/myproject-3.0.0-SNAPSHOT.jar --server.port=7777" > app.sh +RUN chmod +x app.sh +CMD [ "./app.sh" ] diff --git a/utils/build/docker/java_otel/spring-boot-native/pom.xml b/utils/build/docker/java_otel/spring-boot-native/pom.xml new file mode 100644 index 0000000000..6a5a733f69 --- /dev/null +++ b/utils/build/docker/java_otel/spring-boot-native/pom.xml @@ -0,0 +1,94 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.0.1 + + + com.example + myproject + 3.0.0-SNAPSHOT + jar + + + + + io.opentelemetry + opentelemetry-bom + 1.23.1 + pom + import + + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-tomcat + + + + javax.servlet + javax.servlet-api + 4.0.1 + + + + io.opentelemetry + opentelemetry-api + + + + io.opentelemetry + opentelemetry-sdk + + + + io.opentelemetry + opentelemetry-exporter-otlp + + + + io.opentelemetry + opentelemetry-exporter-logging-otlp + + + + io.opentelemetry + opentelemetry-exporter-logging-otlp + + + + io.opentelemetry + opentelemetry-extension-trace-propagators + 1.24.0 + + + + io.opentelemetry + opentelemetry-semconv + 1.23.1-alpha + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java new file mode 100644 index 0000000000..c8c40254d5 --- /dev/null +++ b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java @@ -0,0 +1,71 @@ +package com.datadoghq.springbootnative; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingSpanExporter; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.extension.trace.propagation.B3Propagator; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.util.logging.Logger; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +@SpringBootApplication +@ComponentScan(basePackages = {"com.datadoghq.springbootnative"}) +public class App { + private static final Logger logger = Logger.getLogger(App.class.getName()); + + public static void main(String[] args) { + Resource resource = Resource.create(Attributes.of( + ResourceAttributes.SERVICE_NAME, "otel-system-tests-spring-boot", + ResourceAttributes.DEPLOYMENT_ENVIRONMENT, "system-tests")); + + String ddSite = System.getenv().getOrDefault("DD_SITE", "datad0g.com"); + OtlpHttpSpanExporter intakeExporter = OtlpHttpSpanExporter.builder() + .setEndpoint(String.format("http://trace.agent.%s/api/v0.2/traces", ddSite)) + .addHeader("dd-protocol", "otlp") + .addHeader("dd-api-key", System.getenv("DD_API_KEY")) + .build(); + + SpanExporter loggingSpanExporter = OtlpJsonLoggingSpanExporter.create(); + + SpanExporter exporter = SpanExporter.composite(intakeExporter, loggingSpanExporter); + + SpanProcessor processor = BatchSpanProcessor.builder(exporter) + .setMaxExportBatchSize(1) + .build(); + + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(processor) + .setSampler(Sampler.alwaysOn()) + .setResource(resource) + .build(); + + String propagator = System.getenv().getOrDefault("OTEL_PROPAGATORS", "tracecontext"); + TextMapPropagator textPropagator = switch (propagator) { + case "tracecontext" -> W3CTraceContextPropagator.getInstance(); + case "b3" -> B3Propagator.injectingSingleHeader(); + case "b3multi" -> B3Propagator.injectingMultiHeaders(); + default -> TextMapPropagator.noop(); + }; + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .setPropagators(ContextPropagators.create(textPropagator)) + .buildAndRegisterGlobal(); + + logger.info("Using OpenTelemetry Propagator: " + openTelemetry.getPropagators().getTextMapPropagator()); + + SpringApplication.run(App.class, args); + } +} diff --git a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java new file mode 100644 index 0000000000..7c6d2688ff --- /dev/null +++ b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java @@ -0,0 +1,287 @@ +package com.datadoghq.springbootnative; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.servlet.http.HttpServletResponse; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class WebController { + private final Tracer tracer = GlobalOpenTelemetry.getTracer("com.datadoghq.springbootnative"); + private final TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + + private static final TextMapGetter getter = + new TextMapGetter<>() { + @Override + public String get(HttpHeaders headers, String key) { + List vals = headers.get(key); + return vals == null ? "" : vals.get(0); + } + + @Override + public Iterable keys(HttpHeaders headers) { + return headers.keySet(); + } + }; + + @RequestMapping("/") + String home(@RequestHeader HttpHeaders headers) throws InterruptedException { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + // Create a fake producer span to test span links + Span fakeSpan = tracer.spanBuilder("WebController.home.publish") + .setSpanKind(SpanKind.PRODUCER) + .setAttribute("messaging.system", "rabbitmq") + .setAttribute("messaging.operation", "publish") + .startSpan(); + Thread.sleep(1); + fakeSpan.end(); + + Span span = tracer.spanBuilder("WebController.home") + .setSpanKind(SpanKind.SERVER) + .addLink( + fakeSpan.getSpanContext(), + Attributes.of(AttributeKey.stringKey("messaging.operation"), "publish")) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/") + .setAttribute(SemanticAttributes.HTTP_METHOD, "GET") + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + Thread.sleep(5); + return "Hello World!"; + } finally { + span.end(); + } + } + } + + @GetMapping("/headers") + String headers(HttpServletResponse response, @RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.headers") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/headers") + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + response.setHeader("content-language", "en-US"); + return "012345678901234567890123456789012345678901"; + } + finally { + span.end(); + } + } + } + + @RequestMapping("/status") + ResponseEntity status(@RequestParam Integer code, @RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.status") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/status") + .setAttribute(SemanticAttributes.HTTP_STATUS_CODE, code == null ? 0 : code.longValue()) + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + return new ResponseEntity<>(HttpStatus.valueOf(code)); + } + finally { + span.end(); + } + } + } + + @RequestMapping("/hello") + public String hello(@RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.hello") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/hello") + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + return "Hello world"; + } + finally { + span.end(); + } + } + } + + @RequestMapping("/sample_rate_route/{i}") + String sample_route(@PathVariable("i") String i, @RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.sample_route") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/sample_rate_route/{i}") + .setAttribute("i", i) + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + return "OK"; + } + finally { + span.end(); + } + } + } + + @RequestMapping("/params/{str}") + String params_route(@PathVariable("str") String str, @RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.params_route") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/params/{str}") + .setAttribute("str", str) + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + return "OK"; + } + finally { + span.end(); + } + } + } + + @GetMapping("/user_login_success_event") + public String userLoginSuccess( + @RequestParam(value = "event_user_id", defaultValue = "system_tests_user") String userId, + @RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.userLoginSuccess") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/user_login_success_event") + .setAttribute("event_user_id", userId) + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + return "OK"; + } + finally { + span.end(); + } + } + } + + @GetMapping("/user_login_failure_event") + public String userLoginFailure( + @RequestParam(value = "event_user_id", defaultValue = "system_tests_user") String userId, + @RequestParam(value = "event_user_exists", defaultValue = "true") boolean eventUserExists, + @RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.userLoginFailure") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/user_login_failure_event") + .setAttribute("event_user_id", userId) + .setAttribute("event_user_exists", eventUserExists) + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + return "OK"; + } + finally { + span.end(); + } + } + } + + @GetMapping("/custom_event") + public String customEvent( + @RequestParam(value = "event_name", defaultValue = "system_tests_event") String eventName, + @RequestHeader HttpHeaders headers) { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.customEvent") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/custom_event") + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + span.addEvent("custom_event", Attributes.of(AttributeKey.stringKey("event_name"), eventName)); + return "OK"; + } + finally { + span.end(); + } + } + } + + @RequestMapping("/make_distant_call") + DistantCallResponse make_distant_call( + @RequestParam String url, @RequestHeader HttpHeaders headers) throws Exception { + try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { + Span span = tracer.spanBuilder("WebController.make_distant_call") + .setSpanKind(SpanKind.SERVER) + .setAttribute(SemanticAttributes.HTTP_ROUTE, "/make_distant_call") + .setAttribute(SemanticAttributes.HTTP_URL, url) + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + URL urlObject = new URL(url); + + HttpURLConnection con = (HttpURLConnection) urlObject.openConnection(); + con.setRequestMethod("GET"); + + Span child = tracer.spanBuilder("WebController.make_distant_call.get") + .setSpanKind(SpanKind.CLIENT) + .setAttribute(SemanticAttributes.HTTP_URL, url) + .setAttribute(SemanticAttributes.HTTP_METHOD, "GET") + .startSpan(); + + // Save request headers + HashMap request_headers = new HashMap(); + for (Map.Entry> header: con.getRequestProperties().entrySet()) { + if (header.getKey() == null) { + continue; + } + + request_headers.put(header.getKey(), header.getValue().get(0)); + child.setAttribute(header.getKey(), header.getValue().get(0)); + } + + // Save response headers and status code + int status_code = con.getResponseCode(); + child.setAttribute(SemanticAttributes.HTTP_STATUS_CODE, status_code); + child.end(); + + HashMap response_headers = new HashMap(); + for (Map.Entry> header: con.getHeaderFields().entrySet()) { + if (header.getKey() == null) { + continue; + } + + response_headers.put(header.getKey(), header.getValue().get(0)); + } + + DistantCallResponse result = new DistantCallResponse(); + result.url = url; + result.status_code = status_code; + result.request_headers = request_headers; + result.response_headers = response_headers; + + return result; + } + finally { + span.end(); + } + } + } + + public static final class DistantCallResponse { + public String url; + public int status_code; + public HashMap request_headers; + public HashMap response_headers; + } +} From 50a251cefbc7d1d2046f447d351bfd2c93603141 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Sat, 25 Mar 2023 10:34:41 -0400 Subject: [PATCH 02/15] Refactor OTel system tests - Associate OTel trace id with request id - Remove client side instrumentation - Remove trace context propagation - Support OTLP intake in proxy --- requirements.txt | 6 +- tests/otel_tracing_e2e/_validator.py | 96 +++++++------------ tests/otel_tracing_e2e/test_e2e.py | 87 +++-------------- utils/_context/_scenarios.py | 29 ++---- .../com/datadoghq/springbootnative/App.java | 3 +- .../springbootnative/WebController.java | 2 + utils/interfaces/_backend.py | 13 +++ utils/interfaces/_library/core.py | 20 ++++ utils/proxy/core.py | 11 ++- 9 files changed, 102 insertions(+), 165 deletions(-) diff --git a/requirements.txt b/requirements.txt index 634f0fb681..e31cac4a40 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,8 +20,4 @@ matplotlib docker==6.0.0 -opentelemetry-api==1.16.0 -opentelemetry-exporter-otlp-proto-http==1.16.0 -opentelemetry-sdk==1.16.0 -opentelemetry-propagator-b3==1.16.0 -opentelemetry-semantic-conventions==0.37b0 +opentelemetry-proto==1.17.0 diff --git a/tests/otel_tracing_e2e/_validator.py b/tests/otel_tracing_e2e/_validator.py index f7198457a5..254cd549e2 100644 --- a/tests/otel_tracing_e2e/_validator.py +++ b/tests/otel_tracing_e2e/_validator.py @@ -1,93 +1,69 @@ -import json - # Util functions to validate JSON trace data from OTel system tests -def validate_trace(trace_data, dd_trace_id, root_span_id, otel_trace_id): - assert root_span_id == int(trace_data["root_id"]) - spans = trace_data["spans"] - assert len(spans) == 3 - root_span = None +def validate_trace(traces: list, use_128_bits_trace_id: bool): server_span = None message_span = None - for item in spans.items(): - span_id = item[0] - span = item[1] - validate_common_tags(span, dd_trace_id, otel_trace_id) - if int(span_id) == root_span_id: - root_span = span - elif span["type"] == "web": - server_span = span - elif span["type"] == "custom": - message_span = span - else: - raise Exception("Unexpected span ", span) - validate_root_span(root_span, server_span["span_id"], message_span["span_id"]) - validate_server_span(server_span, root_span_id) - validate_message_span(message_span, root_span_id) + for trace in traces: + spans = trace["spans"] + assert len(spans) == 1 + for item in spans.items(): + span = item[1] + validate_common_tags(span, use_128_bits_trace_id) + if span["type"] == "web": + server_span = span + elif span["type"] == "custom": + message_span = span + else: + raise Exception("Unexpected span ", span) + validate_server_span(server_span) + validate_message_span(message_span) # TODO: validate span links once the format is fixed in intake - # validate_span_link(server_span, message_span, otel_trace_id) + # validate_span_link(server_span, message_span) -def validate_common_tags(span, dd_trace_id, otel_trace_id): - assert int(span["trace_id"]) == dd_trace_id - assert int(span["meta"]["otel.trace_id"], base=16) == otel_trace_id +def validate_common_tags(span: dict, use_128_bits_trace_id: bool): + assert span["parent_id"] == "0" assert span["env"] == "system-tests" + assert span["service"] == "otel-system-tests-spring-boot" assert span["meta"]["env"] == "system-tests" assert span["meta"]["deployment.environment"] == "system-tests" assert span["meta"]["_dd.ingestion_reason"] == "otel" assert span["ingestion_reason"] == "otel" - assert span["meta"]["otel.status_code"] == "STATUS_CODE_UNSET" + assert span["meta"]["otel.status_code"] == "Unset" + assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Java/1.23.1" + assert span["meta"]["otel.library.name"] == "com.datadoghq.springbootnative" + validate_trace_id(span, use_128_bits_trace_id) -def validate_root_span(span, server_span_id, message_span_id): - assert span["parent_id"] == "0" - assert span["type"] == "http" - assert span["service"] == "system-tests-runner" - assert span["name"] == "runner.get" - assert span["resource"] == "GET" - assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Python/1.16.0" - assert span["meta"]["otel.library.name"] == "system-tests-runner" - assert span["meta"]["telemetry.sdk.name"] == "opentelemetry" - assert span["meta"]["telemetry.sdk.language"] == "python" - assert span["meta"]["telemetry.sdk.version"] == "1.16.0" - assert span["meta"]["http.status_code"] == "200" - assert span["meta"]["http.host"] == "weblog" - assert span["meta"]["http.url"] == "http://weblog:7777" - assert span["meta"]["http.method"] == "GET" - assert len(span["children_ids"]) == 2 - assert server_span_id in span["children_ids"] - assert message_span_id in span["children_ids"] +def validate_trace_id(span: dict, use_128_bits_trace_id: bool): + dd_trace_id = int(span["trace_id"], base=10) + otel_trace_id = int(span["meta"]["otel.trace_id"], base=16) + if use_128_bits_trace_id: + assert dd_trace_id == otel_trace_id + else: + trace_id_bytes = otel_trace_id.to_bytes(16, "big") + assert dd_trace_id == int.from_bytes(trace_id_bytes[8:], "big") -def validate_server_span(span, root_span_id): - assert int(span["parent_id"]) == root_span_id - assert span["type"] == "web" - assert span["service"] == "otel-system-tests-spring-boot" +def validate_server_span(span: dict): assert span["name"] == "WebController.home" assert span["resource"] == "GET /" - assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Java/1.23.1" - assert span["meta"]["otel.library.name"] == "com.datadoghq.springbootnative" assert span["meta"]["http.route"] == "/" assert span["meta"]["http.method"] == "GET" -def validate_message_span(span, root_span_id): - assert int(span["parent_id"]) == root_span_id - assert span["type"] == "custom" - assert span["service"] == "otel-system-tests-spring-boot" +def validate_message_span(span: dict): assert span["name"] == "WebController.home.publish" assert span["resource"] == "publish" - assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Java/1.23.1" - assert span["meta"]["otel.library.name"] == "com.datadoghq.springbootnative" assert span["meta"]["messaging.operation"] == "publish" assert span["meta"]["messaging.system"] == "rabbitmq" -def validate_span_link(server_span, message_span, otel_trace_id): - span_links = json.load(server_span["meta"]["_dd.span_links"]) +def validate_span_link(server_span: dict, message_span: dict): + span_links = server_span["meta"]["_dd.span_links"] assert len(span_links) == 1 span_link = span_links[0] - assert int(span_link["trace_id"], base=16) == otel_trace_id + assert span_link["trace_id"] == message_span["meta"]["otel.trace_id"] assert span_link["span_id"] == message_span["span_id"] assert span_link["attributes"] == {"messaging.operation": "publish"} diff --git a/tests/otel_tracing_e2e/test_e2e.py b/tests/otel_tracing_e2e/test_e2e.py index b2335103fb..5392432c7e 100644 --- a/tests/otel_tracing_e2e/test_e2e.py +++ b/tests/otel_tracing_e2e/test_e2e.py @@ -1,83 +1,24 @@ -import json -import os - -from opentelemetry import trace, propagate -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, Resource, SERVICE_NAME -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.propagators.b3 import B3MultiFormat, B3SingleFormat from _validator import validate_trace from utils import context, weblog, interfaces, scenarios, missing_feature -class E2ETestBase: - def setup_main(self): - self.setup_opentelemetry() - self.use_128_bits_trace_id = False - with self.tracer.start_as_current_span(name="runner.get", kind=trace.SpanKind.CLIENT) as span: - span.set_attribute(SpanAttributes.HTTP_METHOD, "GET") - span.set_attribute(SpanAttributes.HTTP_URL, "http://weblog:7777") - headers = {} - propagate.get_global_textmap().inject(headers) - self.r = weblog.get(path="/", headers=headers) - span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, self.r.status_code) - self.trace_id = span.get_span_context().trace_id - self.root_span_id = span.get_span_context().span_id - - def setup_opentelemetry(self): - resource = Resource.create( - attributes={SERVICE_NAME: "system-tests-runner", DEPLOYMENT_ENVIRONMENT: "system-tests"} - ) - dd_site = os.environ.get("DD_SITE") - if dd_site is None or dd_site == "": - dd_site = "datad0g.com" - exporter = OTLPSpanExporter( - endpoint=f"https://trace.agent.{dd_site}/api/v0.2/traces", - headers={"dd-protocol": "otlp", "dd-api-key": os.environ.get("DD_API_KEY"),}, - ) - processor = BatchSpanProcessor(span_exporter=exporter, max_export_batch_size=1) - trace.set_tracer_provider(TracerProvider(resource=resource, active_span_processor=processor)) - self.tracer = trace.get_tracer("system-tests-runner") - - def test_main(self): - response = interfaces.backend._wait_for_trace( - rid=self.r, trace_id=self._get_dd_trace_id(), retries=5, sleep_interval_multiplier=2.0 - ) - trace_data = json.loads(response["response"]["content"])["trace"] - validate_trace(trace_data, self._get_dd_trace_id(), self.root_span_id, self.trace_id) - - def _get_dd_trace_id(self): - if self.use_128_bits_trace_id: - return self.trace_id - trace_id_bytes = self.trace_id.to_bytes(16, "big") - return int.from_bytes(trace_id_bytes[8:], "big") - - -@scenarios.otel_tracing_e2e_w3c -@missing_feature( - context.library != "java_otel", reason="OTel tests only support OTel instrumented applications at the moment.", -) -class Test_E2E_W3C(E2ETestBase): - pass # Use the default propagator TraceContextTextMapPropagator (W3C propagator) - - -@scenarios.otel_tracing_e2e_b3 +@scenarios.otel_tracing_e2e @missing_feature( context.library != "java_otel", reason="OTel tests only support OTel instrumented applications at the moment.", ) -class Test_E2E_B3(E2ETestBase): +class Test_OTel_E2E: def setup_main(self): - propagate.set_global_textmap(B3SingleFormat()) - E2ETestBase.setup_main(self) + self.use_128_bits_trace_id = False + self.r = weblog.get(path="/") + def test_main(self): + otel_trace_ids = list(interfaces.library.get_otel_trace_id(request=self.r)) + assert len(otel_trace_ids) == 2 + dd_trace_ids = map(self._get_dd_trace_id, otel_trace_ids) + traces = map(lambda dd_trace_id: interfaces.backend.assert_otlp_trace_exist(request=self.r, dd_trace_id=dd_trace_id), dd_trace_ids) + validate_trace(traces, self.use_128_bits_trace_id) -@scenarios.otel_tracing_e2e_b3_multi -@missing_feature( - context.library != "java_otel", reason="OTel tests only support OTel instrumented applications at the moment.", -) -class Test_E2E_B3_Multi(E2ETestBase): - def setup_main(self): - propagate.set_global_textmap(B3MultiFormat()) - E2ETestBase.setup_main(self) + def _get_dd_trace_id(self, otel_trace_id=bytes) -> int: + if self.use_128_bits_trace_id: + return int.from_bytes(otel_trace_id, "big") + return int.from_bytes(otel_trace_id[8:], "big") diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index 8858c5ad92..548766bbea 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -197,10 +197,11 @@ def _get_warmups(self): for container in self._required_containers: warmups.append(container.start) - warmups += [self.agent_container.start, self.weblog_container.start] - # TODO: Support Agent ingestion path in OTel tests - if self.weblog_container.library != "java_otel": - warmups += [EndToEndScenario._wait_for_app_readiness] + warmups += [ + self.agent_container.start, + self.weblog_container.start, + EndToEndScenario._wait_for_app_readiness, + ] return warmups @@ -465,26 +466,10 @@ class scenarios: ) # OpenTelemetry tracing end-to-end scenarios - otel_tracing_e2e_w3c = EndToEndScenario( - "OTEL_TRACING_E2E_W3C", + otel_tracing_e2e = EndToEndScenario( + "OTEL_TRACING_E2E", weblog_env={"DD_API_KEY": os.environ.get("DD_API_KEY"), "DD_SITE": os.environ.get("DD_SITE"),}, ) - otel_tracing_e2e_b3 = EndToEndScenario( - "OTEL_TRACING_E2E_B3", - weblog_env={ - "DD_API_KEY": os.environ.get("DD_API_KEY"), - "DD_SITE": os.environ.get("DD_SITE"), - "OTEL_PROPAGATORS": "b3", - }, - ) - otel_tracing_e2e_b3_multi = EndToEndScenario( - "OTEL_TRACING_E2E_B3_MULTI", - weblog_env={ - "DD_API_KEY": os.environ.get("DD_API_KEY"), - "DD_SITE": os.environ.get("DD_SITE"), - "OTEL_PROPAGATORS": "b3multi", - }, - ) library_conf_custom_headers_short = EndToEndScenario( "LIBRARY_CONF_CUSTOM_HEADERS_SHORT", additional_trace_header_tags=("header-tag1", "header-tag2") diff --git a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java index c8c40254d5..27b059fbea 100644 --- a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java +++ b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java @@ -31,9 +31,8 @@ public static void main(String[] args) { ResourceAttributes.SERVICE_NAME, "otel-system-tests-spring-boot", ResourceAttributes.DEPLOYMENT_ENVIRONMENT, "system-tests")); - String ddSite = System.getenv().getOrDefault("DD_SITE", "datad0g.com"); OtlpHttpSpanExporter intakeExporter = OtlpHttpSpanExporter.builder() - .setEndpoint(String.format("http://trace.agent.%s/api/v0.2/traces", ddSite)) + .setEndpoint("http://runner:8126/api/v0.2/traces") .addHeader("dd-protocol", "otlp") .addHeader("dd-api-key", System.getenv("DD_API_KEY")) .build(); diff --git a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java index 7c6d2688ff..e5753b031d 100644 --- a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java +++ b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java @@ -54,6 +54,7 @@ String home(@RequestHeader HttpHeaders headers) throws InterruptedException { .setSpanKind(SpanKind.PRODUCER) .setAttribute("messaging.system", "rabbitmq") .setAttribute("messaging.operation", "publish") + .setAttribute("http.request.headers.user-agent", headers.get("User-Agent").get(0)) .startSpan(); Thread.sleep(1); fakeSpan.end(); @@ -65,6 +66,7 @@ String home(@RequestHeader HttpHeaders headers) throws InterruptedException { Attributes.of(AttributeKey.stringKey("messaging.operation"), "publish")) .setAttribute(SemanticAttributes.HTTP_ROUTE, "/") .setAttribute(SemanticAttributes.HTTP_METHOD, "GET") + .setAttribute("http.request.headers.user-agent", headers.get("User-Agent").get(0)) .startSpan(); try (Scope ignored = span.makeCurrent()) { Thread.sleep(5); diff --git a/utils/interfaces/_backend.py b/utils/interfaces/_backend.py index 47ba710fe1..ee8941e011 100644 --- a/utils/interfaces/_backend.py +++ b/utils/interfaces/_backend.py @@ -63,6 +63,19 @@ def assert_library_traces_exist(self, request, min_traces_len=1): ), f"We only found {len(traces)} traces in the library (tracers), but we expected {min_traces_len}!" return traces + def assert_otlp_trace_exist(self, request: requests.Request, dd_trace_id: str) -> dict: + """Attempts to fetch from the backend, ALL the traces that the OpenTelemetry SDKs sent to Datadog + during the execution of the given request. + + The assosiation of the traces with a request is done through propagating the request ID (inside user agent) + on all the submitted traces. This is done automatically, unless you create root spans manually, which in + that case you need to manually propagate the user agent to the new spans. + """ + + rid = get_rid_from_request(request) + data = self._wait_for_trace(rid=rid, trace_id=dd_trace_id, retries=5, sleep_interval_multiplier=2.0) + return json.loads(data["response"]["content"])["trace"] + def assert_single_spans_exist(self, request, min_spans_len=1, limit=100): """Attempts to fetch single span events using the given `query_filter` as part of the search query. The query should be what you would use in the `/apm/traces` page in the UI. diff --git a/utils/interfaces/_library/core.py b/utils/interfaces/_library/core.py index 780e69fd30..8a5b6ec35a 100644 --- a/utils/interfaces/_library/core.py +++ b/utils/interfaces/_library/core.py @@ -5,6 +5,7 @@ from collections import namedtuple import json import threading +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest from utils.tools import logger from utils.interfaces._core import InterfaceValidator, get_rid_from_request, get_rid_from_span, get_rid_from_user_agent @@ -60,6 +61,25 @@ def get_traces(self, request=None): yield data, trace break + + def get_otel_trace_id(self, request): + paths = ["/api/v0.2/traces"] + rid = get_rid_from_request(request) + + if rid: + logger.debug(f"Try to find traces related to request {rid}") + + for data in self.get_data(path_filters=paths): + export_request = ExportTraceServiceRequest() + content = eval(data["request"]["content"]) # Raw content is a str like "b'\n\x\...'" + assert export_request.ParseFromString(content) > 0, content + for resource_span in export_request.resource_spans: + for scope_span in resource_span.scope_spans: + for span in scope_span.spans: + for attribute in span.attributes: + if attribute.key == "http.request.headers.user-agent" and rid in attribute.value.string_value: + yield span.trace_id + def get_spans(self, request=None): """ Iterate over all spans reported by the tracer to the agent. diff --git a/utils/proxy/core.py b/utils/proxy/core.py index 8633cb7bd1..11313c06d7 100644 --- a/utils/proxy/core.py +++ b/utils/proxy/core.py @@ -72,8 +72,13 @@ def _scrub(self, content): def request(self, flow): if flow.request.host in ("runner", "localhost"): # localhost because on UDS mode, UDS socket is redirected - flow.request.host, flow.request.port = "agent", 8126 - flow.request.scheme = "http" + if "api/v0.2/traces" in flow.request.path: + flow.request.host = "trace.agent." + os.environ.get("DD_SITE") + flow.request.port = 443 + flow.request.scheme = "https" + else: + flow.request.host, flow.request.port = "agent", 8126 + flow.request.scheme = "http" def response(self, flow): self._modify_response(flow) @@ -108,7 +113,7 @@ def response(self, flow): if flow.error and flow.error.msg == FlowError.KILLED_MESSAGE: payload["response"] = None - if flow.request.host == "agent": + if flow.request.host == "agent" or flow.request.headers.get("dd-protocol") == "otlp": interface = interfaces.library else: interface = interfaces.agent From ec52e308a2eab3d8459ddc2038a03d0891b1cc7d Mon Sep 17 00:00:00 2001 From: Yang Song Date: Sat, 25 Mar 2023 13:06:57 -0400 Subject: [PATCH 03/15] Format --- tests/otel_tracing_e2e/test_e2e.py | 5 ++++- utils/interfaces/_library/core.py | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/otel_tracing_e2e/test_e2e.py b/tests/otel_tracing_e2e/test_e2e.py index 5392432c7e..ccc01e7f6f 100644 --- a/tests/otel_tracing_e2e/test_e2e.py +++ b/tests/otel_tracing_e2e/test_e2e.py @@ -15,7 +15,10 @@ def test_main(self): otel_trace_ids = list(interfaces.library.get_otel_trace_id(request=self.r)) assert len(otel_trace_ids) == 2 dd_trace_ids = map(self._get_dd_trace_id, otel_trace_ids) - traces = map(lambda dd_trace_id: interfaces.backend.assert_otlp_trace_exist(request=self.r, dd_trace_id=dd_trace_id), dd_trace_ids) + traces = map( + lambda dd_trace_id: interfaces.backend.assert_otlp_trace_exist(request=self.r, dd_trace_id=dd_trace_id), + dd_trace_ids, + ) validate_trace(traces, self.use_128_bits_trace_id) def _get_dd_trace_id(self, otel_trace_id=bytes) -> int: diff --git a/utils/interfaces/_library/core.py b/utils/interfaces/_library/core.py index 8a5b6ec35a..e95341c92d 100644 --- a/utils/interfaces/_library/core.py +++ b/utils/interfaces/_library/core.py @@ -61,7 +61,6 @@ def get_traces(self, request=None): yield data, trace break - def get_otel_trace_id(self, request): paths = ["/api/v0.2/traces"] rid = get_rid_from_request(request) @@ -77,7 +76,10 @@ def get_otel_trace_id(self, request): for scope_span in resource_span.scope_spans: for span in scope_span.spans: for attribute in span.attributes: - if attribute.key == "http.request.headers.user-agent" and rid in attribute.value.string_value: + if ( + attribute.key == "http.request.headers.user-agent" + and rid in attribute.value.string_value + ): yield span.trace_id def get_spans(self, request=None): From 9c940450d68ab3ef701f35f4e4235ff834e7cfb1 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 17 Apr 2023 16:53:40 -0400 Subject: [PATCH 04/15] Fix review comments --- tests/otel_tracing_e2e/_validator.py | 52 ++-- .../com/datadoghq/springbootnative/App.java | 19 +- .../springbootnative/WebController.java | 267 ++---------------- 3 files changed, 47 insertions(+), 291 deletions(-) diff --git a/tests/otel_tracing_e2e/_validator.py b/tests/otel_tracing_e2e/_validator.py index 254cd549e2..fb560a71cd 100644 --- a/tests/otel_tracing_e2e/_validator.py +++ b/tests/otel_tracing_e2e/_validator.py @@ -1,5 +1,7 @@ # Util functions to validate JSON trace data from OTel system tests +import json + def validate_trace(traces: list, use_128_bits_trace_id: bool): server_span = None @@ -18,21 +20,26 @@ def validate_trace(traces: list, use_128_bits_trace_id: bool): raise Exception("Unexpected span ", span) validate_server_span(server_span) validate_message_span(message_span) - # TODO: validate span links once the format is fixed in intake - # validate_span_link(server_span, message_span) + validate_span_link(server_span, message_span) def validate_common_tags(span: dict, use_128_bits_trace_id: bool): - assert span["parent_id"] == "0" - assert span["env"] == "system-tests" - assert span["service"] == "otel-system-tests-spring-boot" - assert span["meta"]["env"] == "system-tests" - assert span["meta"]["deployment.environment"] == "system-tests" - assert span["meta"]["_dd.ingestion_reason"] == "otel" - assert span["ingestion_reason"] == "otel" - assert span["meta"]["otel.status_code"] == "Unset" - assert span["meta"]["otel.user_agent"] == "OTel-OTLP-Exporter-Java/1.23.1" - assert span["meta"]["otel.library.name"] == "com.datadoghq.springbootnative" + expected_tags = { + "parent_id": "0", + "env": "system-tests", + "service": "otel-system-tests-spring-boot", + "ingestion_reason": "otel", + } + expected_meta = { + "env": "system-tests", + "deployment.environment": "system-tests", + "_dd.ingestion_reason": "otel", + "otel.status_code": "Unset", + "otel.user_agent": "OTel-OTLP-Exporter-Java/1.23.1", + "otel.library.name": "com.datadoghq.springbootnative", + } + assert expected_tags.items() <= span.items() + assert expected_meta.items() <= span["meta"].items() validate_trace_id(span, use_128_bits_trace_id) @@ -47,23 +54,24 @@ def validate_trace_id(span: dict, use_128_bits_trace_id: bool): def validate_server_span(span: dict): - assert span["name"] == "WebController.home" - assert span["resource"] == "GET /" - assert span["meta"]["http.route"] == "/" - assert span["meta"]["http.method"] == "GET" + expected_tags = {"name": "WebController.home", "resource": "GET /"} + expected_meta = {"http.route": "/", "http.method": "GET"} + assert expected_tags.items() <= span.items() + assert expected_meta.items() <= span["meta"].items() def validate_message_span(span: dict): - assert span["name"] == "WebController.home.publish" - assert span["resource"] == "publish" - assert span["meta"]["messaging.operation"] == "publish" - assert span["meta"]["messaging.system"] == "rabbitmq" + expected_tags = {"name": "WebController.home.publish", "resource": "publish"} + expected_meta = {"messaging.operation": "publish", "messaging.system": "rabbitmq"} + assert expected_tags.items() <= span.items() + assert expected_meta.items() <= span["meta"].items() def validate_span_link(server_span: dict, message_span: dict): - span_links = server_span["meta"]["_dd.span_links"] + span_links = json.loads(server_span["meta"]["_dd.span_links"]) assert len(span_links) == 1 span_link = span_links[0] assert span_link["trace_id"] == message_span["meta"]["otel.trace_id"] - assert span_link["span_id"] == message_span["span_id"] + span_id_hex = f'{int(message_span["span_id"]):x}' # span_id is an int in span but a hex in span_links + assert span_link["span_id"] == span_id_hex assert span_link["attributes"] == {"messaging.operation": "publish"} diff --git a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java index 27b059fbea..92dac289dd 100644 --- a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java +++ b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java @@ -2,12 +2,8 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingSpanExporter; import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; -import io.opentelemetry.extension.trace.propagation.B3Propagator; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; @@ -16,7 +12,6 @@ import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; -import java.util.logging.Logger; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; @@ -24,15 +19,13 @@ @SpringBootApplication @ComponentScan(basePackages = {"com.datadoghq.springbootnative"}) public class App { - private static final Logger logger = Logger.getLogger(App.class.getName()); - public static void main(String[] args) { Resource resource = Resource.create(Attributes.of( ResourceAttributes.SERVICE_NAME, "otel-system-tests-spring-boot", ResourceAttributes.DEPLOYMENT_ENVIRONMENT, "system-tests")); OtlpHttpSpanExporter intakeExporter = OtlpHttpSpanExporter.builder() - .setEndpoint("http://runner:8126/api/v0.2/traces") + .setEndpoint("http://runner:8126/api/v0.2/traces") // send to the proxy first .addHeader("dd-protocol", "otlp") .addHeader("dd-api-key", System.getenv("DD_API_KEY")) .build(); @@ -51,20 +44,10 @@ public static void main(String[] args) { .setResource(resource) .build(); - String propagator = System.getenv().getOrDefault("OTEL_PROPAGATORS", "tracecontext"); - TextMapPropagator textPropagator = switch (propagator) { - case "tracecontext" -> W3CTraceContextPropagator.getInstance(); - case "b3" -> B3Propagator.injectingSingleHeader(); - case "b3multi" -> B3Propagator.injectingMultiHeaders(); - default -> TextMapPropagator.noop(); - }; OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() .setTracerProvider(sdkTracerProvider) - .setPropagators(ContextPropagators.create(textPropagator)) .buildAndRegisterGlobal(); - logger.info("Using OpenTelemetry Propagator: " + openTelemetry.getPropagators().getTextMapPropagator()); - SpringApplication.run(App.class, args); } } diff --git a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java index e5753b031d..c73c4e898b 100644 --- a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java +++ b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/WebController.java @@ -4,66 +4,28 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.TextMapGetter; -import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.servlet.http.HttpServletResponse; import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class WebController { private final Tracer tracer = GlobalOpenTelemetry.getTracer("com.datadoghq.springbootnative"); - private final TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); - - private static final TextMapGetter getter = - new TextMapGetter<>() { - @Override - public String get(HttpHeaders headers, String key) { - List vals = headers.get(key); - return vals == null ? "" : vals.get(0); - } - - @Override - public Iterable keys(HttpHeaders headers) { - return headers.keySet(); - } - }; @RequestMapping("/") - String home(@RequestHeader HttpHeaders headers) throws InterruptedException { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - // Create a fake producer span to test span links - Span fakeSpan = tracer.spanBuilder("WebController.home.publish") - .setSpanKind(SpanKind.PRODUCER) - .setAttribute("messaging.system", "rabbitmq") - .setAttribute("messaging.operation", "publish") - .setAttribute("http.request.headers.user-agent", headers.get("User-Agent").get(0)) - .startSpan(); - Thread.sleep(1); - fakeSpan.end(); - + private String home(@RequestHeader HttpHeaders headers) throws InterruptedException { + try (Scope scope = Context.current().makeCurrent()) { + SpanContext spanContext = fakeAsyncWork(headers); Span span = tracer.spanBuilder("WebController.home") .setSpanKind(SpanKind.SERVER) - .addLink( - fakeSpan.getSpanContext(), - Attributes.of(AttributeKey.stringKey("messaging.operation"), "publish")) + .addLink(spanContext, Attributes.of(AttributeKey.stringKey("messaging.operation"), "publish")) .setAttribute(SemanticAttributes.HTTP_ROUTE, "/") .setAttribute(SemanticAttributes.HTTP_METHOD, "GET") .setAttribute("http.request.headers.user-agent", headers.get("User-Agent").get(0)) @@ -77,213 +39,16 @@ String home(@RequestHeader HttpHeaders headers) throws InterruptedException { } } - @GetMapping("/headers") - String headers(HttpServletResponse response, @RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.headers") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/headers") - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - response.setHeader("content-language", "en-US"); - return "012345678901234567890123456789012345678901"; - } - finally { - span.end(); - } - } - } - - @RequestMapping("/status") - ResponseEntity status(@RequestParam Integer code, @RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.status") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/status") - .setAttribute(SemanticAttributes.HTTP_STATUS_CODE, code == null ? 0 : code.longValue()) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - return new ResponseEntity<>(HttpStatus.valueOf(code)); - } - finally { - span.end(); - } - } - } - - @RequestMapping("/hello") - public String hello(@RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.hello") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/hello") - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - return "Hello world"; - } - finally { - span.end(); - } - } - } - - @RequestMapping("/sample_rate_route/{i}") - String sample_route(@PathVariable("i") String i, @RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.sample_route") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/sample_rate_route/{i}") - .setAttribute("i", i) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - return "OK"; - } - finally { - span.end(); - } - } - } - - @RequestMapping("/params/{str}") - String params_route(@PathVariable("str") String str, @RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.params_route") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/params/{str}") - .setAttribute("str", str) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - return "OK"; - } - finally { - span.end(); - } - } - } - - @GetMapping("/user_login_success_event") - public String userLoginSuccess( - @RequestParam(value = "event_user_id", defaultValue = "system_tests_user") String userId, - @RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.userLoginSuccess") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/user_login_success_event") - .setAttribute("event_user_id", userId) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - return "OK"; - } - finally { - span.end(); - } - } - } - - @GetMapping("/user_login_failure_event") - public String userLoginFailure( - @RequestParam(value = "event_user_id", defaultValue = "system_tests_user") String userId, - @RequestParam(value = "event_user_exists", defaultValue = "true") boolean eventUserExists, - @RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.userLoginFailure") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/user_login_failure_event") - .setAttribute("event_user_id", userId) - .setAttribute("event_user_exists", eventUserExists) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - return "OK"; - } - finally { - span.end(); - } - } - } - - @GetMapping("/custom_event") - public String customEvent( - @RequestParam(value = "event_name", defaultValue = "system_tests_event") String eventName, - @RequestHeader HttpHeaders headers) { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.customEvent") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/custom_event") - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - span.addEvent("custom_event", Attributes.of(AttributeKey.stringKey("event_name"), eventName)); - return "OK"; - } - finally { - span.end(); - } - } - } - - @RequestMapping("/make_distant_call") - DistantCallResponse make_distant_call( - @RequestParam String url, @RequestHeader HttpHeaders headers) throws Exception { - try (Scope scope = propagator.extract(Context.current(), headers, getter).makeCurrent()) { - Span span = tracer.spanBuilder("WebController.make_distant_call") - .setSpanKind(SpanKind.SERVER) - .setAttribute(SemanticAttributes.HTTP_ROUTE, "/make_distant_call") - .setAttribute(SemanticAttributes.HTTP_URL, url) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - URL urlObject = new URL(url); - - HttpURLConnection con = (HttpURLConnection) urlObject.openConnection(); - con.setRequestMethod("GET"); - - Span child = tracer.spanBuilder("WebController.make_distant_call.get") - .setSpanKind(SpanKind.CLIENT) - .setAttribute(SemanticAttributes.HTTP_URL, url) - .setAttribute(SemanticAttributes.HTTP_METHOD, "GET") - .startSpan(); - - // Save request headers - HashMap request_headers = new HashMap(); - for (Map.Entry> header: con.getRequestProperties().entrySet()) { - if (header.getKey() == null) { - continue; - } - - request_headers.put(header.getKey(), header.getValue().get(0)); - child.setAttribute(header.getKey(), header.getValue().get(0)); - } - - // Save response headers and status code - int status_code = con.getResponseCode(); - child.setAttribute(SemanticAttributes.HTTP_STATUS_CODE, status_code); - child.end(); - - HashMap response_headers = new HashMap(); - for (Map.Entry> header: con.getHeaderFields().entrySet()) { - if (header.getKey() == null) { - continue; - } - - response_headers.put(header.getKey(), header.getValue().get(0)); - } - - DistantCallResponse result = new DistantCallResponse(); - result.url = url; - result.status_code = status_code; - result.request_headers = request_headers; - result.response_headers = response_headers; - - return result; - } - finally { - span.end(); - } - } - } - - public static final class DistantCallResponse { - public String url; - public int status_code; - public HashMap request_headers; - public HashMap response_headers; + // Create a fake producer span and return its span context to test span links + private SpanContext fakeAsyncWork(HttpHeaders headers) throws InterruptedException { + Span fakeSpan = tracer.spanBuilder("WebController.home.publish") + .setSpanKind(SpanKind.PRODUCER) + .setAttribute("messaging.system", "rabbitmq") + .setAttribute("messaging.operation", "publish") + .setAttribute("http.request.headers.user-agent", headers.get("User-Agent").get(0)) + .startSpan(); + Thread.sleep(1); + fakeSpan.end(); + return fakeSpan.getSpanContext(); } } From 8ee433344aeaa2550576fa9c4fc66fdffb5ea3a1 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 17 Apr 2023 17:05:21 -0400 Subject: [PATCH 05/15] Remove unused deps --- .../build/docker/java_otel/spring-boot-native/pom.xml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/utils/build/docker/java_otel/spring-boot-native/pom.xml b/utils/build/docker/java_otel/spring-boot-native/pom.xml index 6a5a733f69..d7de3d9371 100644 --- a/utils/build/docker/java_otel/spring-boot-native/pom.xml +++ b/utils/build/docker/java_otel/spring-boot-native/pom.xml @@ -64,17 +64,6 @@ opentelemetry-exporter-logging-otlp - - io.opentelemetry - opentelemetry-exporter-logging-otlp - - - - io.opentelemetry - opentelemetry-extension-trace-propagators - 1.24.0 - - io.opentelemetry opentelemetry-semconv From 80d089f00ea9196cc5c03f1182e26fc5c714d48d Mon Sep 17 00:00:00 2001 From: Charles de Beauchesne Date: Thu, 20 Apr 2023 17:47:28 +0200 Subject: [PATCH 06/15] introduce open telemetry interface --- requirements.txt | 3 -- utils/_context/_scenarios.py | 73 ++++++++++++++++++++++++++++- utils/interfaces/__init__.py | 3 +- utils/interfaces/_open_telemetry.py | 47 +++++++++++++++++++ utils/proxy/core.py | 4 +- 5 files changed, 122 insertions(+), 8 deletions(-) create mode 100644 utils/interfaces/_open_telemetry.py diff --git a/requirements.txt b/requirements.txt index de974bc3bb..11c01df34e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,9 +25,6 @@ rfc3339-validator==0.1.4 matplotlib docker==6.0.0 -<<<<<<< HEAD opentelemetry-proto==1.17.0 -======= paramiko==3.1.0 ->>>>>>> main diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index 0283960b21..b158b1f25a 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -413,6 +413,76 @@ def get_junit_properties(self): return result +class OpenTelemetryScenario(_DockerScenario): + """ Scenario for testing open""" + + def __init__(self, name, weblog_env) -> None: + super().__init__(name, use_proxy=True) + + self.weblog_container = WeblogContainer(self.host_log_folder, environment=weblog_env) + self._required_containers.append(self.weblog_container) + + def _create_interface_folders(self): + for interface in ("open_telemetry", "backend"): + self.create_log_subfolder(f"interfaces/{interface}") + + def _start_interface_watchdog(self): + from utils import interfaces + + class Event(FileSystemEventHandler): + def __init__(self, interface) -> None: + super().__init__() + self.interface = interface + + def on_modified(self, event): + if event.is_directory: + return + + self.interface.ingest_file(event.src_path) + + observer = Observer() + observer.schedule( + Event(interfaces.library), path=f"{self.host_log_folder}/interfaces/open_telemetry", recursive=True + ) + + observer.start() + + def _get_warmups(self): + warmups = super()._get_warmups() + + warmups.insert(0, self._create_interface_folders) + warmups.insert(1, self._start_interface_watchdog) + warmups.append(self._wait_for_app_readiness) + + return warmups + + def _wait_for_app_readiness(self): + from utils import interfaces # import here to avoid circular import + + if self.use_proxy: + logger.debug("Wait for app readiness") + + if not interfaces.open_telemetry.ready.wait(40): + raise Exception("Open telemetry interface not ready") + logger.debug("Open telemetry ready") + + def post_setup(self, session): + from utils import interfaces + + if self.use_proxy: + self._wait_interface(interfaces.library, session, self.library_interface_timeout) + self._wait_interface(interfaces.agent, session, self.agent_interface_timeout) + self._wait_interface(interfaces.backend, session, self.backend_interface_timeout) + + self.collect_logs() + + self._wait_interface(interfaces.library_stdout, session, 0) + self._wait_interface(interfaces.library_dotnet_managed, session, 0) + self._wait_interface(interfaces.agent_stdout, session, 0) + else: + self.collect_logs() + + class CgroupScenario(EndToEndScenario): # cgroup test @@ -667,8 +737,7 @@ class scenarios: backend_interface_timeout=5, ) - # OpenTelemetry tracing end-to-end scenarios - otel_tracing_e2e = EndToEndScenario( + otel_tracing_e2e = OpenTelemetryScenario( "OTEL_TRACING_E2E", weblog_env={"DD_API_KEY": os.environ.get("DD_API_KEY"), "DD_SITE": os.environ.get("DD_SITE"),}, ) diff --git a/utils/interfaces/__init__.py b/utils/interfaces/__init__.py index d1a24654f5..92cd43e808 100644 --- a/utils/interfaces/__init__.py +++ b/utils/interfaces/__init__.py @@ -6,7 +6,7 @@ from ._backend import _BackendInterfaceValidator from ._library.core import LibraryInterfaceValidator from ._logs import _LibraryStdout, _LibraryDotnetManaged, _AgentStdout - +from ._open_telemetry import OpenTelemetryInterfaceValidator # singletons agent = AgentInterfaceValidator() @@ -15,5 +15,6 @@ agent_stdout = _AgentStdout() library_dotnet_managed = _LibraryDotnetManaged() backend = _BackendInterfaceValidator() +open_telemetry = OpenTelemetryInterfaceValidator() all_interfaces = (agent, library, library_stdout, library_dotnet_managed, agent_stdout, backend) diff --git a/utils/interfaces/_open_telemetry.py b/utils/interfaces/_open_telemetry.py new file mode 100644 index 0000000000..4fac92de7d --- /dev/null +++ b/utils/interfaces/_open_telemetry.py @@ -0,0 +1,47 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the the Apache License Version 2.0. +# This product includes software developed at Datadog (https://www.datadoghq.com/). +# Copyright 2021 Datadog, Inc. + +""" +This files will validate data flow between agent and backend +""" + +import threading + +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest + +from utils.tools import logger +from utils.interfaces._core import InterfaceValidator, get_rid_from_request + + +class OpenTelemetryInterfaceValidator(InterfaceValidator): + """ Validated communication between open telemetry and datadog backend""" + + def __init__(self): + super().__init__("agent") + self.ready = threading.Event() + + def ingest_file(self, src_path): + self.ready.set() + return super().ingest_file(src_path) + + def get_otel_trace_id(self, request): + paths = ["/api/v0.2/traces"] + rid = get_rid_from_request(request) + + if rid: + logger.debug(f"Try to find traces related to request {rid}") + + for data in self.get_data(path_filters=paths): + export_request = ExportTraceServiceRequest() + content = eval(data["request"]["content"]) # Raw content is a str like "b'\n\x\...'" + assert export_request.ParseFromString(content) > 0, content + for resource_span in export_request.resource_spans: + for scope_span in resource_span.scope_spans: + for span in scope_span.spans: + for attribute in span.attributes: + if ( + attribute.key == "http.request.headers.user-agent" + and rid in attribute.value.string_value + ): + yield span.trace_id diff --git a/utils/proxy/core.py b/utils/proxy/core.py index b953bd223a..e8665bb034 100644 --- a/utils/proxy/core.py +++ b/utils/proxy/core.py @@ -110,7 +110,7 @@ def request(self, flow): if flow.request.host in ("proxy", "localhost"): # tracer is the only container that uses the proxy directly - if "api/v0.2/traces" in flow.request.path: # open telemetry datas + if flow.request.headers.get("dd-protocol") == "otlp": # open telemetry datas flow.request.host = "trace.agent." + os.environ.get("DD_SITE") flow.request.port = 443 flow.request.scheme = "https" @@ -159,7 +159,7 @@ def response(self, flow): data["response"] = None if flow.request.headers.get("dd-protocol") == "otlp": - interface = "otlp" + interface = "open_telemetry" if self.request_is_from_tracer(flow.request): interface = "library" elif f"https://{flow.request.host}" == self.dd_site_url: From 62ae0b356d60aa8fcdb25b9cb3f9731f423fab5f Mon Sep 17 00:00:00 2001 From: Charles de Beauchesne Date: Thu, 20 Apr 2023 19:02:01 +0200 Subject: [PATCH 07/15] Minor fixes --- tests/otel_tracing_e2e/test_e2e.py | 6 ++---- utils/_context/_scenarios.py | 29 +++++++++++++++++++++++++---- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/tests/otel_tracing_e2e/test_e2e.py b/tests/otel_tracing_e2e/test_e2e.py index ccc01e7f6f..3c4dfc0a0b 100644 --- a/tests/otel_tracing_e2e/test_e2e.py +++ b/tests/otel_tracing_e2e/test_e2e.py @@ -1,11 +1,9 @@ from _validator import validate_trace -from utils import context, weblog, interfaces, scenarios, missing_feature +from utils import context, weblog, interfaces, scenarios, irrelevant @scenarios.otel_tracing_e2e -@missing_feature( - context.library != "java_otel", reason="OTel tests only support OTel instrumented applications at the moment.", -) +@irrelevant(context.library != "open_telemetry") class Test_OTel_E2E: def setup_main(self): self.use_128_bits_trace_id = False diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index b158b1f25a..45a3369c2d 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -470,18 +470,39 @@ def post_setup(self, session): from utils import interfaces if self.use_proxy: - self._wait_interface(interfaces.library, session, self.library_interface_timeout) - self._wait_interface(interfaces.agent, session, self.agent_interface_timeout) - self._wait_interface(interfaces.backend, session, self.backend_interface_timeout) + self._wait_interface(interfaces.open_telemetry, session, 5) self.collect_logs() self._wait_interface(interfaces.library_stdout, session, 0) self._wait_interface(interfaces.library_dotnet_managed, session, 0) - self._wait_interface(interfaces.agent_stdout, session, 0) else: self.collect_logs() + @staticmethod + def _wait_interface(interface, session, timeout): + terminal = session.config.pluginmanager.get_plugin("terminalreporter") + terminal.write_sep("-", f"Wait for {interface} ({timeout}s)") + terminal.flush() + + interface.wait(timeout) + + @property + def library(self): + return LibraryVersion("open_telemetry", "0.0.0") + + @property + def agent(self): + return LibraryVersion("agent", "0.0.0") + + @property + def agent_version(self): + return self.agent.version + + @property + def weblog_variant(self): + return self.weblog_container.weblog_variant + class CgroupScenario(EndToEndScenario): From 3aec32d0da5140e5be2e82a2ab5b8c5ffe533640 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Thu, 20 Apr 2023 17:47:39 -0400 Subject: [PATCH 08/15] Use correct proxy in exporter --- .vscode/settings.json | 3 ++- utils/_context/_scenarios.py | 2 +- .../src/main/java/com/datadoghq/springbootnative/App.java | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 4a57801274..c6566172bf 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,5 +4,6 @@ "python.testing.unittestEnabled": false, "python.linting.enabled": true, "python.linting.pylintEnabled": true, - "python.formatting.provider": "black" + "python.formatting.provider": "black", + "java.compile.nullAnalysis.mode": "disabled" } diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index 45a3369c2d..2b39440374 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -414,7 +414,7 @@ def get_junit_properties(self): class OpenTelemetryScenario(_DockerScenario): - """ Scenario for testing open""" + """ Scenario for testing opentelemetry""" def __init__(self, name, weblog_env) -> None: super().__init__(name, use_proxy=True) diff --git a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java index 92dac289dd..067b8d9df3 100644 --- a/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java +++ b/utils/build/docker/java_otel/spring-boot-native/src/main/java/com/datadoghq/springbootnative/App.java @@ -25,7 +25,7 @@ public static void main(String[] args) { ResourceAttributes.DEPLOYMENT_ENVIRONMENT, "system-tests")); OtlpHttpSpanExporter intakeExporter = OtlpHttpSpanExporter.builder() - .setEndpoint("http://runner:8126/api/v0.2/traces") // send to the proxy first + .setEndpoint("http://proxy:8126/api/v0.2/traces") // send to the proxy first .addHeader("dd-protocol", "otlp") .addHeader("dd-api-key", System.getenv("DD_API_KEY")) .build(); From 5810006c59e5676ac73119c675d4001be2bdb4da Mon Sep 17 00:00:00 2001 From: Yang Song Date: Fri, 21 Apr 2023 13:09:49 -0400 Subject: [PATCH 09/15] More fixes on otel e2e test --- tests/otel_tracing_e2e/test_e2e.py | 12 ++++++------ utils/_context/_scenarios.py | 2 +- utils/interfaces/_backend.py | 2 +- utils/proxy/core.py | 7 ++++--- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/tests/otel_tracing_e2e/test_e2e.py b/tests/otel_tracing_e2e/test_e2e.py index 3c4dfc0a0b..94cf490d16 100644 --- a/tests/otel_tracing_e2e/test_e2e.py +++ b/tests/otel_tracing_e2e/test_e2e.py @@ -10,13 +10,13 @@ def setup_main(self): self.r = weblog.get(path="/") def test_main(self): - otel_trace_ids = list(interfaces.library.get_otel_trace_id(request=self.r)) + otel_trace_ids = set(interfaces.open_telemetry.get_otel_trace_id(request=self.r)) assert len(otel_trace_ids) == 2 - dd_trace_ids = map(self._get_dd_trace_id, otel_trace_ids) - traces = map( - lambda dd_trace_id: interfaces.backend.assert_otlp_trace_exist(request=self.r, dd_trace_id=dd_trace_id), - dd_trace_ids, - ) + dd_trace_ids = [self._get_dd_trace_id(otel_trace_id) for otel_trace_id in otel_trace_ids] + traces = [ + interfaces.backend.assert_otlp_trace_exist(request=self.r, dd_trace_id=dd_trace_id) + for dd_trace_id in dd_trace_ids + ] validate_trace(traces, self.use_128_bits_trace_id) def _get_dd_trace_id(self, otel_trace_id=bytes) -> int: diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index 2b39440374..4d2fea52a5 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -442,7 +442,7 @@ def on_modified(self, event): observer = Observer() observer.schedule( - Event(interfaces.library), path=f"{self.host_log_folder}/interfaces/open_telemetry", recursive=True + Event(interfaces.open_telemetry), path=f"{self.host_log_folder}/interfaces/open_telemetry", recursive=True ) observer.start() diff --git a/utils/interfaces/_backend.py b/utils/interfaces/_backend.py index 07e1846ece..e19e86a179 100644 --- a/utils/interfaces/_backend.py +++ b/utils/interfaces/_backend.py @@ -75,7 +75,7 @@ def assert_otlp_trace_exist(self, request: requests.Request, dd_trace_id: str) - """ rid = get_rid_from_request(request) - data = self._wait_for_trace(rid=rid, trace_id=dd_trace_id, retries=5, sleep_interval_multiplier=2.0) + data = self._wait_for_trace(rid=rid, trace_id=dd_trace_id, retries=10, sleep_interval_multiplier=2.0) return json.loads(data["response"]["content"])["trace"] def assert_single_spans_exist(self, request, min_spans_len=1, limit=100): diff --git a/utils/proxy/core.py b/utils/proxy/core.py index e8665bb034..d47393f38f 100644 --- a/utils/proxy/core.py +++ b/utils/proxy/core.py @@ -111,13 +111,14 @@ def request(self, flow): # tracer is the only container that uses the proxy directly if flow.request.headers.get("dd-protocol") == "otlp": # open telemetry datas - flow.request.host = "trace.agent." + os.environ.get("DD_SITE") + flow.request.host = "trace.agent." + os.environ.get("DD_SITE", "datad0g.com") flow.request.port = 443 flow.request.scheme = "https" else: flow.request.host, flow.request.port = "agent", 8127 flow.request.scheme = "http" - logger.info(f" => reverse proxy to {flow.request.pretty_url}") + + logger.info(f" => reverse proxy to {flow.request.pretty_url}") @staticmethod def request_is_from_tracer(request): @@ -160,7 +161,7 @@ def response(self, flow): if flow.request.headers.get("dd-protocol") == "otlp": interface = "open_telemetry" - if self.request_is_from_tracer(flow.request): + elif self.request_is_from_tracer(flow.request): interface = "library" elif f"https://{flow.request.host}" == self.dd_site_url: interface = "backend" From e2985803583b23191636f4e4c9f2b46dd87701ea Mon Sep 17 00:00:00 2001 From: Yang Song Date: Fri, 21 Apr 2023 13:18:53 -0400 Subject: [PATCH 10/15] Fix a merge conflict --- utils/interfaces/_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/interfaces/_backend.py b/utils/interfaces/_backend.py index 680eb40bae..2b61b6f0b3 100644 --- a/utils/interfaces/_backend.py +++ b/utils/interfaces/_backend.py @@ -106,7 +106,7 @@ def assert_otlp_trace_exist(self, request: requests.Request, dd_trace_id: str) - rid = get_rid_from_request(request) data = self._wait_for_trace(rid=rid, trace_id=dd_trace_id, retries=10, sleep_interval_multiplier=2.0) - return json.loads(data["response"]["content"])["trace"] + return data["response"]["content"]["trace"] def assert_single_spans_exist(self, request, min_spans_len=1, limit=100): """Attempts to fetch single span events using the given `query_filter` as part of the search query. From 90e665ae162b4b8ee03cb84c936dcb7844ad7c24 Mon Sep 17 00:00:00 2001 From: Charles de Beauchesne Date: Mon, 24 Apr 2023 10:45:32 +0200 Subject: [PATCH 11/15] Add otel in CI --- .github/workflows/ci.yml | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ba9b702ba7..2a352f6ee7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -613,6 +613,40 @@ jobs: - name: Print fancy log report if: ${{ always() }} run: python utils/scripts/markdown_logs.py >> $GITHUB_STEP_SUMMARY + + test-the-tests-open-telemetry: + runs-on: ubuntu-latest + if: github.event.action != 'opened' && !contains(github.event.pull_request.labels.*.name, 'run-default-scenario') + needs: + - lint_and_test + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Setup python 3.9 + uses: actions/setup-python@v4 + with: + python-version: '3.9' + - name: Pull mitmproxy image + run: docker pull mitmproxy/mitmproxy + - name: Build + id: build + run: SYSTEM_TEST_BUILD_ATTEMPTS=3 ./build.sh + - name: Run OTEL_TRACING_E2E scenario + if: steps.build.outcome == 'success' + run: ./run.sh OTEL_TRACING_E2E + env: + DD_API_KEY: ${{ secrets.DD_API_KEY }} + DD_APPLICATION_KEY: ${{ secrets.DD_APPLICATION_KEY }} + - name: Compress logs + if: steps.build.outcome == 'success' + run: tar -czvf artifact.tar.gz $(ls | grep logs) + - name: Upload artifact + if: steps.build.outcome == 'success' + uses: actions/upload-artifact@v3 + with: + name: logs_java-otel_spring-boot-native_prod + path: artifact.tar.gz + post_test-the-tests: runs-on: ubuntu-latest needs: From ec918cc78fb7ccb99f26794eef55bfc5e10c137d Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 24 Apr 2023 11:50:32 -0400 Subject: [PATCH 12/15] Fix build failure in run.sh --- utils/_context/_scenarios.py | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index 7797869188..de308009d0 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -419,6 +419,7 @@ class OpenTelemetryScenario(_DockerScenario): """ Scenario for testing opentelemetry""" def __init__(self, name, weblog_env) -> None: + self._required_containers = [] super().__init__(name, use_proxy=True) self.weblog_container = WeblogContainer(self.host_log_folder, environment=weblog_env) From c143cd43ae0df92df5acee23ac07ec519a29bb71 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 24 Apr 2023 16:56:20 -0400 Subject: [PATCH 13/15] Fix CI failure --- utils/_context/_scenarios.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/_context/_scenarios.py b/utils/_context/_scenarios.py index de308009d0..93541f0e75 100644 --- a/utils/_context/_scenarios.py +++ b/utils/_context/_scenarios.py @@ -421,6 +421,8 @@ class OpenTelemetryScenario(_DockerScenario): def __init__(self, name, weblog_env) -> None: self._required_containers = [] super().__init__(name, use_proxy=True) + if not self.is_current_scenario: + return self.weblog_container = WeblogContainer(self.host_log_folder, environment=weblog_env) self._required_containers.append(self.weblog_container) From d42fdbd8af295649cf294224f8f2f0247704248c Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 24 Apr 2023 17:59:16 -0400 Subject: [PATCH 14/15] Fix test-the-tests-open-telemetry CI config --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a352f6ee7..08281a7964 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -630,7 +630,7 @@ jobs: run: docker pull mitmproxy/mitmproxy - name: Build id: build - run: SYSTEM_TEST_BUILD_ATTEMPTS=3 ./build.sh + run: SYSTEM_TEST_BUILD_ATTEMPTS=3 ./build.sh java_otel - name: Run OTEL_TRACING_E2E scenario if: steps.build.outcome == 'success' run: ./run.sh OTEL_TRACING_E2E From 04f15473292cd0cf674243e187378c977dd075c9 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Tue, 25 Apr 2023 11:04:36 -0400 Subject: [PATCH 15/15] Remove unused func --- utils/interfaces/_library/core.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/utils/interfaces/_library/core.py b/utils/interfaces/_library/core.py index f51f2aee85..586aa55262 100644 --- a/utils/interfaces/_library/core.py +++ b/utils/interfaces/_library/core.py @@ -61,27 +61,6 @@ def get_traces(self, request=None): yield data, trace break - def get_otel_trace_id(self, request): - paths = ["/api/v0.2/traces"] - rid = get_rid_from_request(request) - - if rid: - logger.debug(f"Try to find traces related to request {rid}") - - for data in self.get_data(path_filters=paths): - export_request = ExportTraceServiceRequest() - content = eval(data["request"]["content"]) # Raw content is a str like "b'\n\x\...'" - assert export_request.ParseFromString(content) > 0, content - for resource_span in export_request.resource_spans: - for scope_span in resource_span.scope_spans: - for span in scope_span.spans: - for attribute in span.attributes: - if ( - attribute.key == "http.request.headers.user-agent" - and rid in attribute.value.string_value - ): - yield span.trace_id - def get_spans(self, request=None): """ Iterate over all spans reported by the tracer to the agent.