From 327e0177c25aad847d6de0fa7b735e4a4702f05c Mon Sep 17 00:00:00 2001 From: Kaspar Kivistik Date: Fri, 15 Nov 2024 15:44:12 +0200 Subject: [PATCH] Add capability for anyone to define their implementation of OTLP metrics sending --- .../registry/otlp/OltpMetricsSender.java | 24 +++++ .../registry/otlp/OtlpHttpMetricsSender.java | 77 +++++++++++++ .../registry/otlp/OtlpMeterRegistry.java | 102 +++++++----------- .../registry/otlp/OtlpMeterRegistryTest.java | 20 ++-- 4 files changed, 150 insertions(+), 73 deletions(-) create mode 100644 implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OltpMetricsSender.java create mode 100644 implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpHttpMetricsSender.java diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OltpMetricsSender.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OltpMetricsSender.java new file mode 100644 index 0000000000..b921e7cabf --- /dev/null +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OltpMetricsSender.java @@ -0,0 +1,24 @@ +/* + * Copyright 2022 VMware, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.registry.otlp; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; + +public interface OltpMetricsSender { + + void send(ExportMetricsServiceRequest request); + +} diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpHttpMetricsSender.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpHttpMetricsSender.java new file mode 100644 index 0000000000..cef38efa6b --- /dev/null +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpHttpMetricsSender.java @@ -0,0 +1,77 @@ +/* + * Copyright 2022 VMware, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.registry.otlp; + +import io.micrometer.common.util.internal.logging.InternalLogger; +import io.micrometer.common.util.internal.logging.InternalLoggerFactory; +import io.micrometer.core.ipc.http.HttpSender; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; + +public class OtlpHttpMetricsSender implements OltpMetricsSender { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(OtlpHttpMetricsSender.class); + + // VisibleForTesting + final HttpSender httpSender; + + private final OtlpConfig config; + + private final String userAgentHeader; + + public OtlpHttpMetricsSender(HttpSender httpSender, OtlpConfig config) { + this.httpSender = httpSender; + this.config = config; + this.userAgentHeader = getUserAgentHeader(); + } + + @Override + public void send(ExportMetricsServiceRequest request) { + HttpSender.Request.Builder httpRequest = this.httpSender.post(config.url()) + .withHeader("User-Agent", userAgentHeader) + .withContent("application/x-protobuf", request.toByteArray()); + config.headers().forEach(httpRequest::withHeader); + try { + HttpSender.Response response = httpRequest.send(); + if (!response.isSuccessful()) { + logger.warn( + "Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}", + getConfigurationContext(), response.code(), response.body()); + } + } + catch (Throwable e) { + logger.warn("Failed to publish metrics (context: {}) ", getConfigurationContext(), e); + } + } + + private String getUserAgentHeader() { + String plainExporter = "Micrometer-OTLP-Exporter-Java"; + if (OtlpMeterRegistry.class.getPackage().getImplementationVersion() == null) { + return plainExporter; + } + return plainExporter + "/" + OtlpMeterRegistry.class.getPackage().getImplementationVersion(); + } + + /** + * Get the configuration context. + * @return A message containing enough information for the log reader to figure out + * what configuration details may have contributed to the failure. + */ + private String getConfigurationContext() { + // While other values may contribute to failures, these two are most common + return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes(); + } + +} diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java index 6e58f37bd1..713826de47 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java @@ -18,12 +18,10 @@ import io.micrometer.common.lang.Nullable; import io.micrometer.common.util.internal.logging.InternalLogger; import io.micrometer.common.util.internal.logging.InternalLoggerFactory; -import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.*; import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.*; import io.micrometer.core.instrument.config.NamingConvention; import io.micrometer.core.instrument.distribution.*; -import io.micrometer.core.instrument.distribution.Histogram; import io.micrometer.core.instrument.distribution.pause.PauseDetector; import io.micrometer.core.instrument.internal.DefaultGauge; import io.micrometer.core.instrument.internal.DefaultLongTaskTimer; @@ -36,14 +34,14 @@ import io.micrometer.core.instrument.util.MeterPartition; import io.micrometer.core.instrument.util.NamedThreadFactory; import io.micrometer.core.instrument.util.TimeUtils; -import io.micrometer.core.ipc.http.HttpSender; import io.micrometer.core.ipc.http.HttpUrlConnectionSender; import io.micrometer.registry.otlp.internal.CumulativeBase2ExponentialHistogram; import io.micrometer.registry.otlp.internal.DeltaBase2ExponentialHistogram; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.common.v1.AnyValue; import io.opentelemetry.proto.common.v1.KeyValue; -import io.opentelemetry.proto.metrics.v1.*; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.proto.resource.v1.Resource; import java.time.Duration; @@ -56,8 +54,7 @@ import java.util.function.ToLongFunction; /** - * Publishes meters in OTLP (OpenTelemetry Protocol) format. HTTP with Protobuf encoding - * is the only option currently supported. + * Publishes meters in OTLP (OpenTelemetry Protocol) format. * * @author Tommy Ludwig * @author Lenin Jaganathan @@ -83,7 +80,7 @@ public class OtlpMeterRegistry extends PushMeterRegistry { private final OtlpConfig config; - private final HttpSender httpSender; + private final OltpMetricsSender metricsSender; private final Resource resource; @@ -91,8 +88,6 @@ public class OtlpMeterRegistry extends PushMeterRegistry { private final TimeUnit baseTimeUnit; - private final String userAgentHeader; - // Time when the last scheduled rollOver has started. Applicable only for delta // flavour. private volatile long lastMeterRolloverStartTime = -1; @@ -109,27 +104,32 @@ public OtlpMeterRegistry(OtlpConfig config, Clock clock) { } /** - * Create an {@code OtlpMeterRegistry} instance. + * Create an {@code OtlpMeterRegistry} instance with an HTTP metrics sender. * @param config config * @param clock clock * @param threadFactory thread factory * @since 1.14.0 */ public OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory) { - this(config, clock, threadFactory, new HttpUrlConnectionSender()); + this(config, clock, threadFactory, new OtlpHttpMetricsSender(new HttpUrlConnectionSender(), config)); } - // VisibleForTesting - // not public until we decide what we want to expose in public API - // HttpSender may not be a good idea if we will support a non-HTTP transport - OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory, HttpSender httpSender) { + /** + * Create an {@code OtlpMeterRegistry} instance. + * @param config config + * @param clock clock + * @param threadFactory thread factory + * @param metricsSender metrics sender + * @since 1.14.0 + */ + public OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory, + OltpMetricsSender metricsSender) { super(config, clock); this.config = config; this.baseTimeUnit = config.baseTimeUnit(); - this.httpSender = httpSender; + this.metricsSender = metricsSender; this.resource = Resource.newBuilder().addAllAttributes(getResourceAttributes()).build(); this.aggregationTemporality = config.aggregationTemporality(); - this.userAgentHeader = getUserAgentHeader(); config().namingConvention(NamingConvention.dot); start(threadFactory); } @@ -178,34 +178,15 @@ protected void publish() { .build()) .build()) .build(); - HttpSender.Request.Builder httpRequest = this.httpSender.post(this.config.url()) - .withHeader("User-Agent", this.userAgentHeader) - .withContent("application/x-protobuf", request.toByteArray()); - this.config.headers().forEach(httpRequest::withHeader); - HttpSender.Response response = httpRequest.send(); - if (!response.isSuccessful()) { - logger.warn( - "Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}", - getConfigurationContext(), response.code(), response.body()); - } + + metricsSender.send(request); } catch (Throwable e) { - logger.warn(String.format("Failed to publish metrics to OTLP receiver (context: %s)", - getConfigurationContext()), e); + logger.warn("Failed to publish metrics to OTLP receiver", e); } } } - /** - * Get the configuration context. - * @return A message containing enough information for the log reader to figure out - * what configuration details may have contributed to the failure. - */ - private String getConfigurationContext() { - // While other values may contribute to failures, these two are most common - return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes(); - } - @Override protected Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction valueFunction) { return new DefaultGauge<>(id, obj, valueFunction); @@ -390,12 +371,12 @@ Iterable getResourceAttributes() { } static Histogram getHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig, - OtlpConfig otlpConfig) { + OtlpConfig otlpConfig) { return getHistogram(clock, distributionStatisticConfig, otlpConfig, null); } static Histogram getHistogram(final Clock clock, final DistributionStatisticConfig distributionStatisticConfig, - final OtlpConfig otlpConfig, @Nullable final TimeUnit baseTimeUnit) { + final OtlpConfig otlpConfig, @Nullable final TimeUnit baseTimeUnit) { // While publishing to OTLP, we export either Histogram datapoint (Explicit // ExponentialBuckets // or Exponential) / Summary @@ -411,14 +392,14 @@ static Histogram getHistogram(final Clock clock, final DistributionStatisticConf } return otlpConfig.aggregationTemporality() == AggregationTemporality.DELTA - ? new DeltaBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(), - minimumExpectedValue, baseTimeUnit, clock, otlpConfig.step().toMillis()) - : new CumulativeBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(), - minimumExpectedValue, baseTimeUnit); + ? new DeltaBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(), + minimumExpectedValue, baseTimeUnit, clock, otlpConfig.step().toMillis()) + : new CumulativeBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(), + minimumExpectedValue, baseTimeUnit); } Histogram explicitBucketHistogram = getExplicitBucketHistogram(clock, distributionStatisticConfig, - otlpConfig.aggregationTemporality(), otlpConfig.step().toMillis()); + otlpConfig.aggregationTemporality(), otlpConfig.step().toMillis()); if (explicitBucketHistogram != null) { return explicitBucketHistogram; } @@ -431,13 +412,13 @@ static Histogram getHistogram(final Clock clock, final DistributionStatisticConf } static HistogramFlavor histogramFlavor(HistogramFlavor preferredHistogramFlavor, - DistributionStatisticConfig distributionStatisticConfig) { + DistributionStatisticConfig distributionStatisticConfig) { final double[] serviceLevelObjectiveBoundaries = distributionStatisticConfig .getServiceLevelObjectiveBoundaries(); if (distributionStatisticConfig.isPublishingHistogram() - && preferredHistogramFlavor == HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM - && (serviceLevelObjectiveBoundaries == null || serviceLevelObjectiveBoundaries.length == 0)) { + && preferredHistogramFlavor == HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM + && (serviceLevelObjectiveBoundaries == null || serviceLevelObjectiveBoundaries.length == 0)) { return HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM; } return HistogramFlavor.EXPLICIT_BUCKET_HISTOGRAM; @@ -445,8 +426,8 @@ static HistogramFlavor histogramFlavor(HistogramFlavor preferredHistogramFlavor, @Nullable private static Histogram getExplicitBucketHistogram(final Clock clock, - final DistributionStatisticConfig distributionStatisticConfig, - final AggregationTemporality aggregationTemporality, final long stepMillis) { + final DistributionStatisticConfig distributionStatisticConfig, + final AggregationTemporality aggregationTemporality, final long stepMillis) { double[] sloWithPositiveInf = getSloWithPositiveInf(distributionStatisticConfig); if (AggregationTemporality.isCumulative(aggregationTemporality)) { @@ -461,11 +442,11 @@ private static Histogram getExplicitBucketHistogram(final Clock clock, } if (AggregationTemporality.isDelta(aggregationTemporality) && stepMillis > 0) { return new OtlpStepBucketHistogram(clock, stepMillis, - DistributionStatisticConfig.builder() - .serviceLevelObjectives(sloWithPositiveInf) - .build() - .merge(distributionStatisticConfig), - true, false); + DistributionStatisticConfig.builder() + .serviceLevelObjectives(sloWithPositiveInf) + .build() + .merge(distributionStatisticConfig), + true, false); } return null; @@ -492,11 +473,4 @@ static double[] getSloWithPositiveInf(DistributionStatisticConfig distributionSt return sloWithPositiveInf; } - private String getUserAgentHeader() { - if (this.getClass().getPackage().getImplementationVersion() == null) { - return "Micrometer-OTLP-Exporter-Java"; - } - return "Micrometer-OTLP-Exporter-Java/" + this.getClass().getPackage().getImplementationVersion(); - } - } diff --git a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpMeterRegistryTest.java b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpMeterRegistryTest.java index 9d033f7ed8..f9edf08a4e 100644 --- a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpMeterRegistryTest.java +++ b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpMeterRegistryTest.java @@ -54,12 +54,12 @@ abstract class OtlpMeterRegistryTest { protected MockClock clock; - private HttpSender mockHttpSender; - OtlpMeterRegistry registry; OtlpMeterRegistry registryWithExponentialHistogram; + private OtlpHttpMetricsSender metricsSender; + abstract OtlpConfig otlpConfig(); abstract OtlpConfig exponentialHistogramOtlpConfig(); @@ -67,9 +67,10 @@ abstract class OtlpMeterRegistryTest { @BeforeEach void setUp() { this.clock = new MockClock(); - this.mockHttpSender = mock(HttpSender.class); - this.registry = new OtlpMeterRegistry(otlpConfig(), this.clock, - new NamedThreadFactory("otlp-metrics-publisher"), this.mockHttpSender); + OtlpConfig config = otlpConfig(); + this.metricsSender = new OtlpHttpMetricsSender(mock(HttpSender.class), config); + this.registry = new OtlpMeterRegistry(config, this.clock, new NamedThreadFactory("otlp-metrics-publisher"), + metricsSender); this.registryWithExponentialHistogram = new OtlpMeterRegistry(exponentialHistogramOtlpConfig(), clock); } @@ -148,15 +149,16 @@ void timeGauge() { @Issue("#5577") @Test void httpHeaders() throws Throwable { - HttpSender.Request.Builder builder = HttpSender.Request.build(otlpConfig().url(), this.mockHttpSender); - when(mockHttpSender.post(otlpConfig().url())).thenReturn(builder); + HttpSender.Request.Builder builder = HttpSender.Request.build(otlpConfig().url(), + this.metricsSender.httpSender); + when(metricsSender.httpSender.post(otlpConfig().url())).thenReturn(builder); - when(mockHttpSender.send(isA(HttpSender.Request.class))).thenReturn(new HttpSender.Response(200, "")); + when(metricsSender.httpSender.send(isA(HttpSender.Request.class))).thenReturn(new HttpSender.Response(200, "")); writeToMetric(TimeGauge.builder("gauge.time", this, TimeUnit.MICROSECONDS, o -> 24).register(registry)); registry.publish(); - verify(this.mockHttpSender).send(assertArg(request -> { + verify(this.metricsSender.httpSender).send(assertArg(request -> { assertThat(request.getRequestHeaders().get("User-Agent")).startsWith("Micrometer-OTLP-Exporter-Java"); assertThat(request.getRequestHeaders()).containsEntry("Content-Type", "application/x-protobuf"); }));