From 4ce1c450fb286f91c6e54d7a846c7e97858c7536 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 11 Nov 2024 10:46:40 +0800 Subject: [PATCH] KAFKA-16143: New JMX metrics for AsyncKafkaConsumer Signed-off-by: PoAn Yang --- .../internals/AsyncKafkaConsumer.java | 55 +++-- .../internals/ConsumerNetworkThread.java | 16 +- .../internals/NetworkClientDelegate.java | 23 +- .../consumer/internals/ShareConsumerImpl.java | 21 +- .../internals/events/ApplicationEvent.java | 14 ++ .../events/ApplicationEventHandler.java | 14 +- .../internals/events/BackgroundEvent.java | 14 ++ .../events/BackgroundEventHandler.java | 13 +- .../metrics/KafkaAsyncConsumerMetrics.java | 186 ++++++++++++++++ .../ApplicationEventHandlerTest.java | 66 ++++++ .../internals/AsyncKafkaConsumerTest.java | 30 ++- .../internals/BackgroundEventHandlerTest.java | 50 +++++ .../ConsumerMembershipManagerTest.java | 2 +- .../internals/ConsumerNetworkThreadTest.java | 59 +++++- .../internals/FetchRequestManagerTest.java | 2 +- .../internals/NetworkClientDelegateTest.java | 43 +++- .../ShareConsumeRequestManagerTest.java | 12 +- .../internals/ShareConsumerImplTest.java | 2 +- .../KafkaAsyncConsumerMetricsTest.java | 198 ++++++++++++++++++ 19 files changed, 769 insertions(+), 51 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetricsTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index da3f3f2f25b45..176ff809c9d23 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -65,7 +65,7 @@ import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; -import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; @@ -213,7 +213,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final ApplicationEventHandler applicationEventHandler; private final Time time; private final AtomicReference> groupMetadata = new AtomicReference<>(Optional.empty()); - private final KafkaConsumerMetrics kafkaConsumerMetrics; + private final KafkaAsyncConsumerMetrics kafkaAsyncConsumerMetrics; private Logger log; private final String clientId; private final BlockingQueue backgroundEventQueue; @@ -318,6 +318,7 @@ public void onGroupAssignmentUpdated(Set partitions) { this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); this.clientTelemetryReporter.ifPresent(reporters::add); this.metrics = createMetrics(config, time, reporters); + this.kafkaAsyncConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); List> interceptorList = configuredConsumerInterceptors(config); @@ -337,7 +338,8 @@ public void onGroupAssignmentUpdated(Set partitions) { ApiVersions apiVersions = new ApiVersions(); final BlockingQueue applicationEventQueue = 
new LinkedBlockingQueue<>(); - final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); + final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + backgroundEventQueue, time, Optional.of(kafkaAsyncConsumerMetrics)); // This FetchBuffer is shared between the application and network threads. this.fetchBuffer = new FetchBuffer(logContext); @@ -349,7 +351,8 @@ public void onGroupAssignmentUpdated(Set partitions) { metrics, fetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), - backgroundEventHandler); + backgroundEventHandler, + Optional.of(kafkaAsyncConsumerMetrics)); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig)); final Supplier requestManagersSupplier = RequestManagers.supplier(time, @@ -379,7 +382,8 @@ public void onGroupAssignmentUpdated(Set partitions) { new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, - requestManagersSupplier); + requestManagersSupplier, + Optional.of(kafkaAsyncConsumerMetrics)); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, @@ -399,8 +403,6 @@ public void onGroupAssignmentUpdated(Set partitions) { fetchMetricsManager, time); - this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX); - if (groupMetadata.get().isPresent() && GroupProtocol.of(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)) == GroupProtocol.CONSUMER) { config.ignore(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG); // Used by background thread @@ -457,7 +459,7 @@ public void onGroupAssignmentUpdated(Set partitions) { this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.deserializers = deserializers; this.applicationEventHandler = applicationEventHandler; - this.kafkaConsumerMetrics = new 
KafkaConsumerMetrics(metrics, "consumer"); + this.kafkaAsyncConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX); this.clientTelemetryReporter = Optional.empty(); this.autoCommitEnabled = autoCommitEnabled; this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); @@ -495,7 +497,7 @@ public void onGroupAssignmentUpdated(Set partitions) { deserializers, fetchMetricsManager, time); - this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); + this.kafkaAsyncConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX); GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, @@ -506,7 +508,8 @@ public void onGroupAssignmentUpdated(Set partitions) { BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>(); this.backgroundEventQueue = new LinkedBlockingQueue<>(); - BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); + BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + backgroundEventQueue, time, Optional.of(kafkaAsyncConsumerMetrics)); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, @@ -520,7 +523,8 @@ public void onGroupAssignmentUpdated(Set partitions) { logContext, client, metadata, - backgroundEventHandler + backgroundEventHandler, + Optional.of(kafkaAsyncConsumerMetrics) ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); Supplier requestManagersSupplier = RequestManagers.supplier( @@ -552,7 +556,8 @@ public void onGroupAssignmentUpdated(Set partitions) { new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, - requestManagersSupplier); + requestManagersSupplier, + Optional.of(kafkaAsyncConsumerMetrics)); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = new 
CompletableEventReaper(logContext); } @@ -567,7 +572,8 @@ ApplicationEventHandler build( final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, - final Supplier requestManagersSupplier + final Supplier requestManagersSupplier, + final Optional kafkaConsumerMetrics ); } @@ -709,7 +715,7 @@ public ConsumerRecords poll(final Duration timeout) { acquireAndEnsureOpen(); try { - kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs()); + kafkaAsyncConsumerMetrics.recordPollStart(timer.currentTimeMs()); if (subscriptions.hasNoSubscriptionOrUserAssignment()) { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); @@ -749,7 +755,7 @@ public ConsumerRecords poll(final Duration timeout) { return ConsumerRecords.empty(); } finally { - kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); + kafkaAsyncConsumerMetrics.recordPollEnd(timer.currentTimeMs()); release(); } } @@ -943,7 +949,7 @@ public Map committed(final Set closeQuietly(reporter, "async consumer telemetry reporter", firstException)); @@ -1455,7 +1461,7 @@ private void commitSync(Optional> offsets interceptors.onCommit(committedOffsets); } finally { wakeupTrigger.clearTask(); - kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); + kafkaAsyncConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); release(); } } @@ -1893,14 +1899,19 @@ private void subscribeInternal(Collection topics, Optional firstError = new AtomicReference<>(); LinkedList events = new LinkedList<>(); backgroundEventQueue.drainTo(events); + kafkaAsyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()); for (BackgroundEvent event : events) { + kafkaAsyncConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs()); + long startMs = time.milliseconds(); try { if (event instanceof CompletableEvent) 
backgroundEventReaper.add((CompletableEvent) event); @@ -1911,6 +1922,8 @@ private boolean processBackgroundEvents() { if (!firstError.compareAndSet(null, e)) log.warn("An error occurred when processing the background event: {}", e.getMessage(), e); + } finally { + kafkaAsyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs); } } @@ -2040,8 +2053,8 @@ public Metrics metricsRegistry() { } @Override - public KafkaConsumerMetrics kafkaConsumerMetrics() { - return kafkaConsumerMetrics; + public KafkaAsyncConsumerMetrics kafkaConsumerMetrics() { + return kafkaAsyncConsumerMetrics; } // Visible for testing diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 4f7d256104bb8..e5915bcc1157a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.utils.KafkaThread; @@ -60,6 +61,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private final Supplier applicationEventProcessorSupplier; private final Supplier networkClientDelegateSupplier; private final Supplier requestManagersSupplier; + private final Optional kafkaAsyncConsumerMetrics; private ApplicationEventProcessor applicationEventProcessor; private NetworkClientDelegate 
networkClientDelegate; private RequestManagers requestManagers; @@ -67,6 +69,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private final IdempotentCloser closer = new IdempotentCloser(); private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS); private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS; + private long lastPollTimeMs = 0L; public ConsumerNetworkThread(LogContext logContext, Time time, @@ -74,7 +77,8 @@ public ConsumerNetworkThread(LogContext logContext, CompletableEventReaper applicationEventReaper, Supplier applicationEventProcessorSupplier, Supplier networkClientDelegateSupplier, - Supplier requestManagersSupplier) { + Supplier requestManagersSupplier, + Optional kafkaAsyncConsumerMetrics) { super(BACKGROUND_THREAD_NAME, true); this.time = time; this.log = logContext.logger(getClass()); @@ -84,6 +88,7 @@ public ConsumerNetworkThread(LogContext logContext, this.networkClientDelegateSupplier = networkClientDelegateSupplier; this.requestManagersSupplier = requestManagersSupplier; this.running = true; + this.kafkaAsyncConsumerMetrics = kafkaAsyncConsumerMetrics; } @Override @@ -139,6 +144,10 @@ void runOnce() { processApplicationEvents(); final long currentTimeMs = time.milliseconds(); + final long timeSinceLastPollMs = lastPollTimeMs != 0L ? 
currentTimeMs - lastPollTimeMs : 0; + lastPollTimeMs = currentTimeMs; + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordTimeBetweenNetworkThreadPoll(timeSinceLastPollMs)); + final long pollWaitTimeMs = requestManagers.entries().stream() .filter(Optional::isPresent) .map(Optional::get) @@ -162,8 +171,11 @@ void runOnce() { private void processApplicationEvents() { LinkedList events = new LinkedList<>(); applicationEventQueue.drainTo(events); + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0)); for (ApplicationEvent event : events) { + long startMs = time.milliseconds(); + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(startMs - event.enqueuedMs())); try { if (event instanceof CompletableEvent) applicationEventReaper.add((CompletableEvent) event); @@ -171,6 +183,8 @@ private void processApplicationEvents() { applicationEventProcessor.process(event); } catch (Throwable t) { log.warn("Error processing event {}", t.getMessage(), t); + } finally { + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs)); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 56e4d6977480a..56bb26efd1566 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; import org.apache.kafka.common.Node; import 
org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; @@ -69,6 +70,7 @@ public class NetworkClientDelegate implements AutoCloseable { private final int requestTimeoutMs; private final Queue unsentRequests; private final long retryBackoffMs; + private final Optional kafkaConsumerMetrics; public NetworkClientDelegate( final Time time, @@ -76,7 +78,8 @@ public NetworkClientDelegate( final LogContext logContext, final KafkaClient client, final Metadata metadata, - final BackgroundEventHandler backgroundEventHandler) { + final BackgroundEventHandler backgroundEventHandler, + final Optional kafkaConsumerMetrics) { this.time = time; this.client = client; this.metadata = metadata; @@ -85,6 +88,7 @@ public NetworkClientDelegate( this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.kafkaConsumerMetrics = kafkaConsumerMetrics; } // Visible for testing @@ -170,6 +174,7 @@ private void trySend(final long currentTimeMs) { Iterator iterator = unsentRequests.iterator(); while (iterator.hasNext()) { UnsentRequest unsent = iterator.next(); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(currentTimeMs - unsent.enqueuedMs())); unsent.timer.update(currentTimeMs); if (unsent.timer.isExpired()) { iterator.remove(); @@ -267,7 +272,9 @@ public void addAll(final List requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); r.setTimer(this.time, this.requestTimeoutMs); + r.setEnqueuedMs(this.time.milliseconds()); unsentRequests.add(r); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size())); } public static class PollResult { @@ -300,6 +307,7 @@ public static class UnsentRequest { private final Optional node; // empty if random node can be chosen private Timer timer; + 
private long enqueuedMs; public UnsentRequest(final AbstractRequest.Builder requestBuilder, final Optional node) { @@ -317,6 +325,14 @@ Timer timer() { return timer; } + void setEnqueuedMs(final long enqueuedMs) { + this.enqueuedMs = enqueuedMs; + } + + long enqueuedMs() { + return enqueuedMs; + } + CompletableFuture future() { return handler.future; } @@ -412,7 +428,8 @@ public static Supplier supplier(final Time time, final Metrics metrics, final Sensor throttleTimeSensor, final ClientTelemetrySender clientTelemetrySender, - final BackgroundEventHandler backgroundEventHandler) { + final BackgroundEventHandler backgroundEventHandler, + final Optional kafkaConsumerMetrics) { return new CachedSupplier() { @Override protected NetworkClientDelegate create() { @@ -426,7 +443,7 @@ protected NetworkClientDelegate create() { metadata, throttleTimeSensor, clientTelemetrySender); - return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler); + return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, kafkaConsumerMetrics); } }; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index e209ec00b0d18..66ab78513aa2d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -47,6 +47,7 @@ import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics; import 
org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -266,7 +267,8 @@ private enum AcknowledgementMode { ShareFetchMetricsManager shareFetchMetricsManager = createShareFetchMetricsManager(metrics); ApiVersions apiVersions = new ApiVersions(); final BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>(); - final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); + final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + backgroundEventQueue, time, Optional.empty()); // This FetchBuffer is shared between the application and network threads. this.fetchBuffer = new ShareFetchBuffer(logContext); @@ -279,7 +281,8 @@ private enum AcknowledgementMode { metrics, shareFetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), - backgroundEventHandler + backgroundEventHandler, + Optional.empty() ); this.completedAcknowledgements = new LinkedList<>(); @@ -310,7 +313,8 @@ private enum AcknowledgementMode { new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, - requestManagersSupplier); + requestManagersSupplier, + Optional.empty()); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext); @@ -375,10 +379,11 @@ private enum AcknowledgementMode { final BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>(); final BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); + final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + backgroundEventQueue, time, Optional.empty()); final Supplier networkClientDelegateSupplier = - () -> new NetworkClientDelegate(time, config, logContext, client, metadata, 
backgroundEventHandler); + () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, Optional.empty()); GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, @@ -411,7 +416,8 @@ private enum AcknowledgementMode { new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, - requestManagersSupplier); + requestManagersSupplier, + Optional.empty()); this.backgroundEventQueue = new LinkedBlockingQueue<>(); this.backgroundEventProcessor = new BackgroundEventProcessor(); @@ -469,7 +475,8 @@ ApplicationEventHandler build( final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, - final Supplier requestManagersSupplier + final Supplier requestManagersSupplier, + final Optional kafkaConsumerMetrics ); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 9bd229a3fe806..bac261978a390 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -49,6 +49,12 @@ public enum Type { */ private final Uuid id; + /** + * The time in milliseconds when this event was enqueued. + * This field can be changed after the event is created, so it should not be used in hashCode, equals, or toStringBase. 
+ */ + private long enqueuedMs; + protected ApplicationEvent(Type type) { this.type = Objects.requireNonNull(type); this.id = Uuid.randomUuid(); @@ -62,6 +68,14 @@ public Uuid id() { return id; } + public void setEnqueuedMs(long enqueuedMs) { + this.enqueuedMs = enqueuedMs; + } + + public long enqueuedMs() { + return enqueuedMs; + } + @Override public final boolean equals(Object o) { if (this == o) return true; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index 0baafcd3038d1..756c6fa6c129b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.RequestManagers; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.utils.LogContext; @@ -31,6 +32,7 @@ import java.io.Closeable; import java.time.Duration; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.function.Supplier; @@ -42,9 +44,11 @@ public class ApplicationEventHandler implements Closeable { private final Logger log; + private final Time time; private final BlockingQueue applicationEventQueue; private final ConsumerNetworkThread networkThread; private final IdempotentCloser closer = new IdempotentCloser(); + private final Optional kafkaAsyncConsumerMetrics; public ApplicationEventHandler(final 
LogContext logContext, final Time time, @@ -52,16 +56,20 @@ public ApplicationEventHandler(final LogContext logContext, final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, - final Supplier requestManagersSupplier) { + final Supplier requestManagersSupplier, + final Optional kafkaAsyncConsumerMetrics) { this.log = logContext.logger(ApplicationEventHandler.class); + this.time = time; this.applicationEventQueue = applicationEventQueue; + this.kafkaAsyncConsumerMetrics = kafkaAsyncConsumerMetrics; this.networkThread = new ConsumerNetworkThread(logContext, time, applicationEventQueue, applicationEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, - requestManagersSupplier); + requestManagersSupplier, + kafkaAsyncConsumerMetrics); this.networkThread.start(); } @@ -73,7 +81,9 @@ public ApplicationEventHandler(final LogContext logContext, */ public void add(final ApplicationEvent event) { Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null"); + event.setEnqueuedMs(time.milliseconds()); applicationEventQueue.add(event); + kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(applicationEventQueue.size())); wakeupNetworkThread(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java index 7e9fdaed2d837..e9eb6269716ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java @@ -38,6 +38,12 @@ public enum Type { */ private final Uuid id; + /** + * The time in milliseconds when this event was enqueued. 
+ * This field can be changed after the event is created, so it should not be used in hashCode, equals, or toStringBase. + */ + private long enqueuedMs; + protected BackgroundEvent(Type type) { this.type = Objects.requireNonNull(type); this.id = Uuid.randomUuid(); @@ -51,6 +57,14 @@ public Uuid id() { return id; } + public void setEnqueuedMs(long enqueuedMs) { + this.enqueuedMs = enqueuedMs; + } + + public long enqueuedMs() { + return enqueuedMs; + } + @Override public final boolean equals(Object o) { if (this == o) return true; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java index f6ded0bf735e0..b656a4a372902 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java @@ -17,8 +17,11 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; +import org.apache.kafka.common.utils.Time; import java.util.Objects; +import java.util.Optional; import java.util.Queue; /** @@ -30,9 +33,15 @@ public class BackgroundEventHandler { private final Queue backgroundEventQueue; + private final Time time; + private final Optional kafkaConsumerMetrics; - public BackgroundEventHandler(final Queue backgroundEventQueue) { + public BackgroundEventHandler(final Queue backgroundEventQueue, + final Time time, + final Optional kafkaConsumerMetrics) { this.backgroundEventQueue = backgroundEventQueue; + this.time = time; + this.kafkaConsumerMetrics = kafkaConsumerMetrics; } /** @@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue backgroundEventQueue) */ public void add(BackgroundEvent event) { 
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null"); + event.setEnqueuedMs(time.milliseconds()); backgroundEventQueue.add(event); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordBackgroundEventQueueSize(backgroundEventQueue.size())); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java new file mode 100644 index 0000000000000..a7c09af1ac039 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Value; + +import java.util.Arrays; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; + +public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable { + private final Metrics metrics; + + public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll"; + public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size"; + public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time"; + public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time"; + public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size"; + public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time"; + public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time"; + public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size"; + public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time"; + private final Sensor timeBetweenNetworkThreadPollSensor; + private final Sensor applicationEventQueueSizeSensor; + private final Sensor applicationEventQueueTimeSensor; + private final Sensor applicationEventQueueProcessingTimeSensor; + private final Sensor backgroundEventQueueSizeSensor; + private final Sensor backgroundEventQueueTimeSensor; + private final Sensor backgroundEventQueueProcessingTimeSensor; + private final Sensor 
unsentRequestsQueueSizeSensor; + private final Sensor unsentRequestsQueueTimeSensor; + + public KafkaAsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) { + super(metrics, metricGrpPrefix); + + this.metrics = metrics; + final String metricGroupName = metricGrpPrefix + CONSUMER_METRICS_SUFFIX; + this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME); + this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-avg", + metricGroupName, + "The average time taken, in milliseconds, between each poll in the network thread."), + new Avg()); + this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-max", + metricGroupName, + "The maximum time taken, in milliseconds, between each poll in the network thread."), + new Max()); + + this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size", + metricGroupName, + "The current number of events in the consumer network application event queue."), + new Value()); + + this.applicationEventQueueTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME); + this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-avg", + metricGroupName, + "The average time, in milliseconds, that application events are taking to be dequeued."), + new Avg()); + this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-max", + metricGroupName, + "The maximum time, in milliseconds, that an application event took to be dequeued."), + new Max()); + + this.applicationEventQueueProcessingTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-avg", + metricGroupName, + "The average time, 
in milliseconds, that the consumer network takes to process all available application events."), + new Avg()); + this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-max", + metricGroupName, + "The maximum time, in milliseconds, that the consumer network took to process all available application events."), + new Max()); + + this.unsentRequestsQueueSizeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME); + this.unsentRequestsQueueSizeSensor.add(metrics.metricName("unsent-requests-queue-size", + metricGroupName, + "The current number of unsent requests in the consumer network."), + new Value()); + + this.unsentRequestsQueueTimeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME); + this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-avg", + metricGroupName, + "The average time, in milliseconds, that requests are taking to be sent in the consumer network."), + new Avg()); + this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-max", + metricGroupName, + "The maximum time, in milliseconds, that a request remained unsent in the consumer network."), + new Max()); + + this.backgroundEventQueueSizeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.backgroundEventQueueSizeSensor.add(metrics.metricName("background-event-queue-size", + metricGroupName, + "The current number of events in the consumer background event queue."), + new Value()); + + this.backgroundEventQueueTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME); + this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-avg", + metricGroupName, + "The average time, in milliseconds, that background events are taking to be dequeued."), + new Avg()); + this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-max", + metricGroupName, + "The maximum time, in milliseconds, that 
background events are taking to be dequeued."), + new Max()); + + this.backgroundEventQueueProcessingTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-avg", + metricGroupName, + "The average time, in milliseconds, that the consumer took to process all available background events."), + new Avg()); + this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-max", + metricGroupName, + "The maximum time, in milliseconds, that the consumer took to process all available background events."), + new Max()); + } + + public void recordTimeBetweenNetworkThreadPoll(long timeBetweenNetworkThreadPoll) { + this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll); + } + + public void recordApplicationEventQueueSize(int size) { + this.applicationEventQueueSizeSensor.record(size); + } + + public void recordApplicationEventQueueTime(long time) { + this.applicationEventQueueTimeSensor.record(time); + } + + public void recordApplicationEventQueueProcessingTime(long processingTime) { + this.applicationEventQueueProcessingTimeSensor.record(processingTime); + } + + public void recordUnsentRequestsQueueSize(int size) { + this.unsentRequestsQueueSizeSensor.record(size); + } + + public void recordUnsentRequestsQueueTime(long time) { + this.unsentRequestsQueueTimeSensor.record(time); + } + + public void recordBackgroundEventQueueSize(int size) { + this.backgroundEventQueueSizeSensor.record(size); + } + + public void recordBackgroundEventQueueTime(long time) { + this.backgroundEventQueueTimeSensor.record(time); + } + + public void recordBackgroundEventQueueProcessingTime(long processingTime) { + this.backgroundEventQueueProcessingTimeSensor.record(processingTime); + } + + @Override + public void close() { + Arrays.asList( + timeBetweenNetworkThreadPollSensor.name(), + 
applicationEventQueueSizeSensor.name(), + applicationEventQueueTimeSensor.name(), + applicationEventQueueProcessingTimeSensor.name(), + backgroundEventQueueSizeSensor.name(), + backgroundEventQueueTimeSensor.name(), + backgroundEventQueueProcessingTimeSensor.name(), + unsentRequestsQueueSizeSensor.name(), + unsentRequestsQueueTimeSensor.name() + ).forEach(metrics::removeSensor); + super.close(); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java new file mode 100644 index 0000000000000..4536dc0a8e6fd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.events.PollEvent; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; + +public class ApplicationEventHandlerTest { + private final Time time = new MockTime(); + private final BlockingQueue applicationEventsQueue = new LinkedBlockingQueue<>(); + private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class); + private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); + private final RequestManagers requestManagers = mock(RequestManagers.class); + private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); + + @Test + public void testRecordApplicationEventQueueSize() { + try (Metrics metrics = new Metrics(); + KafkaAsyncConsumerMetrics kafkaConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, "consumer"); + ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( + new LogContext(), + time, + applicationEventsQueue, + applicationEventReaper, + () -> applicationEventProcessor, + () -> networkClientDelegate, 
+ () -> requestManagers, + Optional.of(kafkaConsumerMetrics) + )) { + PollEvent event = new PollEvent(time.milliseconds()); + applicationEventHandler.add(event); + assertEquals(1, (double) metrics.metric(metrics.metricName("application-event-queue-size", "consumer-metrics")).metricValue()); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8eb8ec4c85bd5..b061b6b0ace95 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; @@ -197,7 +198,7 @@ private AsyncKafkaConsumer newConsumer(Properties props) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g) -> applicationEventHandler, + (a, b, c, d, e, f, g, h) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, @@ -211,7 +212,7 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g) -> applicationEventHandler, + (a, b, c, d, e, f, g, h) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, @@ -1829,6 +1830,31 @@ public void testSeekToEnd() { 
    /**
     * Verifies that draining the background event queue resets the queue-size
     * metric to 0 and records a positive queue-time (the event sat in the
     * queue for 10 ms of mock time before being processed).
     */
    @Test
    public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
        consumer = newConsumer(
            mock(FetchBuffer.class),
            mock(ConsumerInterceptors.class),
            mock(ConsumerRebalanceListenerInvoker.class),
            mock(SubscriptionState.class),
            "group-id",
            "client-id",
            false);
        Metrics metrics = consumer.metricsRegistry();
        KafkaAsyncConsumerMetrics kafkaConsumerMetrics = consumer.kafkaConsumerMetrics();

        // Stamp the event's enqueue time so the consumer can compute queue time on dequeue.
        ConsumerRebalanceListenerCallbackNeededEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, Collections.emptySortedSet());
        event.setEnqueuedMs(time.milliseconds());
        backgroundEventQueue.add(event);
        // The queue-size metric is normally recorded by the background thread;
        // record it manually here since the event was added directly to the queue.
        kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);

        // Advance mock time so the measured queue time is strictly positive.
        time.sleep(10);
        consumer.processBackgroundEvents();
        assertEquals(0, (double) metrics.metric(metrics.metricName("background-event-queue-size", "consumer-metrics")).metricValue());
        assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue() > 0);
        assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue() > 0);
    }
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BackgroundEventHandlerTest { + private final BlockingQueue applicationEventsQueue = new LinkedBlockingQueue<>(); + + @Test + public void testRecordBackgroundEventQueueSize() { + try (Metrics metrics = new Metrics(); + KafkaAsyncConsumerMetrics kafkaConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, "consumer")) { + BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + applicationEventsQueue, + new MockTime(0), + Optional.of(kafkaConsumerMetrics)); + BackgroundEvent event = new ErrorEvent(new Throwable()); + backgroundEventHandler.add(event); + 
assertEquals(1, (double) metrics.metric(metrics.metricName("background-event-queue-size", "consumer-metrics")).metricValue()); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java index 9d09aee2697ca..c75aca0ed16f6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java @@ -117,8 +117,8 @@ public void setup() { subscriptionState = mock(SubscriptionState.class); commitRequestManager = mock(CommitRequestManager.class); backgroundEventQueue = new LinkedBlockingQueue<>(); - backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); time = new MockTime(0); + backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, Optional.empty()); metrics = new Metrics(time); rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 0e85fe2d79950..4b3d1d1c9719a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -19,6 +19,9 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.events.PollEvent; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaAsyncConsumerMetrics; +import 
    /**
     * Verifies that two consecutive runOnce() calls separated by 10 ms of mock
     * time produce positive time-between-network-thread-poll avg/max metrics.
     */
    @Test
    public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
        try (Metrics metrics = new Metrics();
             KafkaAsyncConsumerMetrics kafkaConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, "consumer");
             ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread(
                 new LogContext(),
                 time,
                 applicationEventsQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
                 () -> networkClientDelegate,
                 () -> requestManagers,
                 Optional.of(kafkaConsumerMetrics)
             )) {
            consumerNetworkThread.initializeResources();

            // First poll establishes a baseline; the second, 10 ms later,
            // records the elapsed time between polls.
            consumerNetworkThread.runOnce();
            time.sleep(10);
            consumerNetworkThread.runOnce();
            assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-avg", "consumer-metrics")).metricValue() > 0);
            assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-max", "consumer-metrics")).metricValue() > 0);
        }
    }

    /**
     * Verifies that runOnce() drains the application event queue (size metric
     * returns to 0) and records a positive queue time for the dequeued event.
     */
    @Test
    public void testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime() {
        try (Metrics metrics = new Metrics();
             KafkaAsyncConsumerMetrics kafkaConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, "consumer");
             ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread(
                 new LogContext(),
                 time,
                 applicationEventsQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
                 () -> networkClientDelegate,
                 () -> requestManagers,
                 Optional.of(kafkaConsumerMetrics)
             )) {
            consumerNetworkThread.initializeResources();

            // Stamp the enqueue time so the network thread can compute queue time,
            // and record the size manually since the event bypassed the handler.
            PollEvent event = new PollEvent(0);
            event.setEnqueuedMs(time.milliseconds());
            applicationEventsQueue.add(event);
            kafkaConsumerMetrics.recordApplicationEventQueueSize(1);

            // Advance mock time so the measured queue time is strictly positive.
            time.sleep(10);
            consumerNetworkThread.runOnce();
            assertEquals(0, (double) metrics.metric(metrics.metricName("application-event-queue-size", "consumer-metrics")).metricValue());
            assertTrue((double) metrics.metric(metrics.metricName("application-event-queue-time-avg", "consumer-metrics")).metricValue() > 0);
            assertTrue((double) metrics.metric(metrics.metricName("application-event-queue-time-max", "consumer-metrics")).metricValue() > 0);
        }
    }
    /**
     * Verifies that adding an unsent request bumps the unsent-requests-queue-size
     * metric to 1.
     */
    @Test
    public void testRecordUnsentRequestsQueueSize() throws Exception {
        try (Metrics metrics = new Metrics();
             KafkaAsyncConsumerMetrics kafkaConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, "consumer");
             NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(Optional.of(kafkaConsumerMetrics))) {
            NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
            networkClientDelegate.add(unsentRequest);
            assertEquals(1, (double) metrics.metric(metrics.metricName("unsent-requests-queue-size", "consumer-metrics")).metricValue());
        }
    }

    /**
     * Verifies that polling after a request sat in the unsent queue records
     * positive unsent-requests-queue-time avg/max metrics.
     */
    @Test
    public void testRecordUnsentRequestsQueueTime() throws Exception {
        try (Metrics metrics = new Metrics();
             KafkaAsyncConsumerMetrics kafkaConsumerMetrics = new KafkaAsyncConsumerMetrics(metrics, "consumer");
             NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(Optional.of(kafkaConsumerMetrics))) {
            NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
            networkClientDelegate.add(unsentRequest);

            // Poll with a current time 10 ms after the request was queued so
            // the measured queue time is strictly positive.
            long timeMs = time.milliseconds();
            networkClientDelegate.poll(0, timeMs + 10);
            assertTrue((double) metrics.metric(metrics.metricName("unsent-requests-queue-time-avg", "consumer-metrics")).metricValue() > 0);
            assertTrue((double) metrics.metric(metrics.metricName("unsent-requests-queue-time-max", "consumer-metrics")).metricValue() > 0);
        }
    }

    /** Convenience overload: builds a delegate with no async-consumer metrics. */
    public NetworkClientDelegate newNetworkClientDelegate() {
        return newNetworkClientDelegate(Optional.empty());
    }

    /**
     * Builds a NetworkClientDelegate wired to this test's mock client/metadata,
     * optionally recording into the given async-consumer metrics.
     */
    public NetworkClientDelegate newNetworkClientDelegate(Optional<KafkaAsyncConsumerMetrics> kafkaConsumerMetrics) {
        LogContext logContext = new LogContext();
        Properties properties = new Properties();
        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(GROUP_ID_CONFIG, GROUP_ID);
        properties.put(REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
        return new NetworkClientDelegate(time,
            new ConsumerConfig(properties),
            logContext,
            client,
            metadata,
            backgroundEventHandler,
            kafkaConsumerMetrics);
    }
6af9509d04b90..dfac86b1291e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -1494,7 +1494,7 @@ private void buildRequestManager(MetricConfig metricConfig, subscriptions, fetchConfig, deserializers); - BackgroundEventHandler backgroundEventHandler = new TestableBackgroundEventHandler(completedAcknowledgements); + BackgroundEventHandler backgroundEventHandler = new TestableBackgroundEventHandler(time, completedAcknowledgements); shareConsumeRequestManager = spy(new TestableShareConsumeRequestManager<>( logContext, groupId, @@ -1525,7 +1525,9 @@ private void buildDependencies(MetricConfig metricConfig, properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs)); properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs)); ConsumerConfig config = new ConsumerConfig(properties); - networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, new BackgroundEventHandler(new LinkedBlockingQueue<>()))); + networkClientDelegate = spy(new TestableNetworkClientDelegate( + time, config, logContext, client, metadata, + new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, Optional.empty()))); } private class TestableShareConsumeRequestManager extends ShareConsumeRequestManager { @@ -1578,7 +1580,7 @@ public TestableNetworkClientDelegate(Time time, KafkaClient client, Metadata metadata, BackgroundEventHandler backgroundEventHandler) { - super(time, config, logContext, client, metadata, backgroundEventHandler); + super(time, config, logContext, client, metadata, backgroundEventHandler, Optional.empty()); } @Override @@ -1673,8 +1675,8 @@ private void failUnsentRequests(Node node) { private static class TestableBackgroundEventHandler extends BackgroundEventHandler { List> 
completedAcknowledgements; - public TestableBackgroundEventHandler(List> completedAcknowledgements) { - super(new LinkedBlockingQueue<>()); + public TestableBackgroundEventHandler(Time time, List> completedAcknowledgements) { + super(new LinkedBlockingQueue<>(), time, Optional.empty()); this.completedAcknowledgements = completedAcknowledgements; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index dc06875f7565d..61734893eeee0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -123,7 +123,7 @@ private ShareConsumerImpl newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g) -> applicationEventHandler, + (a, b, c, d, e, f, g, h) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e) -> fetchCollector, backgroundEventQueue diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetricsTest.java new file mode 100644 index 0000000000000..61ff4969201cc --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetricsTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaAsyncConsumerMetricsTest { + private static final long METRIC_VALUE = 123L; + private static final String CONSUMER_GROUP_PREFIX = "consumer"; + private static final String CONSUMER_METRIC_GROUP = "consumer-metrics"; + + private final Metrics metrics = new Metrics(); + private KafkaAsyncConsumerMetrics consumerMetrics; + + @AfterEach + public void tearDown() { + if (consumerMetrics != null) { + consumerMetrics.close(); + } + metrics.close(); + } + + @Test + public void shouldMetricNames() { + // create + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + HashSet expectedMetrics = new HashSet<>(Arrays.asList( + metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP), + metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP), + metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("commit-sync-time-ns-total", CONSUMER_METRIC_GROUP), + metrics.metricName("committed-time-ns-total", CONSUMER_METRIC_GROUP) + )); 
+ expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName)); + + HashSet expectedConsumerMetrics = new HashSet<>(Arrays.asList( + metrics.metricName("time-between-network-thread-poll-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("time-between-network-thread-poll-max", CONSUMER_METRIC_GROUP), + metrics.metricName("application-event-queue-size", CONSUMER_METRIC_GROUP), + metrics.metricName("application-event-queue-time-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("application-event-queue-time-max", CONSUMER_METRIC_GROUP), + metrics.metricName("application-event-queue-processing-time-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("application-event-queue-processing-time-max", CONSUMER_METRIC_GROUP), + metrics.metricName("unsent-requests-queue-size", CONSUMER_METRIC_GROUP), + metrics.metricName("unsent-requests-queue-time-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("unsent-requests-queue-time-max", CONSUMER_METRIC_GROUP), + metrics.metricName("background-event-queue-size", CONSUMER_METRIC_GROUP), + metrics.metricName("background-event-queue-time-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP), + metrics.metricName("background-event-queue-processing-time-avg", CONSUMER_METRIC_GROUP), + metrics.metricName("background-event-queue-processing-time-max", CONSUMER_METRIC_GROUP) + )); + expectedConsumerMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName)); + + // close + consumerMetrics.close(); + expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName)); + expectedConsumerMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName)); + } + + @Test + public void shouldRecordTimeBetweenNetworkThreadPoll() { + consumerMetrics = new 
KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordTimeBetweenNetworkThreadPoll(METRIC_VALUE); + + // Then: + assertMetricValue("time-between-network-thread-poll-avg"); + assertMetricValue("time-between-network-thread-poll-max"); + } + + @Test + public void shouldRecordApplicationEventQueueSize() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordApplicationEventQueueSize(10); + + // Then: + assertEquals( + metrics.metric(metrics.metricName("application-event-queue-size", CONSUMER_METRIC_GROUP)).metricValue(), + (double) 10 + ); + } + + @Test + public void shouldRecordApplicationEventQueueTime() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordApplicationEventQueueTime(METRIC_VALUE); + + // Then: + assertMetricValue("application-event-queue-time-avg"); + assertMetricValue("application-event-queue-time-max"); + } + + @Test + public void shouldRecordApplicationEventQueueProcessingTime() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordApplicationEventQueueProcessingTime(METRIC_VALUE); + + // Then: + assertMetricValue("application-event-queue-processing-time-avg"); + assertMetricValue("application-event-queue-processing-time-max"); + } + + @Test + public void shouldRecordUnsentRequestsQueueSize() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordUnsentRequestsQueueSize(10); + + // Then: + assertEquals( + metrics.metric(metrics.metricName("unsent-requests-queue-size", CONSUMER_METRIC_GROUP)).metricValue(), + (double) 10 + ); + } + + @Test + public void shouldRecordUnsentRequestsQueueTime() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordUnsentRequestsQueueTime(METRIC_VALUE); + + 
// Then: + assertMetricValue("unsent-requests-queue-time-avg"); + assertMetricValue("unsent-requests-queue-time-max"); + } + + @Test + public void shouldRecordBackgroundEventQueueSize() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordBackgroundEventQueueSize(10); + + // Then: + assertEquals( + metrics.metric(metrics.metricName("background-event-queue-size", CONSUMER_METRIC_GROUP)).metricValue(), + (double) 10 + ); + } + + @Test + public void shouldRecordBackgroundEventQueueTime() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordBackgroundEventQueueTime(METRIC_VALUE); + + // Then: + assertMetricValue("background-event-queue-time-avg"); + assertMetricValue("background-event-queue-time-max"); + } + + @Test + public void shouldRecordBackgroundEventQueueProcessingTime() { + consumerMetrics = new KafkaAsyncConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX); + // When: + consumerMetrics.recordBackgroundEventQueueProcessingTime(METRIC_VALUE); + + // Then: + assertMetricValue("background-event-queue-processing-time-avg"); + assertMetricValue("background-event-queue-processing-time-avg"); + } + + private void assertMetricValue(final String name) { + assertEquals( + metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)).metricValue(), + (double) METRIC_VALUE + ); + } +}