Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16143: New JMX metrics for AsyncKafkaConsumer #17199

Merged
merged 1 commit into from
Dec 13, 2024

Conversation

FrankYang0529
Copy link
Member

Add following metrics to AsyncKafkaConsumer:

  • time-between-network-thread-poll-avg
  • time-between-network-thread-poll-max
  • application-event-queue-size
  • application-event-queue-time-avg
  • application-event-queue-time-max
  • application-event-queue-processing-time-avg
  • application-event-queue-processing-time-max
  • unsent-requests-queue-size
  • unsent-requests-queue-time-avg
  • unsent-requests-queue-time-max
  • background-event-queue-size
  • background-event-queue-time-avg
  • background-event-queue-time-max
  • background-event-queue-processing-time-avg
  • background-event-queue-processing-time-max

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lianetm lianetm added consumer KIP-848 The Next Generation of the Consumer Rebalance Protocol ctr Consumer Threading Refactor (KIP-848) labels Sep 16, 2024
Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @FrankYang0529 for working on this!

I am intentionally not going to look at this too in-depth because it's a draft at the moment. However, it appears that the KafkaConsumerMetrics object is being used in both the application thread and background thread. From a quick glance, KafkaConsumerMetrics doesn't look thread-safe 🤔

If that's the case, we'll need to figure out how we can avoid potential synchronization issues.

Thanks!

@kirktrue
Copy link
Collaborator

kirktrue commented Oct 8, 2024

@FrankYang0529—I wanted to check in and see if you had any questions or needed anything to progress on this. Thanks!

@FrankYang0529
Copy link
Member Author

Hi @kirktrue, sorry for late. I will make this PR ready today.

@FrankYang0529
Copy link
Member Author

If that's the case, we'll need to figure out how we can avoid potential synchronization issues.

Hi @kirktrue, good catch. There is synchronized in Sensor#recordInternal. In this case, do we still need a lock in KafkaConsumerMetrics?

private void recordInternal(double value, long timeMs, boolean checkQuotas) {
this.lastRecordTime = timeMs;
synchronized (this) {
synchronized (metricLock()) {
// increment all the stats
for (StatAndConfig statAndConfig : this.stats) {
statAndConfig.stat.record(statAndConfig.config(), value, timeMs);
}
}
if (checkQuotas)
checkQuotas(timeMs);
}
for (Sensor parent : parents)
parent.record(value, timeMs, checkQuotas);
}

I wanted to check in and see if you had any questions or needed anything to progress on this.

The PR is almost ready. I will wait to see CI result. Thanks.

@FrankYang0529 FrankYang0529 marked this pull request as ready for review October 9, 2024 16:30
Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @FrankYang0529!

This looks pretty comprehensive. I haven't had time to dive as deep as I would like, but I left a first pass of comments.

Thanks!

Comment on lines 1756 to 1971
kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.addedToQueueMs());
long startMs = time.milliseconds();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the metric determine how long it takes to process the background event queue, or how long it takes to process background events? If it's the latter, we want to update the metric inside each loop, but if it's the former we should update once outside the loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this metric determine how long a background event is taking to be dequeued. In BackgroundEventHandler#add, we run BackgroundEvent#setAddedToQueueMs. When we start to process it, we can use current time - event.addedToQueueMs to know how long an event is in the queue.

