diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 5bbabe61a6ce1..ff679e5542d11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -69,7 +69,7 @@
 import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
-import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -109,7 +109,6 @@
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -215,10 +214,11 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata = new AtomicReference<>(Optional.empty());
-    private final KafkaConsumerMetrics kafkaConsumerMetrics;
+    private final AsyncConsumerMetrics kafkaConsumerMetrics;
     private Logger log;
     private final String clientId;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final BackgroundEventHandler backgroundEventHandler;
     private final BackgroundEventProcessor backgroundEventProcessor;
     private final CompletableEventReaper backgroundEventReaper;
     private final Deserializers<K, V> deserializers;
@@ -320,6 +320,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
         this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
         this.clientTelemetryReporter.ifPresent(reporters::add);
         this.metrics = createMetrics(config, time, reporters);
+        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 
         List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
@@ -339,7 +340,11 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
         ApiVersions apiVersions = new ApiVersions();
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
-        final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue,
+            time,
+            kafkaConsumerMetrics
+        );
 
         // This FetchBuffer is shared between the application and network threads.
         this.fetchBuffer = new FetchBuffer(logContext);
@@ -352,7 +357,9 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             fetchMetricsManager.throttleTimeSensor(),
             clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
             backgroundEventHandler,
-            false);
+            false,
+            kafkaConsumerMetrics
+        );
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
         this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
         final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
@@ -382,7 +389,9 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             new CompletableEventReaper(logContext),
             applicationEventProcessorSupplier,
             networkClientDelegateSupplier,
-            requestManagersSupplier);
+            requestManagersSupplier,
+            kafkaConsumerMetrics
+        );
 
         this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
             logContext,
@@ -402,8 +411,6 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             fetchMetricsManager,
             time);
 
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
-
         if (groupMetadata.get().isPresent() &&
             GroupProtocol.of(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)) == GroupProtocol.CONSUMER) {
             config.ignore(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG); // Used by background thread
@@ -460,10 +467,15 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
         this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue,
+            time,
+            kafkaConsumerMetrics
+        );
     }
 
     AsyncKafkaConsumer(LogContext logContext,
@@ -498,7 +510,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             deserializers,
             fetchMetricsManager,
             time);
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
             config,
@@ -509,7 +521,11 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
         BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
-        BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue,
+            time,
+            kafkaConsumerMetrics
+        );
         this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
             logContext,
             subscriptions,
@@ -524,7 +540,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             client,
             metadata,
             backgroundEventHandler,
-            false
+            false,
+            kafkaConsumerMetrics
         );
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
         Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
@@ -556,7 +573,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             new CompletableEventReaper(logContext),
             applicationEventProcessorSupplier,
             networkClientDelegateSupplier,
-            requestManagersSupplier);
+            requestManagersSupplier,
+            kafkaConsumerMetrics);
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = new CompletableEventReaper(logContext);
     }
@@ -571,7 +589,8 @@ ApplicationEventHandler build(
             final CompletableEventReaper applicationEventReaper,
             final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
             final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
-            final Supplier<RequestManagers> requestManagersSupplier
+            final Supplier<RequestManagers> requestManagersSupplier,
+            final AsyncConsumerMetrics asyncConsumerMetrics
         );
     }
 
@@ -1941,25 +1960,30 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener)
     boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
-        LinkedList<BackgroundEvent> events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
-
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent<?>) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+        List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent<?>) event);
+
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+                }
             }
+            kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
         }
 
         backgroundEventReaper.reap(time.milliseconds());
@@ -2088,7 +2112,7 @@ public Metrics metricsRegistry() {
     }
 
     @Override
-    public KafkaConsumerMetrics kafkaConsumerMetrics() {
+    public AsyncConsumerMetrics kafkaConsumerMetrics() {
         return kafkaConsumerMetrics;
     }
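With the changes above, every BackgroundEvent is stamped when it is enqueued, and processBackgroundEvents() turns those stamps into two sensors: per-event queue time and per-batch processing time. Both surface through the consumer's ordinary metrics() map under the "consumer-metrics" group (CONSUMER_METRIC_GROUP, added further below). A minimal sketch of how an application could read them; the QueueMetricsProbe class is illustrative, not part of the patch:

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class QueueMetricsProbe {
    // Print the background-event queue metrics registered by AsyncConsumerMetrics.
    public static void printQueueMetrics(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if (name.group().equals("consumer-metrics") && name.name().startsWith("background-event-queue")) {
                System.out.println(name.name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}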
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 7ca01ce257831..0e7b58acc2158 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -62,6 +63,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
     private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
     private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
     private final Supplier<RequestManagers> requestManagersSupplier;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
     private ApplicationEventProcessor applicationEventProcessor;
     private NetworkClientDelegate networkClientDelegate;
     private RequestManagers requestManagers;
@@ -69,6 +71,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
     private final IdempotentCloser closer = new IdempotentCloser();
     private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
     private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
+    private long lastPollTimeMs = 0L;
 
     public ConsumerNetworkThread(LogContext logContext,
                                  Time time,
@@ -76,7 +79,8 @@ public ConsumerNetworkThread(LogContext logContext,
                                  CompletableEventReaper applicationEventReaper,
                                  Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                  Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
-                                 Supplier<RequestManagers> requestManagersSupplier) {
+                                 Supplier<RequestManagers> requestManagersSupplier,
+                                 AsyncConsumerMetrics asyncConsumerMetrics) {
         super(BACKGROUND_THREAD_NAME, true);
         this.time = time;
         this.log = logContext.logger(getClass());
@@ -86,6 +90,7 @@ public ConsumerNetworkThread(LogContext logContext,
         this.networkClientDelegateSupplier = networkClientDelegateSupplier;
         this.requestManagersSupplier = requestManagersSupplier;
         this.running = true;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
     }
 
     @Override
@@ -141,6 +146,11 @@ void runOnce() {
         processApplicationEvents();
 
         final long currentTimeMs = time.milliseconds();
+        if (lastPollTimeMs != 0L) {
+            asyncConsumerMetrics.recordTimeBetweenNetworkThreadPoll(currentTimeMs - lastPollTimeMs);
+        }
+        lastPollTimeMs = currentTimeMs;
+
         final long pollWaitTimeMs = requestManagers.entries().stream()
                 .filter(Optional::isPresent)
                 .map(Optional::get)
@@ -166,8 +176,13 @@ void runOnce() {
     private void processApplicationEvents() {
         LinkedList<ApplicationEvent> events = new LinkedList<>();
         applicationEventQueue.drainTo(events);
+        if (events.isEmpty())
+            return;
 
+        asyncConsumerMetrics.recordApplicationEventQueueSize(0);
+        long startMs = time.milliseconds();
         for (ApplicationEvent event : events) {
+            asyncConsumerMetrics.recordApplicationEventQueueTime(time.milliseconds() - event.enqueuedMs());
             try {
                 if (event instanceof CompletableEvent) {
                     applicationEventReaper.add((CompletableEvent<?>) event);
@@ -181,6 +196,7 @@ private void processApplicationEvents() {
                 log.warn("Error processing event {}", t.getMessage(), t);
             }
         }
+        asyncConsumerMetrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs);
     }
 
     /**
@@ -189,7 +205,7 @@ private void processApplicationEvents() {
      * is given at least one attempt to satisfy any network requests before checking if a timeout has expired.
      */
     private void reapExpiredApplicationEvents(long currentTimeMs) {
-        applicationEventReaper.reap(currentTimeMs);
+        asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(currentTimeMs));
     }
 
     /**
@@ -326,7 +342,7 @@ void cleanup() {
             log.error("Unexpected error during shutdown. Proceed with closing.", e);
         } finally {
             sendUnsentRequests(timer);
-            applicationEventReaper.reap(applicationEventQueue);
+            asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(applicationEventQueue));
 
             closeQuietly(requestManagers, "request managers");
             closeQuietly(networkClientDelegate, "network client delegate");
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index d803b66780bfb..e4b0fa924c0d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -68,6 +68,7 @@ public final class ConsumerUtils {
     public static final String CONSUMER_SHARE_METRIC_GROUP_PREFIX = "consumer-share";
     public static final String COORDINATOR_METRICS_SUFFIX = "-coordinator-metrics";
     public static final String CONSUMER_METRICS_SUFFIX = "-metrics";
+    public static final String CONSUMER_METRIC_GROUP = CONSUMER_METRIC_GROUP_PREFIX + CONSUMER_METRICS_SUFFIX;
 
     /**
      * A fixed, large enough value will suffice for max.
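The runOnce() hunk above derives the new time-between-network-thread-poll sensor from a single lastPollTimeMs field, skipping the very first iteration because there is no previous poll to measure against. Distilled to its core as a self-contained sketch (the Recorder interface is a hypothetical stand-in for AsyncConsumerMetrics):

interface Recorder {
    void record(long intervalMs);
}

final class PollIntervalTracker {
    private final Recorder recorder;
    private long lastPollTimeMs = 0L; // 0 means "no poll observed yet"

    PollIntervalTracker(Recorder recorder) {
        this.recorder = recorder;
    }

    // Call once per network thread iteration, mirroring runOnce().
    void onPoll(long currentTimeMs) {
        if (lastPollTimeMs != 0L)
            recorder.record(currentTimeMs - lastPollTimeMs);
        lastPollTimeMs = currentTimeMs;
    }
}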
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 6c1ab43a4f253..3c280e39d0279 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
@@ -71,6 +72,7 @@ public class NetworkClientDelegate implements AutoCloseable {
     private final long retryBackoffMs;
     private Optional<Exception> metadataError;
     private final boolean notifyMetadataErrorsViaErrorQueue;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
     public NetworkClientDelegate(
             final Time time,
@@ -79,7 +81,8 @@ public NetworkClientDelegate(
             final KafkaClient client,
             final Metadata metadata,
             final BackgroundEventHandler backgroundEventHandler,
-            final boolean notifyMetadataErrorsViaErrorQueue) {
+            final boolean notifyMetadataErrorsViaErrorQueue,
+            final AsyncConsumerMetrics asyncConsumerMetrics) {
         this.time = time;
         this.client = client;
         this.metadata = metadata;
@@ -90,6 +93,7 @@ public NetworkClientDelegate(
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.metadataError = Optional.empty();
         this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
     }
 
     // Visible for testing
@@ -149,6 +153,7 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
         this.client.poll(pollTimeoutMs, currentTimeMs);
         maybePropagateMetadataError();
         checkDisconnects(currentTimeMs);
+        asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), currentTimeMs);
     }
 
     private void maybePropagateMetadataError() {
@@ -182,6 +187,7 @@ private void trySend(final long currentTimeMs) {
             unsent.timer.update(currentTimeMs);
             if (unsent.timer.isExpired()) {
                 iterator.remove();
+                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - unsent.enqueueTimeMs());
                 unsent.handler.onFailure(currentTimeMs, new TimeoutException(
                     "Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
                 continue;
@@ -192,6 +198,7 @@ private void trySend(final long currentTimeMs) {
                 continue;
             }
             iterator.remove();
+            asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - unsent.enqueueTimeMs());
         }
     }
 
@@ -219,6 +226,7 @@ protected void checkDisconnects(final long currentTimeMs) {
             UnsentRequest u = iter.next();
             if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
                 iter.remove();
+                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                 AuthenticationException authenticationException = client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
             }
@@ -282,6 +290,7 @@ public void addAll(final List<UnsentRequest> requests) {
         public void add(final UnsentRequest r) {
             Objects.requireNonNull(r);
             r.setTimer(this.time, this.requestTimeoutMs);
+            r.setEnqueueTimeMs(time.milliseconds());
             unsentRequests.add(r);
         }
 
@@ -315,6 +324,7 @@ public static class UnsentRequest {
         private final Optional<Node> node; // empty if random node can be chosen
         private Timer timer;
+        private long enqueueTimeMs; // the time at which the request was enqueued to unsentRequests; an instant, not the duration spent in the queue
 
         public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
                              final Optional<Node> node) {
@@ -332,6 +342,20 @@ Timer timer() {
             return timer;
         }
 
+        /**
+         * Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        private void setEnqueueTimeMs(final long enqueueTimeMs) {
+            this.enqueueTimeMs = enqueueTimeMs;
+        }
+
+        /**
+         * Return the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        private long enqueueTimeMs() {
+            return enqueueTimeMs;
+        }
+
         CompletableFuture<ClientResponse> future() {
             return handler.future;
         }
@@ -428,7 +452,8 @@ public static Supplier<NetworkClientDelegate> supplier(final Time time,
                                                            final Sensor throttleTimeSensor,
                                                            final ClientTelemetrySender clientTelemetrySender,
                                                            final BackgroundEventHandler backgroundEventHandler,
-                                                           final boolean notifyMetadataErrorsViaErrorQueue) {
+                                                           final boolean notifyMetadataErrorsViaErrorQueue,
+                                                           final AsyncConsumerMetrics asyncConsumerMetrics) {
         return new CachedSupplier<>() {
             @Override
             protected NetworkClientDelegate create() {
@@ -442,7 +467,7 @@ protected NetworkClientDelegate create() {
                     metadata,
                     throttleTimeSensor,
                     clientTelemetrySender);
-                return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+                return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics);
             }
         };
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 0ca371253bcbd..4a39c75745e6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -47,6 +47,7 @@
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -160,6 +161,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final KafkaShareConsumerMetrics kafkaShareConsumerMetrics;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
     private Logger log;
     private final String clientId;
     private final String groupId;
@@ -252,6 +254,7 @@ private enum AcknowledgementMode {
         this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
         this.clientTelemetryReporter.ifPresent(reporters::add);
         this.metrics = createMetrics(config, time, reporters);
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
         this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
         this.currentFetch = ShareFetch.empty();
@@ -266,7 +269,8 @@ private enum AcknowledgementMode {
         ShareFetchMetricsManager shareFetchMetricsManager = createShareFetchMetricsManager(metrics);
         ApiVersions apiVersions = new ApiVersions();
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
-        final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue, time, asyncConsumerMetrics);
 
         // This FetchBuffer is shared between the application and network threads.
         this.fetchBuffer = new ShareFetchBuffer(logContext);
@@ -280,7 +284,8 @@ private enum AcknowledgementMode {
             shareFetchMetricsManager.throttleTimeSensor(),
             clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
             backgroundEventHandler,
-            true
+            true,
+            asyncConsumerMetrics
         );
         this.completedAcknowledgements = new LinkedList<>();
 
@@ -311,7 +316,8 @@ private enum AcknowledgementMode {
             new CompletableEventReaper(logContext),
             applicationEventProcessorSupplier,
             networkClientDelegateSupplier,
-            requestManagersSupplier);
+            requestManagersSupplier,
+            asyncConsumerMetrics);
 
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
@@ -373,13 +379,15 @@ private enum AcknowledgementMode {
             new FetchConfig(config),
             deserializers);
         this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
         final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
-        final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue, time, asyncConsumerMetrics);
         final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
-            () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true);
+            () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
 
         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
             config,
@@ -412,7 +420,8 @@ private enum AcknowledgementMode {
             new CompletableEventReaper(logContext),
             applicationEventProcessorSupplier,
             networkClientDelegateSupplier,
-            requestManagersSupplier);
+            requestManagersSupplier,
+            asyncConsumerMetrics);
 
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventProcessor = new BackgroundEventProcessor();
@@ -458,6 +467,7 @@ private enum AcknowledgementMode {
         this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
         this.clientTelemetryReporter = Optional.empty();
         this.completedAcknowledgements = Collections.emptyList();
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
     }
 
     // auxiliary interface for testing
@@ -470,7 +480,8 @@ ApplicationEventHandler build(
             final CompletableEventReaper applicationEventReaper,
             final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
             final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
-            final Supplier<RequestManagers> requestManagersSupplier
+            final Supplier<RequestManagers> requestManagersSupplier,
+            final AsyncConsumerMetrics asyncConsumerMetrics
         );
     }
 
@@ -855,6 +866,7 @@ private void close(final Duration timeout, final boolean swallowException) {
         backgroundEventReaper.reap(backgroundEventQueue);
 
         closeQuietly(kafkaShareConsumerMetrics, "kafka share consumer metrics", firstException);
+        closeQuietly(asyncConsumerMetrics, "kafka async consumer metrics", firstException);
         closeQuietly(metrics, "consumer metrics", firstException);
         closeQuietly(deserializers, "consumer deserializers", firstException);
         clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "consumer telemetry reporter", firstException));
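ApplicationEvent and BackgroundEvent below gain a mutable enqueuedMs field so that whichever thread drains the queue can compute residence time as now - enqueuedMs; because the stamp is set after construction, the field is deliberately excluded from equals() and hashCode(). The handshake, reduced to a self-contained sketch (StampedEvent and StampedQueue are hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class StampedEvent {
    private long enqueuedMs;
    void setEnqueuedMs(long ms) { this.enqueuedMs = ms; }
    long enqueuedMs() { return enqueuedMs; }
}

final class StampedQueue {
    private final BlockingQueue<StampedEvent> queue = new LinkedBlockingQueue<>();

    // Producer side: stamp the event at the moment it enters the queue.
    void add(StampedEvent e, long nowMs) {
        e.setEnqueuedMs(nowMs);
        queue.add(e);
    }

    // Consumer side: the queue time of each event is now - enqueuedMs.
    List<Long> drainQueueTimes(long nowMs) {
        List<StampedEvent> drained = new ArrayList<>();
        queue.drainTo(drained);
        List<Long> times = new ArrayList<>();
        for (StampedEvent e : drained)
            times.add(nowMs - e.enqueuedMs());
        return times;
    }
}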
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index c30c0c2edde4e..dfb775f8947c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -51,6 +51,12 @@ public enum Type {
      */
     private final Uuid id;
 
+    /**
+     * The time in milliseconds when this event was enqueued.
+     * This field can be changed after the event is created, so it should not be used in hashCode or equals.
+     */
+    private long enqueuedMs;
+
     protected ApplicationEvent(Type type) {
         this.type = Objects.requireNonNull(type);
         this.id = Uuid.randomUuid();
@@ -64,6 +70,14 @@ public Uuid id() {
         return id;
     }
 
+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }
+
+    public long enqueuedMs() {
+        return enqueuedMs;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (this == o) return true;
@@ -78,7 +92,7 @@ public final int hashCode() {
     }
 
     protected String toStringBase() {
-        return "type=" + type + ", id=" + id;
+        return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
    }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index 0baafcd3038d1..dd6a1666c7b71 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.utils.LogContext;
@@ -42,9 +43,11 @@ public class ApplicationEventHandler implements Closeable {
 
     private final Logger log;
+    private final Time time;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final ConsumerNetworkThread networkThread;
     private final IdempotentCloser closer = new IdempotentCloser();
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
     public ApplicationEventHandler(final LogContext logContext,
                                    final Time time,
@@ -52,16 +55,20 @@ public ApplicationEventHandler(final LogContext logContext,
                                    final CompletableEventReaper applicationEventReaper,
                                    final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                    final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
-                                   final Supplier<RequestManagers> requestManagersSupplier) {
+                                   final Supplier<RequestManagers> requestManagersSupplier,
+                                   final AsyncConsumerMetrics asyncConsumerMetrics) {
         this.log = logContext.logger(ApplicationEventHandler.class);
+        this.time = time;
         this.applicationEventQueue = applicationEventQueue;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
         this.networkThread = new ConsumerNetworkThread(logContext,
                 time,
                 applicationEventQueue,
                 applicationEventReaper,
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
-                requestManagersSupplier);
+                requestManagersSupplier,
+                asyncConsumerMetrics);
         this.networkThread.start();
     }
 
@@ -73,7 +80,9 @@ public ApplicationEventHandler(final LogContext logContext,
     */
     public void add(final ApplicationEvent event) {
         Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
+        event.setEnqueuedMs(time.milliseconds());
         applicationEventQueue.add(event);
+        asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
         wakeupNetworkThread();
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 7e9fdaed2d837..02fc4b4a29ba4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -38,6 +38,12 @@ public enum Type {
      */
     private final Uuid id;
 
+    /**
+     * The time in milliseconds when this event was enqueued.
+     * This field can be changed after the event is created, so it should not be used in hashCode or equals.
+     */
+    private long enqueuedMs;
+
     protected BackgroundEvent(Type type) {
         this.type = Objects.requireNonNull(type);
         this.id = Uuid.randomUuid();
@@ -51,6 +57,14 @@ public Uuid id() {
         return id;
     }
 
+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }
+
+    public long enqueuedMs() {
+        return enqueuedMs;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (this == o) return true;
@@ -65,7 +79,7 @@ public final int hashCode() {
     }
 
     protected String toStringBase() {
-        return "type=" + type + ", id=" + id;
+        return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index f6ded0bf735e0..adc621d5f2e29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -17,9 +17,13 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.utils.Time;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 
 /**
  * An event handler that receives {@link BackgroundEvent background events} from the
@@ -29,10 +33,16 @@
  */
 public class BackgroundEventHandler {
 
-    private final Queue<BackgroundEvent> backgroundEventQueue;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final Time time;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
-    public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) {
+    public BackgroundEventHandler(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                                  final Time time,
+                                  final AsyncConsumerMetrics asyncConsumerMetrics) {
         this.backgroundEventQueue = backgroundEventQueue;
+        this.time = time;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
     }
 
     /**
@@ -42,6 +52,20 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
      */
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
+        event.setEnqueuedMs(time.milliseconds());
         backgroundEventQueue.add(event);
+        asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
+    }
+
+    /**
+     * Drain all the {@link BackgroundEvent events} from the handler.
+     *
+     * @return A list of {@link BackgroundEvent events} that were drained
+     */
+    public List<BackgroundEvent> drainEvents() {
+        List<BackgroundEvent> events = new ArrayList<>();
+        backgroundEventQueue.drainTo(events);
+        asyncConsumerMetrics.recordBackgroundEventQueueSize(0);
+        return events;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
index 019526836af76..5a0358df8964f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
@@ -82,8 +82,9 @@ public void add(CompletableEvent<?> event) {
      *
      * @param currentTimeMs Current time with which to compare against the
      *                      {@link CompletableEvent#deadlineMs() expiration time}
+     * @return The number of events that were expired
      */
-    public void reap(long currentTimeMs) {
+    public long reap(long currentTimeMs) {
         Consumer<CompletableEvent<?>> expireEvent = event -> {
             long pastDueMs = currentTimeMs - event.deadlineMs();
             TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, event.deadlineMs()));
@@ -96,13 +97,16 @@ public void reap(long currentTimeMs) {
         };
 
         // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
-        tracked.stream()
+        long count = tracked.stream()
                 .filter(e -> !e.future().isDone())
                 .filter(e -> currentTimeMs >= e.deadlineMs())
-                .forEach(expireEvent);
+                .peek(expireEvent)
+                .count();
 
         // Second, remove any events that are already complete, just to make sure we don't hold references. This will
         // include any events that finished successfully as well as any events we just completed exceptionally above.
         tracked.removeIf(e -> e.future().isDone());
+
+        return count;
     }
 
     /**
@@ -122,8 +126,9 @@ public void reap(long currentTimeMs) {
      * don't take the deadline into consideration, just close it regardless.
      *
     * @param events Events from a queue that have not yet been tracked that also need to be reviewed
+     * @return The number of events that were expired
      */
-    public void reap(Collection<?> events) {
+    public long reap(Collection<?> events) {
         Objects.requireNonNull(events, "Event queue to reap must be non-null");
 
         Consumer<CompletableEvent<?>> expireEvent = event -> {
@@ -136,17 +141,20 @@ public void reap(Collection<?> events) {
             }
         };
 
-        tracked.stream()
+        long trackedExpiredCount = tracked.stream()
                 .filter(e -> !e.future().isDone())
-                .forEach(expireEvent);
+                .peek(expireEvent)
+                .count();
         tracked.clear();
 
-        events.stream()
+        long eventExpiredCount = events.stream()
                 .filter(e -> e instanceof CompletableEvent)
                 .map(e -> (CompletableEvent<?>) e)
                 .filter(e -> !e.future().isDone())
-                .forEach(expireEvent);
+                .peek(expireEvent)
+                .count();
         events.clear();
+
+        return trackedExpiredCount + eventExpiredCount;
     }
 
     public int size() {
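One subtlety in the reap() changes above: the expiration side effect now runs inside peek(), and since Java 9 Stream.count() is allowed to skip the pipeline entirely when it can compute the result from the source's size. The filter() stages here make the count data dependent, so peek() is guaranteed to execute; without them the pattern would be unsafe. A standalone demonstration, assuming Java 9+:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PeekCountDemo {
    public static void main(String[] args) {
        AtomicInteger sideEffects = new AtomicInteger();

        // A filter makes the count data dependent, so peek() must run.
        long matched = List.of(1, 2, 3, 4).stream()
                .filter(i -> i % 2 == 0)
                .peek(i -> sideEffects.incrementAndGet())
                .count();
        System.out.println(matched + " matches, " + sideEffects.get() + " side effects"); // 2, 2

        // With no size-changing stages, count() may bypass the pipeline
        // and the side effect may never run.
        long all = List.of(1, 2, 3, 4).stream()
                .peek(i -> sideEffects.incrementAndGet())
                .count();
        System.out.println(all); // 4, but sideEffects may be unchanged
    }
}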
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
new file mode 100644
index 0000000000000..09e84cbe985cc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+
+public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {
+    private final Metrics metrics;
+
+    public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll";
+    public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size";
+    public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time";
+    public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time";
+    public static final String APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME = "application-events-expired-count";
+    public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size";
+    public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time";
+    public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time";
+    public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size";
+    public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time";
+
+    private final Sensor timeBetweenNetworkThreadPollSensor;
+    private final Sensor applicationEventQueueSizeSensor;
+    private final Sensor applicationEventQueueTimeSensor;
+    private final Sensor applicationEventQueueProcessingTimeSensor;
+    private final Sensor applicationEventExpiredSizeSensor;
+    private final Sensor backgroundEventQueueSizeSensor;
+    private final Sensor backgroundEventQueueTimeSensor;
+    private final Sensor backgroundEventQueueProcessingTimeSensor;
+    private final Sensor unsentRequestsQueueSizeSensor;
+    private final Sensor unsentRequestsQueueTimeSensor;
+
+    public AsyncConsumerMetrics(Metrics metrics) {
+        super(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+
+        this.metrics = metrics;
+        this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
+        this.timeBetweenNetworkThreadPollSensor.add(
+            metrics.metricName(
+                "time-between-network-thread-poll-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time taken, in milliseconds, between each poll in the network thread."
+            ),
+            new Avg()
+        );
+        this.timeBetweenNetworkThreadPollSensor.add(
+            metrics.metricName(
+                "time-between-network-thread-poll-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time taken, in milliseconds, between each poll in the network thread."
+            ),
+            new Max()
+        );
+
+        this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.applicationEventQueueSizeSensor.add(
+            metrics.metricName(
+                APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of events in the queue to send from the application thread to the background thread."
+            ),
+            new Value()
+        );
+
+        this.applicationEventQueueTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME);
+        this.applicationEventQueueTimeSensor.add(
+            metrics.metricName(
+                "application-event-queue-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that application events are taking to be dequeued."
+            ),
+            new Avg()
+        );
+        this.applicationEventQueueTimeSensor.add(
+            metrics.metricName(
+                "application-event-queue-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that an application event took to be dequeued."
+            ),
+            new Max()
+        );
+
+        this.applicationEventQueueProcessingTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+        this.applicationEventQueueProcessingTimeSensor.add(
+            metrics.metricName(
+                "application-event-queue-processing-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that the background thread takes to process all available application events."
+            ),
+            new Avg()
+        );
+        this.applicationEventQueueProcessingTimeSensor.add(
+            metrics.metricName(
+                "application-event-queue-processing-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that the background thread took to process all available application events."
+            ),
+            new Max()
+        );
+
+        this.applicationEventExpiredSizeSensor = metrics.sensor(APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME);
+        this.applicationEventExpiredSizeSensor.add(
+            metrics.metricName(
+                APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of expired application events."
+            ),
+            new Value()
+        );
+
+        this.unsentRequestsQueueSizeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME);
+        this.unsentRequestsQueueSizeSensor.add(
+            metrics.metricName(
+                UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of unsent requests in the background thread."
+            ),
+            new Value()
+        );
+
+        this.unsentRequestsQueueTimeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME);
+        this.unsentRequestsQueueTimeSensor.add(
+            metrics.metricName(
+                "unsent-requests-queue-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that requests are taking to be sent in the background thread."
+            ),
+            new Avg()
+        );
+        this.unsentRequestsQueueTimeSensor.add(
+            metrics.metricName(
+                "unsent-requests-queue-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that a request remained unsent in the background thread."
+            ),
+            new Max()
+        );
+
+        this.backgroundEventQueueSizeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.backgroundEventQueueSizeSensor.add(
+            metrics.metricName(
+                BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of events in the queue to send from the background thread to the application thread."
+            ),
+            new Value()
+        );
+
+        this.backgroundEventQueueTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME);
+        this.backgroundEventQueueTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that background events are taking to be dequeued."
+            ),
+            new Avg()
+        );
+        this.backgroundEventQueueTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that background events are taking to be dequeued."
+            ),
+            new Max()
+        );
+
+        this.backgroundEventQueueProcessingTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+        this.backgroundEventQueueProcessingTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-processing-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that the consumer took to process all available background events."
+            ),
+            new Avg()
+        );
+        this.backgroundEventQueueProcessingTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-processing-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that the consumer took to process all available background events."
+            ),
+            new Max()
+        );
+    }
+
+    public void recordTimeBetweenNetworkThreadPoll(long timeBetweenNetworkThreadPoll) {
+        this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll);
+    }
+
+    public void recordApplicationEventQueueSize(int size) {
+        this.applicationEventQueueSizeSensor.record(size);
+    }
+
+    public void recordApplicationEventQueueTime(long time) {
+        this.applicationEventQueueTimeSensor.record(time);
+    }
+
+    public void recordApplicationEventQueueProcessingTime(long processingTime) {
+        this.applicationEventQueueProcessingTimeSensor.record(processingTime);
+    }
+
+    public void recordApplicationEventExpiredSize(long size) {
+        this.applicationEventExpiredSizeSensor.record(size);
+    }
+
+    public void recordUnsentRequestsQueueSize(int size, long timeMs) {
+        this.unsentRequestsQueueSizeSensor.record(size, timeMs);
+    }
+
+    public void recordUnsentRequestsQueueTime(long time) {
+        this.unsentRequestsQueueTimeSensor.record(time);
+    }
+
+    public void recordBackgroundEventQueueSize(int size) {
+        this.backgroundEventQueueSizeSensor.record(size);
+    }
+
+    public void recordBackgroundEventQueueTime(long time) {
+        this.backgroundEventQueueTimeSensor.record(time);
+    }
+
+    public void recordBackgroundEventQueueProcessingTime(long processingTime) {
+        this.backgroundEventQueueProcessingTimeSensor.record(processingTime);
+    }
+
+    @Override
+    public void close() {
+        Arrays.asList(
+            timeBetweenNetworkThreadPollSensor.name(),
+            applicationEventQueueSizeSensor.name(),
+            applicationEventQueueTimeSensor.name(),
+            applicationEventQueueProcessingTimeSensor.name(),
+            applicationEventExpiredSizeSensor.name(),
+            backgroundEventQueueSizeSensor.name(),
+            backgroundEventQueueTimeSensor.name(),
+            backgroundEventQueueProcessingTimeSensor.name(),
+            unsentRequestsQueueSizeSensor.name(),
+            unsentRequestsQueueTimeSensor.name()
+        ).forEach(metrics::removeSensor);
+        super.close();
+    }
+}
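Every duration sensor in AsyncConsumerMetrics follows the same shape: one sensor feeding an Avg and a Max stat, registered under the consumer-metrics group. A minimal, self-contained illustration of that pattern against the public Metrics API (the demo-queue-time names are invented for the example):

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class SensorPatternDemo {
    public static void main(String[] args) {
        try (Metrics metrics = new Metrics()) {
            Sensor sensor = metrics.sensor("demo-queue-time");
            sensor.add(metrics.metricName("demo-queue-time-avg", "consumer-metrics",
                "The average demo queue time, in milliseconds."), new Avg());
            sensor.add(metrics.metricName("demo-queue-time-max", "consumer-metrics",
                "The maximum demo queue time, in milliseconds."), new Max());

            sensor.record(5);
            sensor.record(15);

            // MetricName equality ignores the description, so lookups only need name and group.
            System.out.println(metrics.metric(metrics.metricName(
                "demo-queue-time-avg", "consumer-metrics", "")).metricValue()); // 10.0
        }
    }
}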
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.events.PollEvent; +import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; + +public class ApplicationEventHandlerTest { + private final Time time = new MockTime(); + private final BlockingQueue applicationEventsQueue = new LinkedBlockingQueue<>(); + private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class); + private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); + private final RequestManagers requestManagers = mock(RequestManagers.class); + private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); + + @Test + public void testRecordApplicationEventQueueSize() { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics); + ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( + new LogContext(), + time, + applicationEventsQueue, + applicationEventReaper, + () -> applicationEventProcessor, + () -> networkClientDelegate, + () -> requestManagers, + asyncConsumerMetrics + )) { + // add event + applicationEventHandler.add(new PollEvent(time.milliseconds())); + assertEquals( + 1, + (double) metrics.metric( + metrics.metricName( + AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME, + ConsumerUtils.CONSUMER_METRIC_GROUP + ) + ).metricValue() + ); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 5239482bdd0b2..819365e9712f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -57,6 +57,7 @@ import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; +import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import 
org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.Node; @@ -133,6 +134,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -211,7 +213,7 @@ private AsyncKafkaConsumer newConsumer(Properties props) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g) -> applicationEventHandler, + (a, b, c, d, e, f, g, h) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, @@ -225,7 +227,7 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g) -> applicationEventHandler, + (a, b, c, d, e, f, g, h) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, @@ -358,7 +360,7 @@ public void testCommitted() { assertEquals(topicPartitionOffsets, consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000))); verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class)); final Metric metric = consumer.metrics() - .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics")); + .get(consumer.metricsRegistry().metricName("committed-time-ns-total", CONSUMER_METRIC_GROUP)); assertTrue((double) metric.metricValue() > 0); } @@ -1915,6 +1917,31 @@ public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws Interru }, "Consumer did not throw the expected UnsupportedVersionException on poll"); } + @Test + public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() { + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + mock(SubscriptionState.class), + "group-id", + "client-id", + false); + Metrics metrics = consumer.metricsRegistry(); + AsyncConsumerMetrics kafkaConsumerMetrics = consumer.kafkaConsumerMetrics(); + + ConsumerRebalanceListenerCallbackNeededEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, Collections.emptySortedSet()); + event.setEnqueuedMs(time.milliseconds()); + backgroundEventQueue.add(event); + kafkaConsumerMetrics.recordBackgroundEventQueueSize(1); + + time.sleep(10); + consumer.processBackgroundEvents(); + assertEquals(0, (double) metrics.metric(metrics.metricName("background-event-queue-size", CONSUMER_METRIC_GROUP)).metricValue()); + assertEquals(10, (double) metrics.metric(metrics.metricName("background-event-queue-time-avg", CONSUMER_METRIC_GROUP)).metricValue()); + assertEquals(10, (double) metrics.metric(metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP)).metricValue()); + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new 
TopicPartition("t0", 3); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java new file mode 100644 index 0000000000000..63269b6f5542d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP; +import static org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics.BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BackgroundEventHandlerTest { + private final BlockingQueue backgroundEventsQueue = new LinkedBlockingQueue<>(); + + @Test + public void testRecordBackgroundEventQueueSize() { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics)) { + BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + backgroundEventsQueue, + new MockTime(0), + asyncConsumerMetrics); + // add event + backgroundEventHandler.add(new ErrorEvent(new Throwable())); + assertEquals( + 1, + (double) metrics.metric( + metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, CONSUMER_METRIC_GROUP) + ).metricValue() + ); + + // drain event + backgroundEventHandler.drainEvents(); + assertEquals( + 0, + (double) metrics.metric( + metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, CONSUMER_METRIC_GROUP) + ).metricValue() + ); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java index 9d09aee2697ca..9517e04e05456 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java @@ -21,6 +21,7 @@ import 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 9d09aee2697ca..9517e04e05456 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
 import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.KafkaException;
@@ -117,8 +118,8 @@ public void setup() {
         subscriptionState = mock(SubscriptionState.class);
         commitRequestManager = mock(CommitRequestManager.class);
         backgroundEventQueue = new LinkedBlockingQueue<>();
-        backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
         time = new MockTime(0);
+        backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, mock(AsyncConsumerMetrics.class));
         metrics = new Metrics(time);
         rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 0e85fe2d79950..520279fc8d454 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -19,6 +19,9 @@
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +43,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -53,7 +57,7 @@ public class ConsumerNetworkThreadTest {
 
     private final Time time;
-    private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final ApplicationEventProcessor applicationEventProcessor;
     private final OffsetsRequestManager offsetsRequestManager;
     private final ConsumerHeartbeatRequestManager heartbeatRequestManager;
@@ -62,6 +66,7 @@ public class ConsumerNetworkThreadTest {
     private final NetworkClientDelegate networkClientDelegate;
     private final RequestManagers requestManagers;
     private final CompletableEventReaper applicationEventReaper;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
     ConsumerNetworkThreadTest() {
         this.networkClientDelegate = mock(NetworkClientDelegate.class);
@@ -72,17 +77,19 @@ public class ConsumerNetworkThreadTest {
         this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
         this.applicationEventReaper = mock(CompletableEventReaper.class);
         this.time = new MockTime();
-        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.applicationEventQueue = new LinkedBlockingQueue<>();
+        this.asyncConsumerMetrics = mock(AsyncConsumerMetrics.class);
         LogContext logContext = new LogContext();
 
         this.consumerNetworkThread = new ConsumerNetworkThread(
             logContext,
             time,
-            applicationEventsQueue,
+            applicationEventQueue,
             applicationEventReaper,
             () -> applicationEventProcessor,
             () -> networkClientDelegate,
-            () -> requestManagers
+            () -> requestManagers,
+            asyncConsumerMetrics
         );
     }
 
@@ -183,14 +190,18 @@ public void testMaximumTimeToWait() {
     public void testCleanupInvokesReaper() {
         LinkedList<NetworkClientDelegate.UnsentRequest> queue = new LinkedList<>();
         when(networkClientDelegate.unsentRequests()).thenReturn(queue);
+        when(applicationEventReaper.reap(applicationEventQueue)).thenReturn(1L);
         consumerNetworkThread.cleanup();
-        verify(applicationEventReaper).reap(applicationEventsQueue);
+        verify(applicationEventReaper).reap(applicationEventQueue);
+        verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
     }
 
     @Test
     public void testRunOnceInvokesReaper() {
+        when(applicationEventReaper.reap(any(Long.class))).thenReturn(1L);
         consumerNetworkThread.runOnce();
         verify(applicationEventReaper).reap(any(Long.class));
+        verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
     }
 
     @Test
@@ -199,4 +210,82 @@ public void testSendUnsentRequests() {
         consumerNetworkThread.cleanup();
         verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
     }
+
+    @Test
+    public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+             ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread(
+                 new LogContext(),
+                 time,
+                 applicationEventQueue,
+                 applicationEventReaper,
+                 () -> applicationEventProcessor,
+                 () -> networkClientDelegate,
+                 () -> requestManagers,
+                 asyncConsumerMetrics
+             )) {
+            consumerNetworkThread.initializeResources();
+
+            consumerNetworkThread.runOnce();
+            time.sleep(10);
+            consumerNetworkThread.runOnce();
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("time-between-network-thread-poll-avg", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("time-between-network-thread-poll-max", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+        }
+    }
+
+    @Test
+    public void testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+             ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread(
+                 new LogContext(),
+                 time,
+                 applicationEventQueue,
+                 applicationEventReaper,
+                 () -> applicationEventProcessor,
+                 () -> networkClientDelegate,
+                 () -> requestManagers,
+                 asyncConsumerMetrics
+             )) {
+            consumerNetworkThread.initializeResources();
+
+            PollEvent event = new PollEvent(0);
+            event.setEnqueuedMs(time.milliseconds());
+            applicationEventQueue.add(event);
+            asyncConsumerMetrics.recordApplicationEventQueueSize(1);
+
+            time.sleep(10);
+            consumerNetworkThread.runOnce();
+            assertEquals(
+                0,
+                (double) metrics.metric(
+                    metrics.metricName("application-event-queue-size", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("application-event-queue-time-avg", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("application-event-queue-time-max", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+        }
+    }
 }
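Taken together, the tests above constrain the network thread's loop: each runOnce() records the gap since the previous poll, drains and timestamps application events, and feeds the reaper's new return value into recordApplicationEventExpiredSize(). A rough sketch of a loop body consistent with those assertions; the record*() calls and the long-returning reap() come from this patch, while the control flow, the enqueuedMs() getter, and maximumTimeToWait() are assumptions:

// Sketch only: assumes the thread's fields as constructed in this patch and a
// hypothetical ApplicationEvent.enqueuedMs() getter paired with setEnqueuedMs().
void runOnce() {
    long startMs = time.milliseconds();
    if (lastPollTimeMs >= 0) {
        // Gap between consecutive polls feeds time-between-network-thread-poll-avg/-max.
        asyncConsumerMetrics.recordTimeBetweenNetworkThreadPoll(startMs - lastPollTimeMs);
    }
    lastPollTimeMs = startMs;

    List<ApplicationEvent> events = new ArrayList<>();
    applicationEventQueue.drainTo(events);
    asyncConsumerMetrics.recordApplicationEventQueueSize(0);
    for (ApplicationEvent event : events) {
        // Enqueue-to-drain dwell time feeds application-event-queue-time-avg/-max.
        asyncConsumerMetrics.recordApplicationEventQueueTime(startMs - event.enqueuedMs());
        applicationEventProcessor.process(event);
    }

    // reap(long) now returns the number of events it expired.
    long expiredCount = applicationEventReaper.reap(startMs);
    asyncConsumerMetrics.recordApplicationEventExpiredSize(expiredCount);

    networkClientDelegate.poll(maximumTimeToWait(), startMs);
}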
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index e97efd2b75feb..ab665aac69588 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
@@ -3779,7 +3780,7 @@ public TestableNetworkClientDelegate(Time time,
                                              Metadata metadata,
                                              BackgroundEventHandler backgroundEventHandler,
                                              boolean notifyMetadataErrorsViaErrorQueue) {
-            super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+            super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
index 4177419fa534c..81eb5187fecfb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -23,11 +23,13 @@
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
@@ -41,15 +43,17 @@
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -228,8 +232,8 @@ public void testPropagateMetadataErrorWithErrorEvent() {
         AuthenticationException authException = new AuthenticationException("Test Auth Exception");
         doThrow(authException).when(metadata).maybeThrowAnyException();
 
-        LinkedList<BackgroundEvent> backgroundEventQueue = new LinkedList<>();
-        this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, mock(AsyncConsumerMetrics.class));
         NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(true);
 
         assertEquals(0, backgroundEventQueue.size());
@@ -242,20 +246,59 @@ public void testPropagateMetadataErrorWithErrorEvent() {
         assertEquals(authException, ((ErrorEvent) event).error());
     }
 
+    @Test
+    public void testRecordUnsentRequestsQueueTime() throws Exception {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+             NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false, asyncConsumerMetrics)) {
+            NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
+            networkClientDelegate.add(unsentRequest);
+            asyncConsumerMetrics.recordUnsentRequestsQueueSize(1, time.milliseconds());
+
+            time.sleep(10);
+            long timeMs = time.milliseconds();
+            networkClientDelegate.poll(0, timeMs);
+            assertEquals(
+                0,
+                (double) metrics.metric(
+                    metrics.metricName("unsent-requests-queue-size", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("unsent-requests-queue-time-avg", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("unsent-requests-queue-time-max", CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+        }
+    }
+
     public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) {
+        return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
+    }
+
+    public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) {
         LogContext logContext = new LogContext();
         Properties properties = new Properties();
         properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         properties.put(GROUP_ID_CONFIG, GROUP_ID);
         properties.put(REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
-        return new NetworkClientDelegate(this.time,
+        return new NetworkClientDelegate(time,
             new ConsumerConfig(properties),
             logContext,
             this.client,
             this.metadata,
             this.backgroundEventHandler,
-            notifyMetadataErrorsViaErrorQueue);
+            notifyMetadataErrorsViaErrorQueue,
+            asyncConsumerMetrics
+        );
     }
 
     public NetworkClientDelegate.UnsentRequest newUnsentFindCoordinatorRequest() {
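testRecordUnsentRequestsQueueTime implies that poll() both refreshes the unsent-requests-queue-size gauge and, for every request that leaves the queue, records how long it sat there. A sketch of that send path, assuming a hypothetical UnsentRequest.enqueuedMs() timestamp and a doSend() helper; only recordUnsentRequestsQueueSize(long, long) and recordUnsentRequestsQueueTime(long) are established by this patch:

// Sketch only: unsentRequests and asyncConsumerMetrics are the delegate's
// fields; enqueuedMs() and doSend() are assumed helpers, not the patch itself.
private void trySend(long currentTimeMs) {
    Iterator<UnsentRequest> iterator = unsentRequests.iterator();
    while (iterator.hasNext()) {
        UnsentRequest unsent = iterator.next();
        if (doSend(unsent, currentTimeMs)) {
            // Dwell time in the unsent queue feeds unsent-requests-queue-time-avg/-max.
            asyncConsumerMetrics.recordUnsentRequestsQueueTime(currentTimeMs - unsent.enqueuedMs());
            iterator.remove();
        }
    }
    // Whatever remains keeps the size gauge current (0 in the test above).
    asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), currentTimeMs);
}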
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 7231e0b639029..1b1ed587203ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
@@ -113,6 +114,7 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -1656,7 +1658,7 @@ private void buildRequestManager(MetricConfig metricConfig,
             subscriptions,
             fetchConfig,
             deserializers);
-        BackgroundEventHandler backgroundEventHandler = new TestableBackgroundEventHandler(completedAcknowledgements);
+        BackgroundEventHandler backgroundEventHandler = new TestableBackgroundEventHandler(time, completedAcknowledgements);
         shareConsumeRequestManager = spy(new TestableShareConsumeRequestManager<>(
             logContext,
             groupId,
@@ -1687,8 +1689,9 @@ private void buildDependencies(MetricConfig metricConfig,
         properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs));
         properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
         ConsumerConfig config = new ConsumerConfig(properties);
-        networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata,
-            new BackgroundEventHandler(new LinkedBlockingQueue<>()), false));
+        networkClientDelegate = spy(new TestableNetworkClientDelegate(
+            time, config, logContext, client, metadata,
+            new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class)), false));
     }
 
     private class TestableShareConsumeRequestManager<K, V> extends ShareConsumeRequestManager<K, V> {
@@ -1742,7 +1745,7 @@ public TestableNetworkClientDelegate(Time time,
                                              Metadata metadata,
                                              BackgroundEventHandler backgroundEventHandler,
                                              boolean notifyMetadataErrorsViaErrorQueue) {
-            super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+            super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
         }
 
         @Override
@@ -1837,8 +1840,8 @@ private void failUnsentRequests(Node node) {
 
     private static class TestableBackgroundEventHandler extends BackgroundEventHandler {
         List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements;
 
-        public TestableBackgroundEventHandler(List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements) {
-            super(new LinkedBlockingQueue<>());
+        public TestableBackgroundEventHandler(Time time, List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements) {
+            super(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class));
             this.completedAcknowledgements = completedAcknowledgements;
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 7fd0bc3fe60e5..04db229c8df35 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -122,7 +122,7 @@ private ShareConsumerImpl<String, String> newConsumer(ConsumerConfig config) {
             new StringDeserializer(),
             new StringDeserializer(),
             time,
-            (a, b, c, d, e, f, g) -> applicationEventHandler,
+            (a, b, c, d, e, f, g, h) -> applicationEventHandler,
             a -> backgroundEventReaper,
             (a, b, c, d, e) -> fetchCollector,
             backgroundEventQueue
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
index eabcb8773e17f..71e44631ee8c8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
@@ -49,7 +49,7 @@ public void testExpired() {
 
         // Without any time passing, we check the reaper and verify that the event is not done and is still
         // being tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertFalse(event.future().isDone());
         assertEquals(1, reaper.size());
 
@@ -62,7 +62,7 @@
 
         // Call the reaper and validate that the event is now "done" (expired), the correct exception type is
         // thrown, and the event is no longer tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(1, reaper.reap(time.milliseconds()));
         assertTrue(event.future().isDone());
         assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future()));
         assertEquals(0, reaper.size());
@@ -77,7 +77,7 @@ public void testCompleted() {
 
         // Without any time passing, we check the reaper and verify that the event is not done and is still
         // being tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertFalse(event.future().isDone());
         assertEquals(1, reaper.size());
 
@@ -91,7 +91,7 @@
         time.sleep(timeoutMs + 1);
 
         // Call the reaper and validate that the event is not considered expired, but is still no longer tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertTrue(event.future().isDone());
         assertNull(ConsumerUtils.getResult(event.future()));
         assertEquals(0, reaper.size());
@@ -108,7 +108,7 @@ public void testCompletedAndExpired() {
 
         // Without any time passing, we check the reaper and verify that the event is not done and is still
         // being tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertFalse(event1.future().isDone());
         assertFalse(event2.future().isDone());
         assertEquals(2, reaper.size());
@@ -124,7 +124,7 @@
 
         // Validate that the first (completed) event is not expired, but the second one is expired. In either case,
         // both should be completed and neither should be tracked anymore.
-        reaper.reap(time.milliseconds());
+        assertEquals(1, reaper.reap(time.milliseconds()));
         assertTrue(event1.future().isDone());
         assertTrue(event2.future().isDone());
         assertNull(ConsumerUtils.getResult(event1.future()));
@@ -150,7 +150,7 @@ public void testIncompleteQueue() {
         assertEquals(2, queue.size());
 
         // Go ahead and reap the incomplete event from the queue.
-        reaper.reap(queue);
+        assertEquals(1, reaper.reap(queue));
 
         // The first event was completed, so we didn't expire it in the reaper.
         assertTrue(event1.future().isDone());
@@ -186,7 +186,7 @@ public void testIncompleteTracked() {
         assertEquals(2, reaper.size());
 
         // Go ahead and reap the incomplete events. Both sets should be zero after that.
-        reaper.reap(queue);
+        assertEquals(1, reaper.reap(queue));
         assertEquals(0, reaper.size());
         assertEquals(0, queue.size());
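The reap() changes above are purely about the return value: completed events are still dropped silently, and only events that had to be force-expired count toward the result, which ConsumerNetworkThread then passes to recordApplicationEventExpiredSize(). A sketch of the counting logic, assuming the reaper tracks CompletableEvent instances exposing future() and deadlineMs(); the body is illustrative, only the long return type is established by these tests:

// Sketch only: 'tracked' stands in for the reaper's internal event list.
public long reap(long currentTimeMs) {
    long expiredCount = 0;
    Iterator<CompletableEvent<?>> iterator = tracked.iterator();
    while (iterator.hasNext()) {
        CompletableEvent<?> event = iterator.next();
        if (event.future().isDone()) {
            iterator.remove();            // completed elsewhere: dropped, not counted
        } else if (currentTimeMs >= event.deadlineMs()) {
            event.future().completeExceptionally(
                new TimeoutException("Event expired after deadline"));
            iterator.remove();
            expiredCount++;               // only forced expirations are counted
        }
    }
    return expiredCount;
}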
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
new file mode 100644
index 0000000000000..2913bcfad70f1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AsyncConsumerMetricsTest {
+    private static final long METRIC_VALUE = 123L;
+
+    private final Metrics metrics = new Metrics();
+    private AsyncConsumerMetrics consumerMetrics;
+
+    @AfterEach
+    public void tearDown() {
+        if (consumerMetrics != null) {
+            consumerMetrics.close();
+        }
+        metrics.close();
+    }
+
+    @Test
+    public void shouldRegisterAndRemoveMetricNames() {
+        // create
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        HashSet<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+            metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("commit-sync-time-ns-total", CONSUMER_METRIC_GROUP),
+            metrics.metricName("committed-time-ns-total", CONSUMER_METRIC_GROUP)
+        ));
+        expectedMetrics.forEach(
+            metricName -> assertTrue(
+                metrics.metrics().containsKey(metricName),
+                "Missing metric: " + metricName
+            )
+        );
+
+        HashSet<MetricName> expectedConsumerMetrics = new HashSet<>(Arrays.asList(
+            metrics.metricName("time-between-network-thread-poll-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-network-thread-poll-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-size", CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-time-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-time-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-processing-time-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-processing-time-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("unsent-requests-queue-size", CONSUMER_METRIC_GROUP),
+            metrics.metricName("unsent-requests-queue-time-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("unsent-requests-queue-time-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-size", CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-time-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-processing-time-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-processing-time-max", CONSUMER_METRIC_GROUP)
+        ));
+        expectedConsumerMetrics.forEach(
+            metricName -> assertTrue(
+                metrics.metrics().containsKey(metricName),
+                "Missing metric: " + metricName
+            )
+        );
+
+        // close
+        consumerMetrics.close();
+        expectedMetrics.forEach(
+            metricName -> assertFalse(
+                metrics.metrics().containsKey(metricName),
+                "Metric present after close: " + metricName
+            )
+        );
+        expectedConsumerMetrics.forEach(
+            metricName -> assertFalse(
+                metrics.metrics().containsKey(metricName),
+                "Metric present after close: " + metricName
+            )
+        );
+    }
+
+    @Test
+    public void shouldRecordTimeBetweenNetworkThreadPoll() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordTimeBetweenNetworkThreadPoll(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue("time-between-network-thread-poll-avg");
+        assertMetricValue("time-between-network-thread-poll-max");
+    }
+
+    @Test
+    public void shouldRecordApplicationEventQueueSize() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordApplicationEventQueueSize(10);
+
+        // Then:
+        assertEquals(
+            (double) 10,
+            metrics.metric(
+                metrics.metricName(
+                    "application-event-queue-size",
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+
+    @Test
+    public void shouldRecordApplicationEventQueueTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordApplicationEventQueueTime(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue("application-event-queue-time-avg");
+        assertMetricValue("application-event-queue-time-max");
+    }
+
+    @Test
+    public void shouldRecordApplicationEventQueueProcessingTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordApplicationEventQueueProcessingTime(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue("application-event-queue-processing-time-avg");
+        assertMetricValue("application-event-queue-processing-time-max");
+    }
+
+    @Test
+    public void shouldRecordUnsentRequestsQueueSize() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordUnsentRequestsQueueSize(10, 100);
+
+        // Then:
+        assertEquals(
+            (double) 10,
+            metrics.metric(
+                metrics.metricName(
+                    "unsent-requests-queue-size",
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+
+    @Test
+    public void shouldRecordUnsentRequestsQueueTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordUnsentRequestsQueueTime(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue("unsent-requests-queue-time-avg");
+        assertMetricValue("unsent-requests-queue-time-max");
+    }
+
+    @Test
+    public void shouldRecordBackgroundEventQueueSize() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordBackgroundEventQueueSize(10);
+
+        // Then:
+        assertEquals(
+            (double) 10,
+            metrics.metric(
+                metrics.metricName(
+                    "background-event-queue-size",
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+
+    @Test
+    public void shouldRecordBackgroundEventQueueTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordBackgroundEventQueueTime(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue("background-event-queue-time-avg");
+        assertMetricValue("background-event-queue-time-max");
+    }
+
+    @Test
+    public void shouldRecordBackgroundEventQueueProcessingTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When:
+        consumerMetrics.recordBackgroundEventQueueProcessingTime(METRIC_VALUE);
+
+        // Then:
+        assertMetricValue("background-event-queue-processing-time-avg");
+        assertMetricValue("background-event-queue-processing-time-max");
+    }
+
+    private void assertMetricValue(final String name) {
+        assertEquals(
+            (double) METRIC_VALUE,
+            metrics.metric(
+                metrics.metricName(
+                    name,
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+}
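Since AsyncConsumerMetrics is AutoCloseable and records through plain sensor calls, the new metrics can be exercised in isolation. A self-contained usage example built only from calls the tests above confirm (the demo class name is ours):

import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.metrics.Metrics;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;

public class AsyncConsumerMetricsDemo {
    public static void main(String[] args) {
        try (Metrics metrics = new Metrics();
             AsyncConsumerMetrics consumerMetrics = new AsyncConsumerMetrics(metrics)) {
            // Two samples: the avg sensor should read 50.0 and the max sensor 75.0.
            consumerMetrics.recordApplicationEventQueueTime(25L);
            consumerMetrics.recordApplicationEventQueueTime(75L);

            System.out.println(metrics.metric(metrics.metricName(
                "application-event-queue-time-avg", CONSUMER_METRIC_GROUP)).metricValue());
            System.out.println(metrics.metric(metrics.metricName(
                "application-event-queue-time-max", CONSUMER_METRIC_GROUP)).metricValue());
        }
    }
}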