-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16143: New JMX metrics for AsyncKafkaConsumer #17199
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; | ||
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; | ||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; | ||
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; | ||
import org.apache.kafka.common.internals.IdempotentCloser; | ||
import org.apache.kafka.common.requests.AbstractRequest; | ||
import org.apache.kafka.common.utils.KafkaThread; | ||
|
@@ -62,21 +63,24 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { | |
private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier; | ||
private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier; | ||
private final Supplier<RequestManagers> requestManagersSupplier; | ||
private final AsyncConsumerMetrics asyncConsumerMetrics; | ||
private ApplicationEventProcessor applicationEventProcessor; | ||
private NetworkClientDelegate networkClientDelegate; | ||
private RequestManagers requestManagers; | ||
private volatile boolean running; | ||
private final IdempotentCloser closer = new IdempotentCloser(); | ||
private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS); | ||
private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS; | ||
private long lastPollTimeMs = 0L; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is only ever written to and read from the same thread, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Pinging on this. I believe it's only written on the background thread, but just want to be sure. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hi @kirktrue, this is only used for metrics |
||
|
||
public ConsumerNetworkThread(LogContext logContext, | ||
Time time, | ||
BlockingQueue<ApplicationEvent> applicationEventQueue, | ||
CompletableEventReaper applicationEventReaper, | ||
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier, | ||
Supplier<NetworkClientDelegate> networkClientDelegateSupplier, | ||
Supplier<RequestManagers> requestManagersSupplier) { | ||
Supplier<RequestManagers> requestManagersSupplier, | ||
AsyncConsumerMetrics asyncConsumerMetrics) { | ||
super(BACKGROUND_THREAD_NAME, true); | ||
this.time = time; | ||
this.log = logContext.logger(getClass()); | ||
|
@@ -86,6 +90,7 @@ public ConsumerNetworkThread(LogContext logContext, | |
this.networkClientDelegateSupplier = networkClientDelegateSupplier; | ||
this.requestManagersSupplier = requestManagersSupplier; | ||
this.running = true; | ||
this.asyncConsumerMetrics = asyncConsumerMetrics; | ||
} | ||
|
||
@Override | ||
|
@@ -141,6 +146,11 @@ void runOnce() { | |
processApplicationEvents(); | ||
|
||
final long currentTimeMs = time.milliseconds(); | ||
if (lastPollTimeMs != 0L) { | ||
asyncConsumerMetrics.recordTimeBetweenNetworkThreadPoll(currentTimeMs - lastPollTimeMs); | ||
} | ||
lastPollTimeMs = currentTimeMs; | ||
|
||
final long pollWaitTimeMs = requestManagers.entries().stream() | ||
.filter(Optional::isPresent) | ||
.map(Optional::get) | ||
|
@@ -166,8 +176,13 @@ void runOnce() { | |
private void processApplicationEvents() { | ||
LinkedList<ApplicationEvent> events = new LinkedList<>(); | ||
applicationEventQueue.drainTo(events); | ||
if (events.isEmpty()) | ||
return; | ||
|
||
asyncConsumerMetrics.recordApplicationEventQueueSize(0); | ||
long startMs = time.milliseconds(); | ||
for (ApplicationEvent event : events) { | ||
asyncConsumerMetrics.recordApplicationEventQueueTime(time.milliseconds() - event.enqueuedMs()); | ||
try { | ||
if (event instanceof CompletableEvent) { | ||
applicationEventReaper.add((CompletableEvent<?>) event); | ||
|
@@ -181,6 +196,7 @@ private void processApplicationEvents() { | |
log.warn("Error processing event {}", t.getMessage(), t); | ||
} | ||
} | ||
asyncConsumerMetrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs); | ||
} | ||
|
||
/** | ||
|
@@ -189,7 +205,7 @@ private void processApplicationEvents() { | |
* is given at least one attempt to satisfy any network requests <em>before</em> checking if a timeout has expired. | ||
*/ | ||
private void reapExpiredApplicationEvents(long currentTimeMs) { | ||
applicationEventReaper.reap(currentTimeMs); | ||
asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(currentTimeMs)); | ||
} | ||
|
||
/** | ||
|
@@ -326,7 +342,7 @@ void cleanup() { | |
log.error("Unexpected error during shutdown. Proceed with closing.", e); | ||
} finally { | ||
sendUnsentRequests(timer); | ||
applicationEventReaper.reap(applicationEventQueue); | ||
asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(applicationEventQueue)); | ||
|
||
closeQuietly(requestManagers, "request managers"); | ||
closeQuietly(networkClientDelegate, "network client delegate"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are we going to account for events which expired and are removed from the queue by the event reaper? They probably ought to be included in the metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't have this metric in original KIP. Do we want to add one?
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1068%3A+New+metrics+for+the+new+KafkaConsumer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, and if we agree on what we want we could just send an update in the KIP email thread to add it to the KIP and here.
To align internally first, I guess we would be interested in the num/avg of expired events, but we need to consider how that metric would go crazy and be a false alarm in cases like poll(0) right? Should we consider the expiration relevant only if there was a non-zero timeout? Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the only background event which has a deadline is `ConsumerRebalanceListenerCallbackNeededEvent`, but its deadline is `Long.MAX_VALUE`, so it will never expire. Do we want to update it? If yes, I can create a Jira for it. If not, we may not need to add this metric. WDYT?
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
Line 42 in 104fa57
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree that it's not applicable for background events, because this callback-needed event is the only `CompletableBackgroundEvent`, and it's intentionally not expired (I don't think we need to change that).
I would say that what might be interesting to know is the expiration of application events.
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
Line 184 in ee42644
There we do have lots of events with a deadline; I guess that's what @AndrewJSchofield had in mind, maybe? (I notice now that the initial comment was here on the reap of background events.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the background event reaper triggered my thought, but @lianetm is correct. The interesting part is the events with deadlines that are timing out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a new metric, `application-event-expired-size`. Once we all agree on this metric, I will send a mail to the original vote thread to mention the new metric.
I also changed `CompletableEventReaper#reap` to return the expired event count, so we can record the value. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this sounds like a useful metric to have. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me too. I would just suggest we name it `application-events-expired-count`.