@@ -139,6 +155,9 @@ void runOnce() {
processApplicationEvents();

final long currentTimeMs = time.milliseconds();
final long timeSinceLastPollMs = lastPollTimeMs != 0L ? currentTimeMs - lastPollTimeMs : currentTimeMs;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the first invocation of runOnce(), timeSinceLastPollMs will be something like 1728954137284. Is that correct?

Comment on lines 94 to 102
public ConsumerNetworkThread(LogContext logContext,
Time time,
BlockingQueue<ApplicationEvent> applicationEventQueue,
CompletableEventReaper applicationEventReaper,
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
Supplier<RequestManagers> requestManagersSupplier) {
this(logContext, time, applicationEventQueue, applicationEventReaper, applicationEventProcessorSupplier,
networkClientDelegateSupplier, requestManagersSupplier, Optional.empty());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to remove this constructor? Can't we call the existing constructor with Optional.empty()?

Comment on lines 94 to 101
public NetworkClientDelegate(
final Time time,
final ConsumerConfig config,
final LogContext logContext,
final KafkaClient client,
final Metadata metadata,
final BackgroundEventHandler backgroundEventHandler) {
this(time, config, logContext, client, metadata, backgroundEventHandler, Optional.empty());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to remove this constructor? Can't callers invoke the existing constructor with Optional.empty()?

Comment on lines 65 to 80
public void setAddedToQueueMs(long addedToQueueMs) {
this.addedToQueueMs = addedToQueueMs;
}

public long addedToQueueMs() {
return addedToQueueMs;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void setAddedToQueueMs(long addedToQueueMs) {
this.addedToQueueMs = addedToQueueMs;
}
public long addedToQueueMs() {
return addedToQueueMs;
}
public void setEnqueuedMs(long enqueuedMs) {
enqueuedMs = enqueuedMs;
}
public long enqueuedMs() {
return enqueuedMs;
}

@@ -73,7 +90,9 @@ public ApplicationEventHandler(final LogContext logContext,
*/
public void add(final ApplicationEvent event) {
Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
event.setAddedToQueueMs(System.currentTimeMillis());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to get the current time in milliseconds from the Time object that was passed in to the constructor.

Comment on lines 338 to 358
void setAddedToQueueMs(final long addedToQueueMs) {
this.addedToQueueMs = addedToQueueMs;
}

long addedToQueueMs() {
return addedToQueueMs;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's a bit nit-picky, but can we change it to:

Suggested change
void setAddedToQueueMs(final long addedToQueueMs) {
this.addedToQueueMs = addedToQueueMs;
}
long addedToQueueMs() {
return addedToQueueMs;
}
void setEnqueuedMs(final long enqueuedMs) {
this.enqueuedMs = enqueuedMs;
}
long enqueuedMs() {
return enqueuedMs;
}

Comment on lines 55 to 66
public void setAddedToQueueMs(long addedToQueueMs) {
this.addedToQueueMs = addedToQueueMs;
}

public long addedToQueueMs() {
return addedToQueueMs;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void setAddedToQueueMs(long addedToQueueMs) {
this.addedToQueueMs = addedToQueueMs;
}
public long addedToQueueMs() {
return addedToQueueMs;
}
public void setEnqueuedMs(long enqueuedMs) {
this. enqueuedMs = enqueuedMs;
}
public long enqueuedMs() {
return enqueuedMs;
}

Comment on lines 43 to 44
public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) {
this(backgroundEventQueue, Optional.empty());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with the other changes. I'd rather make the callers pass in Optional.empty().

@@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
*/
public void add(BackgroundEvent event) {
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
event.setAddedToQueueMs(System.currentTimeMillis());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here: we need to update the constructor to provide a Time object and then use that here.

@FrankYang0529
Copy link
Member Author

Hi @kirktrue, I address all comments. Could you take a look when you have time? Thank you.

Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @FrankYang0529!

Overall, I think it's headed in the right direction. I have mostly minor requests, but the one that's slightly larger is class hierarchy around the KafkaConsumerMetrics. If you want to punt on that, file a Jira and I'll take care of it later.

I didn't get to the unit tests yet, though 😢

Thanks!

Comment on lines +65 to +75
public void setEnqueuedMs(long enqueuedMs) {
this.enqueuedMs = enqueuedMs;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that we're introducing mutability into all the events. I understand that this changes a lot less code than having the constructor take the creation time or something.

Can you add a comment to the enqueued variable that states that because of its mutability that it should not be used in hashCode() or equals() or toStringBase() (or something along those lines)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take the point about hashCode() or equals() but we probably should be able to see the enqueued time in the string representation of the events.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add enqueued time to toStringBase. Thanks.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably should be able to see the enqueued time in the string representation of the events

Agreed. Good point. Thanks @AndrewJSchofield.

@@ -37,6 +37,7 @@ public enum Type {
* {@link #equals(Object)} and can be used in log messages when debugging.
*/
private final Uuid id;
private long enqueuedMs;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as per above with ApplicationEvent.

kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(time.milliseconds() - event.enqueuedMs()));
long startMs = time.milliseconds();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we swap the ordering of these two lines to avoid the extra call to time.milliseconds()?

Suggested change
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(time.milliseconds() - event.enqueuedMs()));
long startMs = time.milliseconds();
long startMs = time.milliseconds();
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(startMs - event.enqueuedMs()));

@@ -162,15 +171,20 @@ void runOnce() {
private void processApplicationEvents() {
LinkedList<ApplicationEvent> events = new LinkedList<>();
applicationEventQueue.drainTo(events);
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(applicationEventQueue.size()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we drain applicationEventQueue, it'll be empty, right? If, so why not just do:

Suggested change
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(applicationEventQueue.size()));
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));

private ApplicationEventProcessor applicationEventProcessor;
private NetworkClientDelegate networkClientDelegate;
private RequestManagers requestManagers;
private volatile boolean running;
private final IdempotentCloser closer = new IdempotentCloser();
private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
private long lastPollTimeMs = 0L;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only ever written to and read from the same thread, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinging on this. I believe it's only written on the background thread, but just want to be sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kirktrue, this is only used for metrics time-between-network-thread-poll-max and time-between-network-thread-poll-avg. Thanks.

@@ -42,6 +47,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
*/
public void add(BackgroundEvent event) {
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
event.setEnqueuedMs(System.currentTimeMillis());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add a Time variable to this class so that we can do this instead:

Suggested change
event.setEnqueuedMs(System.currentTimeMillis());
event.setEnqueuedMs(time.milliseconds());

import java.util.concurrent.TimeUnit;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;

public class KafkaConsumerMetrics implements AutoCloseable {
private final GroupProtocol groupProtocol;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to create a subclass of KafkaConsumerMetrics instead of adding group protocol-specific bits on here? I see that the ShareConsumerImpl uses a custom KafkaShareConsumerMetrics that at first glance is entirely a subset of KafkaConsumerMetrics, so there's some refactoring that could be done here.

Comment on lines 56 to 64
private Sensor timeBetweenNetworkThreadPollSensor;
private Sensor applicationEventQueueSizeSensor;
private Sensor applicationEventQueueTimeSensor;
private Sensor applicationEventQueueProcessingTimeSensor;
private Sensor backgroundEventQueueSizeSensor;
private Sensor backgroundEventQueueTimeSensor;
private Sensor backgroundEventQueueProcessingTimeSensor;
private Sensor unsentRequestsQueueSizeSensor;
private Sensor unsentRequestsQueueTimeSensor;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make these final too? It'll be a bit ugly in the else block of the constructor to mark them all as null. But that goes away if we make this a subclass.

Comment on lines 53 to 78
metricGroupName, "The number of seconds since the last poll() invocation.");
metricGroupName, "The number of seconds since the last poll() invocation.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove these whitespace changes? They seem extraneous.

Comment on lines 262 to 275

if (groupProtocol == GroupProtocol.CONSUMER) {
Arrays.asList(
timeBetweenNetworkThreadPollSensor.name(),
applicationEventQueueSizeSensor.name(),
applicationEventQueueTimeSensor.name(),
applicationEventQueueProcessingTimeSensor.name(),
backgroundEventQueueSizeSensor.name(),
backgroundEventQueueTimeSensor.name(),
backgroundEventQueueProcessingTimeSensor.name(),
unsentRequestsQueueSizeSensor.name(),
unsentRequestsQueueTimeSensor.name()
).forEach(metrics::removeSensor);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another place that having a subclass would help.

@FrankYang0529 FrankYang0529 force-pushed the KAFKA-16143 branch 2 times, most recently from f121073 to 90af010 Compare November 1, 2024 07:07
@FrankYang0529 FrankYang0529 force-pushed the KAFKA-16143 branch 2 times, most recently from 5f171f3 to 68b27ad Compare November 11, 2024 02:46
@FrankYang0529 FrankYang0529 force-pushed the KAFKA-16143 branch 3 times, most recently from 20951a9 to 4ce1c45 Compare November 18, 2024 11:45
@lianetm
Copy link
Member

lianetm commented Nov 18, 2024

Hey @FrankYang0529, sorry I haven't had the bandwidth for this. I will be taking a look this week. Thanks!

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @FrankYang0529, thanks for taking over this one! Some initial comments.

backgroundEventQueue.add(event);
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change here makes sense to me, but makes me wonder if we should push it further. It would be helpful if we could try to keep all the updates for this queue size metric in this component that holds the queue, so we can easily maintain/track how "add" and "remove/drain" update that metric.

We could then use that drain from the processBackgroundEvents, instead of manually draining the queue and recording the metric there . What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree.

Comment on lines 1908 to 1910
LinkedList<BackgroundEvent> events = new LinkedList<>();
backgroundEventQueue.drainTo(events);
kafkaAsyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I was wondering if we could encapsulate a BackgroundEventHandler.drain or similar, that would take care of draining the queue and recording the metric (all metric updates done there consistently)

Suggested change
LinkedList<BackgroundEvent> events = new LinkedList<>();
backgroundEventQueue.drainTo(events);
kafkaAsyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
LinkedList<BackgroundEvent> events = backgroundEventHandler.drainBackgroundEvents();

Comment on lines 172 to 174
LinkedList<ApplicationEvent> events = new LinkedList<>();
applicationEventQueue.drainTo(events);
kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keeping the symmetry with the background event, would it make sense to encapsulate these actions in the ApplicationEventHandler so that we keep that component responsible of add and drain the queue (including the metric actions related to those ops)?

It would mean that this ConsumerNetworkThread would keep the ref to the ApplicationEventHandler that has the queue (instead of directly having the queue like it does now), but that is already available, so I guess we just need to pass it in the constructor instead of the queue. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We initialize ConsumerNetworkThread in ApplicationEventHandler. If we want to reference ApplicationEventHandler in ConsumerNetworkThread, we have to give this as parameter. Probably, we can do some refactor in next PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good point, I forgot about that dependency! (feels kind of unexpected actually). Given that structure we shouldn't reference AppEventHandler in the ConsumerNetworkThread because we would end up with a circular dependency.

Sorry for the extra work, but I would suggest we revert this back to your initial change, without having a drainEvents, and we rethink this class structure in a separate jira, to see if it would make sense to decouple the AppEventHandler from the network thread (and if so then we could properly add a drainEvents, without circular deps). Makes sense?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a followup Jira for it: https://issues.apache.org/jira/browse/KAFKA-18048.

try {
if (event instanceof CompletableEvent)
applicationEventReaper.add((CompletableEvent<?>) event);

applicationEventProcessor.process(event);
} catch (Throwable t) {
log.warn("Error processing event {}", t.getMessage(), t);
} finally {
kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the best place to record this? From the description of the metric I get that we want to measure the time "that the consumer network takes to process all available application events". So wouldn't it be simpler to record the metric once per runOnce instead of recording it N times on each run? (startTime right before the loop over events, and ending/recording right after the loop).

I went to the KIP discussion thread to double check this interpretation, and this was the intention behind what was proposed (by me actually I discovered he he).

LM3. Thinking about the actual usage of "time-between-network-thread-poll-xxx" metric, I imagine it would be helpful to know more about what could be impacting it. As I see it, the network thread cadence could be mainly impacted by: 1- app event processing (generate requests), 2- network client poll (actual send/receive). For 2, the new consumer reuses the same component as the legacy one, but 1 is specific to the new consumer, so what about a metric for application-event-processing-time-ms (we could consider avg I would say). It would be the time that the network thread takes to process all available events on each run.

What do you think?

@@ -170,6 +174,7 @@ private void trySend(final long currentTimeMs) {
Iterator<UnsentRequest> iterator = unsentRequests.iterator();
while (iterator.hasNext()) {
UnsentRequest unsent = iterator.next();
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(currentTimeMs - unsent.enqueuedMs()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recording this here means we would consider the request removed from the unsent queue even in the case where it cannot be sent and it actually stays in the unsent queue (!doSend), right? If so, I guess we should probably record this only when we do remove it from the queue with iterator.remove() (either because it's expired, or because we did sent it).

Also, shouldn't we record this same metric on checkDisconnects if the request is removed from the unsent queue because the node is disconnected?

this.applicationEventQueueProcessingTimeSensor.record(processingTime);
}

public void recordUnsentRequestsQueueSize(int size) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric is about the size of the queue at a given time, so I expect we should have another param here timeMs, for the time where we read the metric, and we should pass it into the .record, that has an overload for it.

@@ -1911,6 +1922,8 @@ private boolean processBackgroundEvents() {

if (!firstError.compareAndSet(null, e))
log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
} finally {
kafkaAsyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to recordApplicationEventQueueProcessingTime. The metric description states this is about the time "that the consumer took to process all available background events' . Shouldn't we simply take the time from right before the loop to right after it ends, and record the metric once per run of the processBackgroundEvents?

unsentRequests.add(r);
kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still debating whether this is the best place to record this. We want snapshots in time of the queue size. Recording here has the limitation that we won't be recording when the size decreases (ie. requests sent, failed due to disconnections). So I wonder if recording this on poll, which is called regularly, would given a better view of the queue size?

The way add/poll are used from the ConsumerNetworkThread.runOnce they end up being called sequentially anyways, but I'm thinking about the case where, let's say managers are not returning any requests (so addAll is called with empty, add never called), but there could be unsent requests in the queue, that could be sent out, cancelled, time out, etc). Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we still record a metric in NetworkClientDelegate#add for increasing path. For decreasing path, we can record in NetworkClientDelegate#poll, because it covers both trySend and checkDisconnects. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to record the unsent event queue size at the end of a poll iteration, after events were added and removed/sent, that's truly what could help spot issues (too many requests being left "unsent" on each run). Then it actually makes me wonder about the value of also calling it on add, would it be useful to see the number of events added on each run? or just seeing how many where left unsent is all that matters?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we just want to see how many unsent requests left. We can just record at the end of NetworkClientDelegate#poll.

public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size";
public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time";
private final Sensor timeBetweenNetworkThreadPollSensor;
private final Sensor applicationEventQueueSizeSensor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra space before Sensor

@FrankYang0529 FrankYang0529 force-pushed the KAFKA-16143 branch 2 times, most recently from f64243c to 4187497 Compare November 26, 2024 14:00
Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @FrankYang0529. I left a few comments.

Out of necessity, there's a good amount of duplication of metric logic between the application and background thread queues. It would be good to include cleanup of the metrics in KAFKA-18048, if possible.

Thanks!

Comment on lines 355 to 356
backgroundEventHandler,
asyncConsumerMetrics);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: minor alignment issue:

Suggested change
backgroundEventHandler,
asyncConsumerMetrics);
backgroundEventHandler,
asyncConsumerMetrics);

Comment on lines 342 to 343
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: for better or worse, we've adopted this style for multi-line parameter lists:

Suggested change
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
asyncConsumerMetrics
);

Comment on lines 514 to 515
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
asyncConsumerMetrics
);

Comment on lines 217 to 216
private final KafkaConsumerMetrics kafkaConsumerMetrics;
private final AsyncConsumerMetrics asyncConsumerMetrics;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer it be left as kafkaConsumerMetrics. Calling out the distinction in the variable name isn't really adding anything (IMO).

Comment on lines +65 to +75
public void setEnqueuedMs(long enqueuedMs) {
this.enqueuedMs = enqueuedMs;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably should be able to see the enqueued time in the string representation of the events

Agreed. Good point. Thanks @AndrewJSchofield.

@@ -42,26 +44,32 @@
public class ApplicationEventHandler implements Closeable {

private final Logger log;
private final Time time;
private final BlockingQueue<ApplicationEvent> applicationEventQueue;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is an area that could use some refactoring. Thanks for filing KAFKA-18048, @FrankYang0529.

* The time in milliseconds when this event was enqueued.
* This field can be changed after the event is created, so it should not be used in hashCode, equals, or toStringBase.
*/
private long enqueuedMs;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need the enqueuedMs in the toStringBase() method as per the ApplicationEvent’s method of the same name.

@FrankYang0529
Copy link
Member Author

Hi @AndrewJSchofield / @lianetm / @kirktrue, could you please review this PR when you have time? Thank you.

Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. I have just a few nitpicks. I'd also like to see a resolution to @AndrewJSchofield’s outstanding question, if possible.

Comment on lines 390 to 391
requestManagersSupplier,
kafkaConsumerMetrics
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: alignment.

Suggested change
requestManagersSupplier,
kafkaConsumerMetrics
requestManagersSupplier,
kafkaConsumerMetrics

Comment on lines 358 to 359
backgroundEventHandler,
kafkaConsumerMetrics
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: alignment.

Suggested change
backgroundEventHandler,
kafkaConsumerMetrics
backgroundEventHandler,
kafkaConsumerMetrics

Comment on lines 573 to 574
requestManagersSupplier,
kafkaConsumerMetrics);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: alignment.

Suggested change
requestManagersSupplier,
kafkaConsumerMetrics);
requestManagersSupplier,
kafkaConsumerMetrics);

private ApplicationEventProcessor applicationEventProcessor;
private NetworkClientDelegate networkClientDelegate;
private RequestManagers requestManagers;
private volatile boolean running;
private final IdempotentCloser closer = new IdempotentCloser();
private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
private long lastPollTimeMs = 0L;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinging on this. I believe it's only written on the background thread, but just want to be sure.

@FrankYang0529
Copy link
Member Author

Hi @kirktrue, I addressed all comments. Could you take a look when you have time? Thank you.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @FrankYang0529!

}
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
}

backgroundEventReaper.reap(time.milliseconds());
Copy link
Member

@lianetm lianetm Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, and if we agree on what we want we could just send an update in the KIP email thread to add it to the KIP and here.

To align internally first, I guess we would be interested in the num/avg of expired events, but we need to consider how that metric would go crazy and be a false alarm in cases like poll(0) right? Should we consider the expiration relevant only if there was a non-zero timeout? Thoughts?

/**
* Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
*/
void setEnqueueTimeMs(final long enqueueTimeMs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be private right?

/**
* Return the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
*/
long enqueueTimeMs() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one is less sensitive, but if it's only used here as it seems we could consider private too

this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size",
metricGroupName,
"The current number of events in the consumer network application event queue."),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I guess that, in a time from now, even us that know this by heart will get tricked with if this is the outgoing or incoming queue. Should we be more explicit with something like

Suggested change
"The current number of events in the consumer network application event queue."),
"The current number of events in the queue to send from the application thread to the background thread."),

(and then we can consistently have the flipped version of the message for the background-event-queue-size metric)

Comment on lines 60 to 63
PollEvent event = new PollEvent(time.milliseconds());

// add event
applicationEventHandler.add(event);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PollEvent event = new PollEvent(time.milliseconds());
// add event
applicationEventHandler.add(event);
// add event
applicationEventHandler.add(new PollEvent(time.milliseconds()));


// add event
applicationEventHandler.add(event);
assertEquals(1, (double) metrics.metric(metrics.metricName("application-event-queue-size", "consumer-metrics")).metricValue());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we reuse the metric name constants we already have?

Comment on lines 1976 to 1977
assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue() > 0);
assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue() > 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't we be more precise here and expect >= 10?

@lianetm
Copy link
Member

lianetm commented Dec 9, 2024

Hello @FrankYang0529 , could you please solve the conflicts and address the minor comments left? Given that this is introducing new metrics we should meet the Feature freeze deadline which is this week so let's give it the final push. Thanks!

@FrankYang0529
Copy link
Member Author

Hi @lianetm / @AndrewJSchofield, thanks for the review and suggestion. I will update this PR and make it ready tomorrow. Sorry for late.

@FrankYang0529
Copy link
Member Author

Resolved almost all comments. Remaining two discussion thread:

  1. Add application expired event metric: KAFKA-16143: New JMX metrics for AsyncKafkaConsumer #17199 (comment)
  2. AsyncConsumerMetrics doesn't inherit KafkaConsumerMetrics: KAFKA-16143: New JMX metrics for AsyncKafkaConsumer #17199 (comment)

}
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
}

backgroundEventReaper.reap(time.milliseconds());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this sounds like a useful metric to have. Thanks!

private final Sensor unsentRequestsQueueSizeSensor;
private final Sensor unsentRequestsQueueTimeSensor;

public AsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I made a quick pass and I don't see anywhere that we're passing in anything other than CONSUMER_METRIC_GROUP_PREFIX or "consumer". Does it make sense to provide this parameter if the value is always the same?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this param will be "share" or similar when integrated with the ShareConsumer as @AndrewJSchofield suggested #17199 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lianetm, sorry, I didn't notice that. I addressed other comments and will leave this for https://issues.apache.org/jira/browse/KAFKA-18220.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good. Please also merge trunk latest changes when you address what't left for this PR. Thanks!

Comment on lines 83 to 84
expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName));
expectedConsumerMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message on test failure is incorrect here, right? Shouldn't it be:

