Skip to content

Commit

Permalink
Fixes micrometer-metrics#4976 - added optional custom scheduler to Co…
Browse files Browse the repository at this point in the history
…nsumer KafkaClientMetrics
  • Loading branch information
vasiliy-sarzhynskyi committed Oct 11, 2024
1 parent c53b4b1 commit 06cff5f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;

import java.util.concurrent.ScheduledExecutorService;

/**
* Kafka Client metrics binder. This should be closed on application shutdown to clean up
* resources.
Expand Down Expand Up @@ -60,6 +62,16 @@ public KafkaClientMetrics(Producer<?, ?> kafkaProducer) {
super(kafkaProducer::metrics);
}

/**
* Kafka {@link Consumer} metrics binder
* @param kafkaConsumer consumer instance to be instrumented
* @param tags additional tags
* @param scheduler scheduler to check and bind metrics
*/
public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(kafkaConsumer::metrics, tags, scheduler);
}

/**
* Kafka {@link Consumer} metrics binder
* @param kafkaConsumer consumer instance to be instrumented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
static final String KAFKA_VERSION_TAG_NAME = "kafka.version";
static final String DEFAULT_VALUE = "unknown";

private static final String DEFAULT_SCHEDULER_THREAD_NAME_PREFIX = "micrometer-kafka-metrics";

private static final Set<Class<?>> counterMeasurableClasses = new HashSet<>();

static {
Expand All @@ -96,8 +98,9 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {

private final Duration refreshInterval;

private final ScheduledExecutorService scheduler = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics"));
private final ScheduledExecutorService scheduler;

private final boolean schedulerExternallyManaged;

@Nullable
private Iterable<Tag> commonTags;
Expand All @@ -122,11 +125,23 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
ScheduledExecutorService scheduler) {
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL, scheduler, true);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
Duration refreshInterval) {
this(metricsSupplier, extraTags, refreshInterval, createDefaultScheduler(), false);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
Duration refreshInterval, ScheduledExecutorService scheduler, boolean schedulerExternallyManaged) {
this.metricsSupplier = metricsSupplier;
this.extraTags = extraTags;
this.refreshInterval = refreshInterval;
this.scheduler = scheduler;
this.schedulerExternallyManaged = schedulerExternallyManaged;
}

@Override
Expand Down Expand Up @@ -295,6 +310,10 @@ private static Class<? extends Measurable> getMeasurableClass(Metric metric) {
}
}

private static ScheduledExecutorService createDefaultScheduler() {
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(DEFAULT_SCHEDULER_THREAD_NAME_PREFIX));
}

private Gauge registerGauge(MeterRegistry registry, MetricName metricName, String meterName, Iterable<Tag> tags) {
return Gauge.builder(meterName, this.metrics, toMetricValue(metricName))
.tags(tags)
Expand Down Expand Up @@ -344,7 +363,9 @@ private Meter.Id meterIdForComparison(MetricName metricName) {

@Override
public void close() {
this.scheduler.shutdownNow();
if (!schedulerExternallyManaged) {
this.scheduler.shutdownNow();
}

for (Meter.Id id : registeredMeterIds) {
registry.remove(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
Expand All @@ -34,7 +36,7 @@ class KafkaClientMetricsConsumerTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (Consumer<String, String> consumer = createConsumer()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
metrics = new KafkaClientMetrics(consumer, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private Consumer<String, String> createConsumer() {
Properties consumerConfig = new Properties();
consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class KafkaMetricsTest {

Expand Down Expand Up @@ -68,7 +71,7 @@ void shouldKeepMetersWhenMetricsDoNotChange() {
}

@Test
void closeShouldRemoveAllMeters() {
void closeShouldRemoveAllMetersAndShutdownDefaultScheduler() {
// Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
Expand All @@ -80,9 +83,35 @@ void closeShouldRemoveAllMeters() {

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(isDefaultMetricsSchedulerThreadAlive()).isTrue();

kafkaMetrics.close();
assertThat(registry.getMeters()).isEmpty();
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());
}

@Test
void closeShouldRemoveAllMetersAndNotShutdownCustomScheduler() {
// Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(metricName, metric);
};
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
kafkaMetrics = new KafkaMetrics(supplier, Collections.emptyList(), customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());

kafkaMetrics.close();
assertThat(registry.getMeters()).isEmpty();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}

@Test
Expand Down Expand Up @@ -552,4 +581,13 @@ private KafkaMetric createKafkaMetric(MetricName metricName) {
return new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
}

private static boolean isDefaultMetricsSchedulerThreadAlive() {
return Thread.getAllStackTraces()
.keySet()
.stream()
.filter(Thread::isAlive)
.map(Thread::getName)
.anyMatch(name -> name.startsWith("micrometer-kafka-metrics"));
}

}

0 comments on commit 06cff5f

Please sign in to comment.