Skip to content

Commit

Permalink
Prometheus compatibility (#5039)
Browse files Browse the repository at this point in the history
* PrometheusHttpServer serializes resource attributes in target_info

* PrometheusHttpServer serializes scope as otel_scope_info
  • Loading branch information
jack-berg authored Dec 22, 2022
1 parent 4cf3f65 commit 16a02b2
Show file tree
Hide file tree
Showing 6 changed files with 561 additions and 100 deletions.
6 changes: 6 additions & 0 deletions exporters/prometheus/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ testing {
suites {
val integrationTest by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":semconv"))

implementation("io.opentelemetry.proto:opentelemetry-proto")

implementation("com.fasterxml.jackson.jr:jackson-jr-stree")
implementation("com.google.guava:guava")
implementation("com.linecorp.armeria:armeria")
implementation("com.linecorp.armeria:armeria-junit5")
implementation("com.linecorp.armeria:armeria-grpc-protocol")
implementation("io.prometheus:simpleclient_httpserver")
implementation("org.testcontainers:junit-jupiter")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.prometheus;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.testcontainers.Testcontainers.exposeHostPorts;

import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

/**
* Verifies {@link PrometheusHttpServer} end to end using the OpenTelemetry Collector. The Collector
* is configured to scrape the {@link PrometheusHttpServer} and export data over gRPC to a server
* running in process, allowing assertions to be made against the data.
*/
@Testcontainers(disabledWithoutDocker = true)
class PrometheusHttpServerIntegrationTest {

private static final String COLLECTOR_IMAGE =
"ghcr.io/open-telemetry/opentelemetry-java/otel-collector";
private static final Integer COLLECTOR_HEALTH_CHECK_PORT = 13133;

private static int prometheusPort;
private static Resource resource;
private static SdkMeterProvider meterProvider;
private static OtlpGrpcServer grpcServer;
private static GenericContainer<?> collector;

@BeforeAll
static void beforeAll() {
PrometheusHttpServer prometheusHttpServer = PrometheusHttpServer.builder().setPort(0).build();
prometheusPort = prometheusHttpServer.getAddress().getPort();
resource = Resource.getDefault();
meterProvider =
SdkMeterProvider.builder()
.setResource(resource)
.registerMetricReader(prometheusHttpServer)
.build();
exposeHostPorts(prometheusPort);

grpcServer = new OtlpGrpcServer();
grpcServer.start();
exposeHostPorts(grpcServer.httpPort());

collector =
new GenericContainer<>(DockerImageName.parse(COLLECTOR_IMAGE))
.withEnv("APP_ENDPOINT", "host.testcontainers.internal:" + prometheusPort)
.withEnv("LOGGING_EXPORTER_VERBOSITY", "detailed")
.withEnv(
"OTLP_EXPORTER_ENDPOINT", "host.testcontainers.internal:" + grpcServer.httpPort())
.withClasspathResourceMapping(
"otel-config.yaml", "/otel-config.yaml", BindMode.READ_ONLY)
.withCommand("--config", "/otel-config.yaml")
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("otel-collector")))
.withExposedPorts(COLLECTOR_HEALTH_CHECK_PORT)
.waitingFor(Wait.forHttp("/").forPort(COLLECTOR_HEALTH_CHECK_PORT));
collector.start();
}

@AfterAll
static void afterAll() {
meterProvider.shutdown().join(10, TimeUnit.SECONDS);
grpcServer.stop().join();
collector.stop();
}

@AfterEach
void afterEach() {
grpcServer.reset();
}

@Test
void endToEnd() {
Meter meter = meterProvider.meterBuilder("test").setInstrumentationVersion("1.0.0").build();

meter
.counterBuilder("requests")
.build()
.add(3, Attributes.builder().put("animal", "bear").build());

await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(grpcServer.metricRequests.size()).isGreaterThan(0));

ExportMetricsServiceRequest request = grpcServer.metricRequests.get(0);
Assertions.assertThat(request.getResourceMetricsCount()).isEqualTo(1);

ResourceMetrics resourceMetrics = request.getResourceMetrics(0);
assertThat(resourceMetrics.getResource().getAttributesList())
.containsExactlyInAnyOrder(
// Resource attributes derived from the prometheus scrape config
stringKeyValue("service.name", "app"),
stringKeyValue("service.instance.id", "host.testcontainers.internal:" + prometheusPort),
stringKeyValue("net.host.name", "host.testcontainers.internal"),
stringKeyValue("net.host.port", String.valueOf(prometheusPort)),
stringKeyValue("http.scheme", "http"),
// Resource attributes from the metric SDK resource translated to target_info
stringKeyValue(
"service_name",
Objects.requireNonNull(
resource.getAttributes().get(ResourceAttributes.SERVICE_NAME))),
stringKeyValue(
"telemetry_sdk_name",
Objects.requireNonNull(
resource.getAttributes().get(ResourceAttributes.TELEMETRY_SDK_NAME))),
stringKeyValue(
"telemetry_sdk_language",
Objects.requireNonNull(
resource.getAttributes().get(ResourceAttributes.TELEMETRY_SDK_LANGUAGE))),
stringKeyValue(
"telemetry_sdk_version",
Objects.requireNonNull(
resource.getAttributes().get(ResourceAttributes.TELEMETRY_SDK_VERSION))));

assertThat(resourceMetrics.getScopeMetricsCount()).isEqualTo(1);
ScopeMetrics scopeMetrics = resourceMetrics.getScopeMetrics(0);
assertThat(scopeMetrics.getScope().getName()).isEqualTo("");

Optional<Metric> optRequestTotal =
scopeMetrics.getMetricsList().stream()
.filter(metric -> metric.getName().equals("requests_total"))
.findFirst();
assertThat(optRequestTotal).isPresent();
Metric requestTotal = optRequestTotal.get();
assertThat(requestTotal.getDataCase()).isEqualTo(Metric.DataCase.SUM);

Sum requestTotalSum = requestTotal.getSum();
assertThat(requestTotalSum.getAggregationTemporality())
.isEqualTo(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE);
assertThat(requestTotalSum.getIsMonotonic()).isTrue();
assertThat(requestTotalSum.getDataPointsCount()).isEqualTo(1);

NumberDataPoint requestTotalDataPoint = requestTotalSum.getDataPoints(0);
assertThat(requestTotalDataPoint.getAsDouble()).isEqualTo(3.0);
assertThat(requestTotalDataPoint.getAttributesList())
.containsExactlyInAnyOrder(
stringKeyValue("animal", "bear"),
// Scope name and version are serialized as attributes to disambiguate metrics with the
// same name in different scopes
stringKeyValue("otel_scope_name", "test"),
stringKeyValue("otel_scope_version", "1.0.0"));

// Scope is serialized as info type metric, which is transformed to non-monotonic cumulative sum
Optional<Metric> optTestScopeInfo =
scopeMetrics.getMetricsList().stream()
.filter(metric -> metric.getName().equals("otel_scope_info"))
.findFirst();
assertThat(optTestScopeInfo).isPresent();
Metric testScopeInfo = optTestScopeInfo.get();
assertThat(testScopeInfo.getDataCase()).isEqualTo(Metric.DataCase.SUM);

Sum testScopeInfoSum = testScopeInfo.getSum();
assertThat(testScopeInfoSum.getAggregationTemporality())
.isEqualTo(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE);
assertThat(testScopeInfoSum.getIsMonotonic()).isFalse();
assertThat(testScopeInfoSum.getDataPointsCount()).isEqualTo(1);

NumberDataPoint testScopeInfoDataPoint = testScopeInfoSum.getDataPoints(0);
assertThat(testScopeInfoDataPoint.getAsDouble()).isEqualTo(1.0);
assertThat(testScopeInfoDataPoint.getAttributesList())
.containsExactlyInAnyOrder(
stringKeyValue("otel_scope_name", "test"),
stringKeyValue("otel_scope_version", "1.0.0"));
}

private static KeyValue stringKeyValue(String key, String value) {
return KeyValue.newBuilder()
.setKey(key)
.setValue(AnyValue.newBuilder().setStringValue(value).build())
.build();
}

private static class OtlpGrpcServer extends ServerExtension {

private final List<ExportMetricsServiceRequest> metricRequests = new ArrayList<>();

private void reset() {
metricRequests.clear();
}

@Override
protected void configure(ServerBuilder sb) {
sb.service(
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
new AbstractUnaryGrpcService() {
@Override
protected CompletionStage<byte[]> handleMessage(
ServiceRequestContext ctx, byte[] message) {
try {
metricRequests.add(ExportMetricsServiceRequest.parseFrom(message));
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
return completedFuture(
ExportMetricsServiceResponse.getDefaultInstance().toByteArray());
}
});
sb.http(0);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
extensions:
health_check: {}
receivers:
prometheus:
config:
scrape_configs:
- job_name: 'app'
scrape_interval: 1s
static_configs:
- targets: ['$APP_ENDPOINT']
exporters:
logging:
verbosity: $LOGGING_EXPORTER_VERBOSITY
otlp:
endpoint: $OTLP_EXPORTER_ENDPOINT
tls:
insecure: true
compression: none
service:
extensions: [health_check]
pipelines:
metrics:
receivers: [prometheus]
exporters: [logging, otlp]
Loading

0 comments on commit 16a02b2

Please sign in to comment.