Suggested change
expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName));
expectedConsumerMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Missing metric: " + metricName));
expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Metric present after close: " + metricName));
expectedConsumerMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName), "Metric present after close: " + metricName));

private static final String CONSUMER_GROUP_PREFIX = "consumer";
private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't these constants already declared elsewhere?

@FrankYang0529 FrankYang0529 force-pushed the KAFKA-16143 branch 2 times, most recently from ea1a661 to a54f409 Compare December 11, 2024 03:18
Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few last minor comments, and 2 follow-ups:

  1. https://issues.apache.org/jira/browse/KAFKA-18048
  2. refactor to allow ShareConsumer to leverage the new metrics.

With that it LGTM, but let's also wait to hear what @AndrewJSchofield and @kirktrue think. Thanks @FrankYang0529 !

private final Sensor unsentRequestsQueueSizeSensor;
private final Sensor unsentRequestsQueueTimeSensor;

public AsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this param will be "share" or similar when integrated with the ShareConsumer as @AndrewJSchofield suggested #17199 (comment)

@AndrewJSchofield
Copy link
Member

@lianetm @FrankYang0529 I have opened https://issues.apache.org/jira/browse/KAFKA-18220 to track any refactoring to this to improve the code structure where we want to use this for both the AsyncKafkaConsumer and ShareConsumerImpl. I'm happy that we take this for AK 4.1.

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved, with the follow-up task https://issues.apache.org/jira/browse/KAFKA-18220 to address the code structure when used in the share consumer. @lianetm still has some outstanding comments I think.

@FrankYang0529 FrankYang0529 force-pushed the KAFKA-16143 branch 2 times, most recently from d5a1688 to c5e6fbb Compare December 12, 2024 15:22
Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @FrankYang0529 ! LGTM.

@lianetm lianetm merged commit 770d64d into apache:trunk Dec 13, 2024
10 checks passed
@lianetm
Copy link
Member

lianetm commented Dec 14, 2024

Hey @FrankYang0529 , please remember to send the email sharing the update for the expired event metric discovered with this implementation. I missed adding it to the follow ups I mentioned above :). Thanks!

@FrankYang0529 FrankYang0529 deleted the KAFKA-16143 branch December 14, 2024 16:26
@FrankYang0529
Copy link
Member Author

Hi @lianetm, thanks for reviewing this PR. I mentioned the metric in the vote thread.

https://lists.apache.org/thread/4sn92okp89pt3pzk6f0g3wp0lbtn0g2c

tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
clients consumer ctr Consumer Threading Refactor (KIP-848) KIP-848 The Next Generation of the Consumer Rebalance Protocol
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants