Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Executor for PrometheusHttpServer configurable #5296

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -76,7 +75,7 @@ public static PrometheusHttpServerBuilder builder() {
return new PrometheusHttpServerBuilder();
}

PrometheusHttpServer(String host, int port) {
PrometheusHttpServer(String host, int port, ExecutorService executor) {
try {
server = HttpServer.create(new InetSocketAddress(host, port), 3);
} catch (IOException e) {
Expand All @@ -87,8 +86,7 @@ public static PrometheusHttpServerBuilder builder() {
server.createContext("/", metricsHandler);
server.createContext("/metrics", metricsHandler);
server.createContext("/-/healthy", HealthHandler.INSTANCE);

executor = Executors.newFixedThreadPool(5, THREAD_FACTORY);
this.executor = executor;
server.setExecutor(executor);

start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;

import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** A builder for {@link PrometheusHttpServer}. */
public final class PrometheusHttpServerBuilder {

Expand All @@ -17,6 +21,8 @@ public final class PrometheusHttpServerBuilder {
private String host = DEFAULT_HOST;
private int port = DEFAULT_PORT;

private ExecutorService executor;

/** Sets the host to bind to. If unset, defaults to {@value #DEFAULT_HOST}. */
public PrometheusHttpServerBuilder setHost(String host) {
requireNonNull(host, "host");
Expand All @@ -32,13 +38,30 @@ public PrometheusHttpServerBuilder setPort(int port) {
return this;
}

/** Sets the {@link ExecutorService} to be used for {@link PrometheusHttpServer}. */
public PrometheusHttpServerBuilder setExecutor(ExecutorService executor) {
requireNonNull(executor, "executor");
this.executor = executor;
return this;
}

/**
* Returns a new {@link PrometheusHttpServer} with the configuration of this builder which can be
* registered with a {@link io.opentelemetry.sdk.metrics.SdkMeterProvider}.
*/
public PrometheusHttpServer build() {
return new PrometheusHttpServer(host, port);
ExecutorService executorService = this.executor;
if (executorService == null) {
executorService = getDefaultExecutor();
}
return new PrometheusHttpServer(host, port, executorService);
}

PrometheusHttpServerBuilder() {
this.executor = getDefaultExecutor();
}

PrometheusHttpServerBuilder() {}
private static ExecutorService getDefaultExecutor() {
return Executors.newFixedThreadPool(5, new DaemonThreadFactory("prometheus-http"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.prometheus;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -32,11 +33,17 @@
import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPInputStream;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -295,6 +302,38 @@ void stringRepresentation() {
.isEqualTo("PrometheusHttpServer{address=" + prometheusServer.getAddress() + "}");
}

@Test
void defaultExecutor() {
assertThat(prometheusServer)
.extracting("executor", as(InstanceOfAssertFactories.type(ThreadPoolExecutor.class)))
.satisfies(
executor -> {
assertThat(executor.getCorePoolSize()).isEqualTo(5);
});
}

@Test
void customExecutor() throws IOException {
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(10);
int port;
try (ServerSocket socket2 = new ServerSocket(0)) {
port = socket2.getLocalPort();
}
PrometheusHttpServer server =
PrometheusHttpServer.builder()
.setHost("localhost")
.setPort(port)
.setExecutor(scheduledExecutor)
.build();
assertThat(server)
.extracting(
"executor", as(InstanceOfAssertFactories.type(ScheduledThreadPoolExecutor.class)))
.satisfies(
executor -> {
assertThat(executor.getCorePoolSize()).isEqualTo(10);
});
}

private static List<MetricData> generateTestData() {
return ImmutableList.of(
ImmutableMetricData.createLongSum(
Expand Down