Skip to content

Commit

Permalink
remove metrics from MeterRegistry when shutting down JettyServerThrea…
Browse files Browse the repository at this point in the history
…dPoolMetrics/InstrumentedQueuedThreadPool

fixes #4000
  • Loading branch information
wakingrufus authored and marcingrzejszczak committed Feb 6, 2024
1 parent d130d5e commit a13d34b
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.micrometer.core.instrument.binder.jetty;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
Expand All @@ -39,6 +40,9 @@ public class InstrumentedQueuedThreadPool extends QueuedThreadPool {

private final Iterable<Tag> tags;

@Nullable
private JettyServerThreadPoolMetrics threadPoolMetrics;

/**
* Default values for the instrumented thread pool.
* @param registry where metrics will be bound
Expand Down Expand Up @@ -112,8 +116,16 @@ public InstrumentedQueuedThreadPool(MeterRegistry registry, Iterable<Tag> tags,
@Override
protected void doStart() throws Exception {
super.doStart();
JettyServerThreadPoolMetrics threadPoolMetrics = new JettyServerThreadPoolMetrics(this, tags);
threadPoolMetrics = new JettyServerThreadPoolMetrics(this, tags);
threadPoolMetrics.bindTo(registry);
}

@Override
protected void doStop() throws Exception {
if (threadPoolMetrics != null) {
threadPoolMetrics.close();
}
super.doStop();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
*/
package io.micrometer.core.instrument.binder.jetty;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* {@link MeterBinder} for Jetty {@link ThreadPool}.
* <p>
Expand All @@ -38,52 +43,80 @@
* @author Manabu Matsuzaki
* @author Andy Wilkinson
* @author Johnny Lim
* @since 1.1.0
* @see InstrumentedQueuedThreadPool
* @since 1.1.0
*/
public class JettyServerThreadPoolMetrics implements MeterBinder {
public class JettyServerThreadPoolMetrics implements MeterBinder, AutoCloseable {

private final ThreadPool threadPool;

private final Iterable<Tag> tags;

private final String MIN = "jetty.threads.config.min";

private final String MAX = "jetty.threads.config.max";

private final String BUSY = "jetty.threads.busy";

private final String JOBS = "jetty.threads.jobs";

private final String CURRENT = "jetty.threads.current";

private final String IDLE = "jetty.threads.idle";
private final Set<Meter.Id> registeredMeterIds = ConcurrentHashMap.newKeySet();
@Nullable
private MeterRegistry registry;

public JettyServerThreadPoolMetrics(ThreadPool threadPool, Iterable<Tag> tags) {
this.threadPool = threadPool;
this.tags = tags;
}

@Override
public void bindTo(MeterRegistry registry) {
this.registry = registry;
if (threadPool instanceof SizedThreadPool) {
SizedThreadPool sizedThreadPool = (SizedThreadPool) threadPool;
Gauge.builder("jetty.threads.config.min", sizedThreadPool, SizedThreadPool::getMinThreads)
Gauge minGauge = Gauge.builder(MIN, sizedThreadPool, SizedThreadPool::getMinThreads)
.description("The minimum number of threads in the pool")
.tags(tags)
.register(registry);
Gauge.builder("jetty.threads.config.max", sizedThreadPool, SizedThreadPool::getMaxThreads)
registeredMeterIds.add(minGauge.getId());
Gauge maxGauge = Gauge.builder(MAX, sizedThreadPool, SizedThreadPool::getMaxThreads)
.description("The maximum number of threads in the pool")
.tags(tags)
.register(registry);
registeredMeterIds.add(maxGauge.getId());
if (threadPool instanceof QueuedThreadPool) {
QueuedThreadPool queuedThreadPool = (QueuedThreadPool) threadPool;
Gauge.builder("jetty.threads.busy", queuedThreadPool, QueuedThreadPool::getBusyThreads)
Gauge busyGauge = Gauge.builder(BUSY, queuedThreadPool, QueuedThreadPool::getBusyThreads)
.description("The number of busy threads in the pool")
.tags(tags)
.register(registry);
Gauge.builder("jetty.threads.jobs", queuedThreadPool, QueuedThreadPool::getQueueSize)
registeredMeterIds.add(busyGauge.getId());
Gauge jobsGauge = Gauge.builder(JOBS, queuedThreadPool, QueuedThreadPool::getQueueSize)
.description("Number of jobs queued waiting for a thread")
.tags(tags)
.register(registry);
registeredMeterIds.add(jobsGauge.getId());
}
}
Gauge.builder("jetty.threads.current", threadPool, ThreadPool::getThreads)
Gauge currentGauge = Gauge.builder(CURRENT, threadPool, ThreadPool::getThreads)
.description("The total number of threads in the pool")
.tags(tags)
.register(registry);
Gauge.builder("jetty.threads.idle", threadPool, ThreadPool::getIdleThreads)
registeredMeterIds.add(currentGauge.getId());
Gauge idleGauge = Gauge.builder(IDLE, threadPool, ThreadPool::getIdleThreads)
.description("The number of idle threads in the pool")
.tags(tags)
.register(registry);
registeredMeterIds.add(idleGauge.getId());
}

@Override
public void close() throws Exception {
if (registry != null) {
registeredMeterIds.forEach(registry::remove);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2019 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.core.instrument.binder.jetty;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.assertj.core.api.ListAssert;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

class InstrumentedQueuedThreadPoolTest {

@Test
void test_metrics() throws Exception {
MeterRegistry meterRegistry = new SimpleMeterRegistry();
QueuedThreadPool instance = new InstrumentedQueuedThreadPool(meterRegistry,
Collections.singletonList(Tag.of("pool", "1")));

instance.start();

assertThatMetricsExist(meterRegistry);

Gauge jobsGauge = meterRegistry.find("jetty.threads.jobs").gauge();

assertThat(jobsGauge.getId().getTags()).as("ensure metrics have thread pool tag").contains(Tag.of("pool", "1"));

instance.stop();

assertThatMetricsDontExist(meterRegistry);
}

@Test
void test_multiple_pools() throws Exception {
MeterRegistry meterRegistry = new SimpleMeterRegistry();
QueuedThreadPool pool1 = new InstrumentedQueuedThreadPool(meterRegistry,
Collections.singletonList(Tag.of("pool", "1")));

QueuedThreadPool pool2 = new InstrumentedQueuedThreadPool(meterRegistry,
Collections.singletonList(Tag.of("pool", "2")));

pool1.start();
pool2.start();

assertThatMetricsExist(meterRegistry);

assertThat(meterRegistry.find("jetty.threads.jobs").tag("pool", "1").gauge()).as("pool 1 gauge exists")
.isNotNull();
assertThat(meterRegistry.find("jetty.threads.jobs").tag("pool", "2").gauge()).as("pool 2 gauge exists")
.isNotNull();

pool1.stop();

assertThat(meterRegistry.find("jetty.threads.jobs").tag("pool", "1").gauge())
.as("pool 1 gauge no longer exists")
.isNull();
assertThat(meterRegistry.find("jetty.threads.jobs").tag("pool", "2").gauge()).as("pool 2 gauge exists")
.isNotNull();

pool2.stop();

assertThatMetricsDontExist(meterRegistry);
}

private void assertThatMetricsExist(MeterRegistry meterRegistry) {
assertThatMetrics(meterRegistry, (l, a) -> a.containsAll(l));
}

private void assertThatMetricsDontExist(MeterRegistry meterRegistry) {
assertThatMetrics(meterRegistry, (l, a) -> a.doesNotContainAnyElementsOf(l));
}

private void assertThatMetrics(MeterRegistry meterRegistry,
BiFunction<Iterable<String>, ListAssert<String>, ListAssert<String>> assertFunction) {
assertFunction.apply(
Arrays.asList("jetty.threads.jobs", "jetty.threads.busy", "jetty.threads.idle",
"jetty.threads.config.max", "jetty.threads.config.min", "jetty.threads.current"),
assertThat(
meterRegistry.getMeters().stream().map(m -> m.getId().getName()).collect(Collectors.toList())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,23 @@ class JettyServerThreadPoolMetricsTest {

private Server server;

private JettyServerThreadPoolMetrics threadPoolMetrics;

@BeforeEach
void setup() throws Exception {
registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, new MockClock());

Iterable<Tag> tags = Collections.singletonList(Tag.of("id", "0"));
QueuedThreadPool threadPool = new InstrumentedQueuedThreadPool(registry, tags);
QueuedThreadPool threadPool = new QueuedThreadPool();

threadPool.setMinThreads(32);
threadPool.setMaxThreads(100);
server = new Server(threadPool);
ServerConnector connector = new ServerConnector(server);
server.setConnectors(new Connector[] { connector });
server.start();
threadPoolMetrics = new JettyServerThreadPoolMetrics(threadPool, tags);
threadPoolMetrics.bindTo(registry);
}

@AfterEach
Expand All @@ -60,11 +65,15 @@ void teardown() throws Exception {
}

@Test
void threadMetrics() {
void threadMetrics() throws Exception {
assertThat(registry.get("jetty.threads.config.min").gauge().value()).isEqualTo(32.0);
assertThat(registry.get("jetty.threads.config.max").gauge().value()).isEqualTo(100.0);
assertThat(registry.get("jetty.threads.current").gauge().value()).isNotEqualTo(0.0);
assertThat(registry.get("jetty.threads.busy").gauge().value()).isGreaterThanOrEqualTo(0.0);

threadPoolMetrics.close();

assertThat(registry.getMeters()).as("Meters are removed after close").isEmpty();
}

}

0 comments on commit a13d34b

Please sign in to comment.