diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 521f82e5629f9..2a7db5b24663e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -47,6 +47,8 @@ import org.apache.kafka.clients.consumer.internals.events.CommitEvent; import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; @@ -100,6 +102,7 @@ import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -132,6 +135,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets; +import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; import static org.apache.kafka.common.utils.Utils.swallow; @@ -164,48 +168,14 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { *
 *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
  • * */ - private class BackgroundEventProcessor extends EventProcessor { + private class BackgroundEventProcessor implements EventProcessor { - private final ApplicationEventHandler applicationEventHandler; private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; - public BackgroundEventProcessor(final LogContext logContext, - final BlockingQueue backgroundEventQueue, - final ApplicationEventHandler applicationEventHandler, - final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker) { - super(logContext, backgroundEventQueue); - this.applicationEventHandler = applicationEventHandler; + public BackgroundEventProcessor(final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker) { this.rebalanceListenerInvoker = rebalanceListenerInvoker; } - /** - * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}. - * It is possible that {@link ErrorEvent an error} - * could occur when processing the events. In such cases, the processor will take a reference to the first - * error, continue to process the remaining events, and then throw the first error that occurred. - */ - @Override - public boolean process() { - AtomicReference firstError = new AtomicReference<>(); - - ProcessHandler processHandler = (event, error) -> { - if (error.isPresent()) { - KafkaException e = error.get(); - - if (!firstError.compareAndSet(null, e)) { - log.warn("An error occurred when processing the event: {}", e.getMessage(), e); - } - } - }; - - boolean hadEvents = process(processHandler); - - if (firstError.get() != null) - throw firstError.get(); - - return hadEvents; - } - @Override public void process(final BackgroundEvent event) { switch (event.type()) { @@ -247,7 +217,9 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final KafkaConsumerMetrics kafkaConsumerMetrics; private Logger log; private final String clientId; + private final BlockingQueue backgroundEventQueue; private final BackgroundEventProcessor backgroundEventProcessor; + private final CompletableEventReaper backgroundEventReaper; private final Deserializers deserializers; /** @@ -294,6 +266,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { valueDeserializer, Time.SYSTEM, ApplicationEventHandler::new, + CompletableEventReaper::new, FetchCollector::new, ConsumerMetadata::new, new LinkedBlockingQueue<>() @@ -306,6 +279,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { final Deserializer valueDeserializer, final Time time, final ApplicationEventHandlerFactory applicationEventHandlerFactory, + final CompletableEventReaperFactory backgroundEventReaperFactory, final FetchCollectorFactory fetchCollectorFactory, final ConsumerMetadataFactory metadataFactory, final LinkedBlockingQueue backgroundEventQueue) { @@ -317,6 +291,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); LogContext logContext = createLogContext(config, groupRebalanceConfig); + this.backgroundEventQueue = backgroundEventQueue; this.log = logContext.logger(getClass()); log.debug("Initializing the Kafka consumer"); @@ -378,12 +353,12 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { ); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, - 
applicationEventQueue, requestManagersSupplier); this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, time, applicationEventQueue, + new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier); @@ -395,11 +370,9 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { new RebalanceCallbackMetricsManager(metrics) ); this.backgroundEventProcessor = new BackgroundEventProcessor( - logContext, - backgroundEventQueue, - applicationEventHandler, rebalanceListenerInvoker ); + this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext); this.assignors = ConsumerPartitionAssignor.getAssignorInstances( config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) @@ -444,6 +417,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { Time time, ApplicationEventHandler applicationEventHandler, BlockingQueue backgroundEventQueue, + CompletableEventReaper backgroundEventReaper, ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, Metrics metrics, SubscriptionState subscriptions, @@ -461,12 +435,9 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = Objects.requireNonNull(interceptors); this.time = time; - this.backgroundEventProcessor = new BackgroundEventProcessor( - logContext, - backgroundEventQueue, - applicationEventHandler, - rebalanceListenerInvoker - ); + this.backgroundEventQueue = backgroundEventQueue; + this.backgroundEventProcessor = new BackgroundEventProcessor(rebalanceListenerInvoker); + this.backgroundEventReaper = backgroundEventReaper; this.metrics = metrics; this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty())); this.metadata = metadata; @@ -526,7 +497,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig)); BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>(); - BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); + this.backgroundEventQueue = new LinkedBlockingQueue<>(); BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, @@ -563,21 +534,17 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, - applicationEventQueue, requestManagersSupplier ); this.applicationEventHandler = new ApplicationEventHandler(logContext, time, applicationEventQueue, + new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier); - this.backgroundEventProcessor = new BackgroundEventProcessor( - logContext, - backgroundEventQueue, - applicationEventHandler, - rebalanceListenerInvoker - ); + this.backgroundEventProcessor = new BackgroundEventProcessor(rebalanceListenerInvoker); + this.backgroundEventReaper = new CompletableEventReaper(logContext); } // auxiliary interface for testing @@ -587,6 +554,7 @@ ApplicationEventHandler build( final LogContext logContext, final Time time, final BlockingQueue applicationEventQueue, + final 
CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, final Supplier requestManagersSupplier @@ -594,6 +562,13 @@ ApplicationEventHandler build( } + // auxiliary interface for testing + interface CompletableEventReaperFactory { + + CompletableEventReaper build(final LogContext logContext); + + } + // auxiliary interface for testing interface FetchCollectorFactory { @@ -939,14 +914,12 @@ public Map committed(final Set committedOffsets = applicationEventHandler.addAndGet(event, - timer); + final Map committedOffsets = applicationEventHandler.addAndGet(event); committedOffsets.forEach(this::updateLastSeenEpochIfNewer); return committedOffsets; } catch (TimeoutException e) { @@ -992,12 +965,11 @@ public List partitionsFor(String topic, Duration timeout) { throw new TimeoutException(); } - final Timer timer = time.timer(timeout); - final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(topic, timer); + final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(topic, calculateDeadlineMs(time, timeout)); wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { Map> topicMetadata = - applicationEventHandler.addAndGet(topicMetadataEvent, timer); + applicationEventHandler.addAndGet(topicMetadataEvent); return topicMetadata.getOrDefault(topic, Collections.emptyList()); } finally { @@ -1021,11 +993,10 @@ public Map> listTopics(Duration timeout) { throw new TimeoutException(); } - final Timer timer = time.timer(timeout); - final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(timer); + final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(calculateDeadlineMs(time, timeout)); wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { - return applicationEventHandler.addAndGet(topicMetadataEvent, timer); + return applicationEventHandler.addAndGet(topicMetadataEvent); } finally { wakeupTrigger.clearTask(); } @@ -1093,10 +1064,9 @@ public Map offsetsForTimes(Map offsetsForTimes(Map beginningOrEndOffset(Collection timestampToSearch = partitions .stream() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); - Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampToSearch, - timer, + calculateDeadlineMs(time, timeout), false); // If timeout is set to zero return empty immediately; otherwise try to get the results @@ -1167,9 +1136,7 @@ private Map beginningOrEndOffset(Collection offsetAndTimestampMap; - offsetAndTimestampMap = applicationEventHandler.addAndGet( - listOffsetsEvent, - timer); + offsetAndTimestampMap = applicationEventHandler.addAndGet(listOffsetsEvent); return offsetAndTimestampMap.entrySet() .stream() .collect(Collectors.toMap( @@ -1269,6 +1236,12 @@ private void close(Duration timeout, boolean swallowException) { if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); + + // close() can be called from inside one of the constructors. In that case, it's possible that neither + // the reaper nor the background event queue were constructed, so check them first to avoid NPE. 
+ if (backgroundEventReaper != null && backgroundEventQueue != null) + backgroundEventReaper.reap(backgroundEventQueue); + closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); @@ -1295,21 +1268,21 @@ private void close(Duration timeout, boolean swallowException) { void prepareShutdown(final Timer timer, final AtomicReference firstException) { if (!groupMetadata.get().isPresent()) return; - maybeAutoCommitSync(autoCommitEnabled, timer); + + if (autoCommitEnabled) + autoCommitSync(timer); + applicationEventHandler.add(new CommitOnCloseEvent()); completeQuietly( () -> { maybeRevokePartitions(); - applicationEventHandler.addAndGet(new LeaveOnCloseEvent(timer), timer); + applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))); }, "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); } // Visible for testing - void maybeAutoCommitSync(final boolean shouldAutoCommit, - final Timer timer) { - if (!shouldAutoCommit) - return; + void autoCommitSync(final Timer timer) { Map allConsumed = subscriptions.allConsumed(); log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); try { @@ -1376,10 +1349,10 @@ public void commitSync(Map offsets, Duration acquireAndEnsureOpen(); long commitStart = time.nanoseconds(); try { - Timer requestTimer = time.timer(timeout.toMillis()); - SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); + SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout)); CompletableFuture commitFuture = commit(syncCommitEvent); + Timer requestTimer = time.timer(timeout.toMillis()); awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true); wakeupTrigger.setActiveTask(commitFuture); @@ -1523,12 +1496,12 @@ public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); if (groupMetadata.get().isPresent()) { Timer timer = time.timer(Long.MAX_VALUE); - UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(timer); + UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); applicationEventHandler.add(unsubscribeEvent); log.info("Unsubscribing all topics or patterns and assigned partitions"); try { - processBackgroundEvents(backgroundEventProcessor, unsubscribeEvent.future(), timer); + processBackgroundEvents(unsubscribeEvent.future(), timer); log.info("Unsubscribed all topics or patterns and assigned partitions"); } catch (TimeoutException e) { log.error("Failed while waiting for the unsubscribe event to complete"); @@ -1637,7 +1610,7 @@ private boolean updateFetchPositions(final Timer timer) { // Validate positions using the partition leader end offsets, to detect if any partition // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch // request, retrieve the partition end offsets, and validate the current position against it. - applicationEventHandler.addAndGet(new ValidatePositionsEvent(timer), timer); + applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer))); cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); if (cachedSubscriptionHasAllFetchPositions) return true; @@ -1660,7 +1633,7 @@ private boolean updateFetchPositions(final Timer timer) { // which are awaiting reset. 
This will trigger a ListOffset request, retrieve the // partition offsets according to the strategy (ex. earliest, latest), and update the // positions. - applicationEventHandler.addAndGet(new ResetPositionsEvent(timer), timer); + applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer))); return true; } catch (TimeoutException e) { return false; @@ -1693,9 +1666,9 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent( initializingPartitions, - timer); + calculateDeadlineMs(timer)); wakeupTrigger.setActiveTask(event.future()); - final Map offsets = applicationEventHandler.addAndGet(event, timer); + final Map offsets = applicationEventHandler.addAndGet(event); refreshCommittedOffsets(offsets, metadata, subscriptions); return true; } catch (TimeoutException e) { @@ -1722,7 +1695,7 @@ public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeThrowFencedInstanceException(); offsetCommitCallbackInvoker.executeCallbacks(); maybeUpdateSubscriptionMetadata(); - backgroundEventProcessor.process(); + processBackgroundEvents(); return updateFetchPositions(timer); } @@ -1848,6 +1821,40 @@ private void subscribeInternal(Collection topics, Optional firstError = new AtomicReference<>(); + + LinkedList events = new LinkedList<>(); + backgroundEventQueue.drainTo(events); + + for (BackgroundEvent event : events) { + try { + if (event instanceof CompletableEvent) + backgroundEventReaper.add((CompletableEvent) event); + + backgroundEventProcessor.process(event); + } catch (Throwable t) { + KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); + + if (!firstError.compareAndSet(null, e)) + log.warn("An error occurred when processing the background event: {}", e.getMessage(), e); + } + } + + backgroundEventReaper.reap(time.milliseconds()); + + if (firstError.get() != null) + throw firstError.get(); + + return !events.isEmpty(); + } + /** * This method can be used by cases where the caller has an event that needs to both block for completion but * also process background events. 
For some events, in order to fully process the associated logic, the @@ -1870,28 +1877,26 @@ private void subscribeInternal(Collection topics, Optional T processBackgroundEvents(EventProcessor eventProcessor, - Future future, - Timer timer) { + T processBackgroundEvents(Future future, Timer timer) { do { - boolean hadEvents = eventProcessor.process(); + boolean hadEvents = processBackgroundEvents(); try { if (future.isDone()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index aa352cd68a22e..adee6594603bb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.utils.KafkaThread; @@ -31,9 +33,11 @@ import java.io.Closeable; import java.time.Duration; import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; @@ -50,6 +54,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread"; private final Time time; private final Logger log; + private final BlockingQueue applicationEventQueue; + private final CompletableEventReaper applicationEventReaper; private final Supplier applicationEventProcessorSupplier; private final Supplier networkClientDelegateSupplier; private final Supplier requestManagersSupplier; @@ -63,12 +69,16 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { public ConsumerNetworkThread(LogContext logContext, Time time, + BlockingQueue applicationEventQueue, + CompletableEventReaper applicationEventReaper, Supplier applicationEventProcessorSupplier, Supplier networkClientDelegateSupplier, Supplier requestManagersSupplier) { super(BACKGROUND_THREAD_NAME, true); this.time = time; this.log = logContext.logger(getClass()); + this.applicationEventQueue = applicationEventQueue; + this.applicationEventReaper = applicationEventReaper; this.applicationEventProcessorSupplier = applicationEventProcessorSupplier; this.networkClientDelegateSupplier = networkClientDelegateSupplier; this.requestManagersSupplier = requestManagersSupplier; @@ -125,10 +135,7 @@ void initializeResources() { * */ void runOnce() { - // Process the events—if any—that were produced by the application thread. It is possible that when processing - // an event generates an error. In such cases, the processor will log an exception, but we do not want those - // errors to be propagated to the caller. 
- applicationEventProcessor.process(); + processApplicationEvents(); final long currentTimeMs = time.milliseconds(); final long pollWaitTimeMs = requestManagers.entries().stream() @@ -144,6 +151,36 @@ void runOnce() { .map(Optional::get) .map(rm -> rm.maximumTimeToWait(currentTimeMs)) .reduce(Long.MAX_VALUE, Math::min); + + reapExpiredApplicationEvents(currentTimeMs); + } + + /** + * Process the events—if any—that were produced by the application thread. + */ + private void processApplicationEvents() { + LinkedList events = new LinkedList<>(); + applicationEventQueue.drainTo(events); + + for (ApplicationEvent event : events) { + try { + if (event instanceof CompletableEvent) + applicationEventReaper.add((CompletableEvent) event); + + applicationEventProcessor.process(event); + } catch (Throwable t) { + log.warn("Error processing event {}", t.getMessage(), t); + } + } + } + + /** + * "Complete" any events that have expired. This cleanup step should only be called after the network I/O + * thread has made at least one call to {@link NetworkClientDelegate#poll(long, long) poll} so that each event + * is given least one attempt to satisfy any network requests before checking if a timeout has expired. + */ + private void reapExpiredApplicationEvents(long currentTimeMs) { + applicationEventReaper.reap(currentTimeMs); } /** @@ -273,9 +310,10 @@ void cleanup() { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { sendUnsentRequests(timer); + applicationEventReaper.reap(applicationEventQueue); + closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); - closeQuietly(applicationEventProcessor, "application event processor"); log.debug("Closed the consumer network thread"); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 8e93de5a24c4c..76a550ad71985 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -1337,6 +1337,7 @@ private CompletableFuture enqueueConsumerRebalanceListenerCallback(Consume Set partitions) { SortedSet sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); sortedPartitions.addAll(partitions); + CompletableBackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions); backgroundEventHandler.add(event); log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java index 3347002cc6fea..9621e34ef5b94 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java @@ -17,14 +17,13 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.utils.Timer; import java.util.List; import java.util.Map; public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent>> { - protected 
AbstractTopicMetadataEvent(final Type type, final Timer timer) { - super(type, timer); + protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) { + super(type, deadlineMs); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java index bda18e642105b..8fe1702c85bd6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java @@ -16,11 +16,9 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.common.utils.Timer; - public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent { - public AllTopicsMetadataEvent(final Timer timer) { - super(Type.ALL_TOPICS_METADATA, timer); + public AllTopicsMetadataEvent(final long deadlineMs) { + super(Type.ALL_TOPICS_METADATA, deadlineMs); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index cb2616fda478c..1e082e11978be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -32,7 +31,6 @@ import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -49,6 +47,7 @@ public class ApplicationEventHandler implements Closeable { public ApplicationEventHandler(final LogContext logContext, final Time time, final BlockingQueue applicationEventQueue, + final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, final Supplier networkClientDelegateSupplier, final Supplier requestManagersSupplier) { @@ -56,6 +55,8 @@ public ApplicationEventHandler(final LogContext logContext, this.applicationEventQueue = applicationEventQueue; this.networkThread = new ConsumerNetworkThread(logContext, time, + applicationEventQueue, + applicationEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier); @@ -99,17 +100,16 @@ public long maximumTimeToWait() { * *
     * <p/>
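     *
     * For illustration only (a hypothetical caller, mirroring how {@code partitionsFor} uses the
     * timer-less API; the {@code topic}, {@code time}, and {@code timeout} variables are assumed):
     * <pre>{@code
     * TopicMetadataEvent event = new TopicMetadataEvent(topic, calculateDeadlineMs(time, timeout));
     * Map<String, List<PartitionInfo>> topicMetadata = applicationEventHandler.addAndGet(event);
     * }</pre>
     *
     * <p/>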
    * - * See {@link ConsumerUtils#getResult(Future, Timer)} and {@link Future#get(long, TimeUnit)} for more details. + * See {@link ConsumerUtils#getResult(Future)} for more details. * * @param event A {@link CompletableApplicationEvent} created by the polling thread * @return Value that is the result of the event * @param Type of return value of the event */ - public T addAndGet(final CompletableApplicationEvent event, final Timer timer) { + public T addAndGet(final CompletableApplicationEvent event) { Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null"); - Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null"); add(event); - return ConsumerUtils.getResult(event.future(), timer); + return ConsumerUtils.getResult(event.future()); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index d5cb1c04b38f4..7ee0c09d40df9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -42,31 +41,20 @@ * An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread} * which processes {@link ApplicationEvent application events} generated by the application thread. */ -public class ApplicationEventProcessor extends EventProcessor { +public class ApplicationEventProcessor implements EventProcessor { private final Logger log; private final ConsumerMetadata metadata; private final RequestManagers requestManagers; public ApplicationEventProcessor(final LogContext logContext, - final BlockingQueue applicationEventQueue, final RequestManagers requestManagers, final ConsumerMetadata metadata) { - super(logContext, applicationEventQueue); this.log = logContext.logger(ApplicationEventProcessor.class); this.requestManagers = requestManagers; this.metadata = metadata; } - /** - * Process the events—if any—that were produced by the application thread. It is possible that when processing - * an event generates an error. In such cases, the processor will log an exception, but we do not want those - * errors to be propagated to the caller. 
- */ - public boolean process() { - return process((event, error) -> error.ifPresent(e -> log.warn("Error processing event {}", e.getMessage(), e))); - } - @SuppressWarnings({"CyclomaticComplexity"}) @Override public void process(ApplicationEvent event) { @@ -273,7 +261,7 @@ private void process(final ConsumerRebalanceListenerCallbackCompletedEvent event manager.consumerRebalanceListenerCallbackCompleted(event); } - private void process(final CommitOnCloseEvent event) { + private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) { if (!requestManagers.commitRequestManager.isPresent()) return; log.debug("Signal CommitRequestManager closing"); @@ -309,7 +297,6 @@ private void process(final LeaveOnCloseEvent event) { */ public static Supplier supplier(final LogContext logContext, final ConsumerMetadata metadata, - final BlockingQueue applicationEventQueue, final Supplier requestManagersSupplier) { return new CachedSupplier() { @Override @@ -317,7 +304,6 @@ protected ApplicationEventProcessor create() { RequestManagers requestManagers = requestManagersSupplier.get(); return new ApplicationEventProcessor( logContext, - applicationEventQueue, requestManagers, metadata ); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java index 1da7b84039ab8..dc863b0ee659a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Timer; import java.util.Collections; import java.util.Map; @@ -30,11 +29,6 @@ public abstract class CommitEvent extends CompletableApplicationEvent { */ private final Map offsets; - protected CommitEvent(final Type type, final Map offsets, final Timer timer) { - super(type, timer); - this.offsets = validate(offsets); - } - protected CommitEvent(final Type type, final Map offsets, final long deadlineMs) { super(type, deadlineMs); this.offsets = validate(offsets); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java index dae9e9f1017ba..dffac12902177 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.common.utils.Timer; - -import java.util.Objects; import java.util.concurrent.CompletableFuture; /** @@ -32,13 +29,9 @@ public abstract class CompletableApplicationEvent extends ApplicationEvent im private final CompletableFuture future; private final long deadlineMs; - protected CompletableApplicationEvent(final Type type, final Timer timer) { - super(type); - this.future = new CompletableFuture<>(); - Objects.requireNonNull(timer); - this.deadlineMs = timer.remainingMs() + timer.currentTimeMs(); - } - + /** + * Note: the {@code deadlineMs} is the future time of expiration, not a timeout. 
+ */ protected CompletableApplicationEvent(final Type type, final long deadlineMs) { super(type); this.future = new CompletableFuture<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java index 1a58515a5cbce..d02010496e545 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java @@ -27,10 +27,15 @@ public abstract class CompletableBackgroundEvent extends BackgroundEvent implements CompletableEvent { private final CompletableFuture future; + private final long deadlineMs; - protected CompletableBackgroundEvent(final Type type) { + /** + * Note: the {@code deadlineMs} is the future time of expiration, not a timeout. + */ + protected CompletableBackgroundEvent(final Type type, final long deadlineMs) { super(type); this.future = new CompletableFuture<>(); + this.deadlineMs = deadlineMs; } @Override @@ -38,8 +43,13 @@ public CompletableFuture future() { return future; } + @Override + public long deadlineMs() { + return deadlineMs; + } + @Override protected String toStringBase() { - return super.toStringBase() + ", future=" + future; + return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java index 97559d8cb9be2..20231b0f99a10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java @@ -16,9 +16,112 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import static java.util.Objects.requireNonNull; +/** + * {@code CompletableEvent} is an interface that is used by both {@link CompletableApplicationEvent} and + * {@link CompletableBackgroundEvent} for common processing and logic. A {@code CompletableEvent} is one that + * allows the caller to get the {@link #future() future} related to the event and the event's + * {@link #deadlineMs() expiration timestamp}. + * + * @param Return type for the event when completed + */ public interface CompletableEvent { + /** + * Returns the {@link CompletableFuture future} associated with this event. Any event will have some related + * logic that is executed on its behalf. The event can complete in one of the following ways: + * + *
+     * <ul>
+     *     <li>
+     *         Success: when the logic for the event completes successfully, the data generated by that event
+     *         (if applicable) is passed to {@link CompletableFuture#complete(Object)}. In the case where the generic
+     *         bound type is specified as {@link Void}, {@code null} is provided.
+     *     </li>
+     *     <li>
+     *         Error: when the event logic generates an error, the error is passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *     </li>
+     *     <li>
+     *         Timeout: when the time spent executing the event logic exceeds the {@link #deadlineMs() deadline}, an
+     *         instance of {@link TimeoutException} should be created and passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}. This also occurs when an event remains
+     *         incomplete when the consumer closes.
+     *     </li>
+     * </ul>
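+     *
+     * <p/>
+     *
+     * An illustrative sketch of how a caller might consume the future (the {@code event} variable is
+     * assumed, not part of this patch):
+     * <pre>{@code
+     * CompletableEvent<Long> event = ...;  // an event created with some deadline
+     * try {
+     *     Long result = event.future().get();
+     * } catch (InterruptedException | ExecutionException e) {
+     *     // e.getCause() may be a TimeoutException if the deadline passed before completion
+     * }
+     * }</pre>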
    + * + * @return Future on which the caller may block or query for completion + * + * @see CompletableEventReaper + */ CompletableFuture future(); + + /** + * This is the deadline that represents the absolute wall clock time by which any event-specific execution should + * complete. This is not a timeout value. After this time has passed, + * {@link CompletableFuture#completeExceptionally(Throwable)} will be invoked with an instance of + * {@link TimeoutException}. + * + * @return Absolute time for event to be completed + * + * @see CompletableEventReaper + */ + long deadlineMs(); + + /** + * Calculate the deadline timestamp based on {@link Timer#currentTimeMs()} and {@link Timer#remainingMs()}. + * + * @param timer Timer + * + * @return Absolute time by which event should be completed + */ + static long calculateDeadlineMs(final Timer timer) { + requireNonNull(timer); + return calculateDeadlineMs(timer.currentTimeMs(), timer.remainingMs()); + } + + /** + * Calculate the deadline timestamp based on {@link Timer#currentTimeMs()} and {@link Duration#toMillis()}. + * + * @param time Time + * @param duration Duration + * + * @return Absolute time by which event should be completed + */ + static long calculateDeadlineMs(final Time time, final Duration duration) { + return calculateDeadlineMs(requireNonNull(time).milliseconds(), requireNonNull(duration).toMillis()); + } + + /** + * Calculate the deadline timestamp based on {@link Timer#currentTimeMs()} and timeout. + * + * @param time Time + * @param timeoutMs Timeout, in milliseconds + * + * @return Absolute time by which event should be completed + */ + static long calculateDeadlineMs(final Time time, final long timeoutMs) { + return calculateDeadlineMs(requireNonNull(time).milliseconds(), timeoutMs); + } + + /** + * Calculate the deadline timestamp based on the current time and timeout. + * + * @param currentTimeMs Current time, in milliseconds + * @param timeoutMs Timeout, in milliseconds + * + * @return Absolute time by which event should be completed + */ + static long calculateDeadlineMs(final long currentTimeMs, final long timeoutMs) { + if (currentTimeMs > Long.MAX_VALUE - timeoutMs) + return Long.MAX_VALUE; + else + return currentTimeMs + timeoutMs; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java new file mode 100644 index 0000000000000..545a03df8b30d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * {@code CompletableEventReaper} is responsible for tracking {@link CompletableEvent time-bound events} and removing + * any that exceed their {@link CompletableEvent#deadlineMs() deadline} (unless they've already completed). This + * mechanism is used by the {@link AsyncKafkaConsumer} to enforce the timeout provided by the user in its API + * calls (e.g. {@link AsyncKafkaConsumer#commitSync(Duration)}). + */ +public class CompletableEventReaper { + + private final Logger log; + + /** + * List of tracked events that are candidates for expiration. + */ + private final List> tracked; + + public CompletableEventReaper(LogContext logContext) { + this.log = logContext.logger(CompletableEventReaper.class); + this.tracked = new ArrayList<>(); + } + + /** + * Adds a new {@link CompletableEvent event} to track for later completion/expiration. + * + * @param event Event to track + */ + public void add(CompletableEvent event) { + tracked.add(Objects.requireNonNull(event, "Event to track must be non-null")); + } + + /** + * This method performs a two-step process to "complete" {@link CompletableEvent events} that have either expired + * or completed normally: + * + *
+     * <ol>
+     *     <li>
+     *         For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+     *         instance of {@link TimeoutException} is created and passed to
+     *         {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *     </li>
+     *     <li>
+     *         For each tracked event whose {@link CompletableEvent#future() future} is already in the
+     *         {@link CompletableFuture#isDone() done} state, the event is removed from the list of tracked events.
+     *     </li>
+     * </ol>
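+     *
+     * <p/>
+     *
+     * An illustrative sketch of the intended call pattern (the event loop and the {@code logContext},
+     * {@code time}, and {@code event} variables are assumed for illustration):
+     * <pre>{@code
+     * CompletableEventReaper reaper = new CompletableEventReaper(logContext);
+     * reaper.add(event);                       // start tracking a CompletableEvent
+     * while (running) {
+     *     // ... drain and process events, perform network I/O ...
+     *     reaper.reap(time.milliseconds());    // expire any events past their deadline
+     * }
+     * }</pre>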
    + * + * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper. + * + * @param currentTimeMs Current time with which to compare against the + * {@link CompletableEvent#deadlineMs() expiration time} + */ + public void reap(long currentTimeMs) { + Consumer> expireEvent = event -> { + long pastDueMs = currentTimeMs - event.deadlineMs(); + TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, event.deadlineMs())); + + if (event.future().completeExceptionally(error)) { + log.debug("Event {} completed exceptionally since its expiration of {} passed {} ms ago", event, event.deadlineMs(), pastDueMs); + } else { + log.trace("Event {} not completed exceptionally since it was previously completed", event); + } + }; + + // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete. + tracked.stream() + .filter(e -> !e.future().isDone()) + .filter(e -> currentTimeMs >= e.deadlineMs()) + .forEach(expireEvent); + // Second, remove any events that are already complete, just to make sure we don't hold references. This will + // include any events that finished successfully as well as any events we just completed exceptionally above. + tracked.removeIf(e -> e.future().isDone()); + } + + /** + * It is possible for the {@link AsyncKafkaConsumer#close() consumer to close} before completing the processing of + * all the events in the queue. In this case, we need to + * {@link CompletableFuture#completeExceptionally(Throwable) expire} any remaining events. + * + *
+     * <p/>
    + * + * Check each of the {@link #add(CompletableEvent) previously-added} {@link CompletableEvent completable events}, + * and for any that are incomplete, expire them. Also check the core event queue for any incomplete events and + * likewise expire them. + * + *
+     * <p/>
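+     *
+     * A sketch of the close-time usage, mirroring what {@code AsyncKafkaConsumer} does during
+     * {@code close()} (the queue variable is assumed):
+     * <pre>{@code
+     * // fail any event still outstanding, whether tracked here or still sitting in the queue
+     * backgroundEventReaper.reap(backgroundEventQueue);
+     * }</pre>
+     *
+     * <p/>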
    + * + * Note: because this is called in the context of {@link AsyncKafkaConsumer#close() closing consumer}, + * don't take the deadline into consideration, just close it regardless. + * + * @param events Events from a queue that have not yet been tracked that also need to be reviewed + */ + public void reap(Collection events) { + Objects.requireNonNull(events, "Event queue to reap must be non-null"); + + Consumer> expireEvent = event -> { + TimeoutException error = new TimeoutException(String.format("%s could not be completed before the consumer closed", event.getClass().getSimpleName())); + + if (event.future().completeExceptionally(error)) { + log.debug("Event {} completed exceptionally since the consumer is closing", event); + } else { + log.trace("Event {} not completed exceptionally since it was completed prior to the consumer closing", event); + } + }; + + tracked.stream() + .filter(e -> !e.future().isDone()) + .forEach(expireEvent); + tracked.clear(); + + events.stream() + .filter(e -> e instanceof CompletableEvent) + .map(e -> (CompletableEvent) e) + .filter(e -> !e.future().isDone()) + .forEach(expireEvent); + events.clear(); + } + + public int size() { + return tracked.size(); + } + + public boolean contains(CompletableEvent event) { + return event != null && tracked.contains(event); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java index 6ce833580c88d..ecb9eedab22c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java @@ -39,7 +39,7 @@ public class ConsumerRebalanceListenerCallbackNeededEvent extends CompletableBac public ConsumerRebalanceListenerCallbackNeededEvent(final ConsumerRebalanceListenerMethodName methodName, final SortedSet partitions) { - super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED); + super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, Long.MAX_VALUE); this.methodName = Objects.requireNonNull(methodName); this.partitions = Collections.unmodifiableSortedSet(partitions); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java index 21916034b37b6..1c0bb0305989e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java @@ -16,111 +16,26 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.clients.consumer.internals.ConsumerUtils; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.internals.IdempotentCloser; -import org.apache.kafka.common.utils.LogContext; -import org.slf4j.Logger; - -import java.io.Closeable; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.BlockingQueue; /** - * An {@link EventProcessor} is the means by which events produced by thread A are - * processed by thread B. 
By definition, threads A and B run in parallel to - * each other, so a mechanism is needed with which to receive and process the events from the other thread. That - * communication channel is formed around {@link BlockingQueue a shared queue} into which thread A - * enqueues events and thread B reads and processes those events. + * An {@code EventProcessor} is the means by which events are processed, the meaning of which is left + * intentionally loose. This is in large part to keep the {@code EventProcessor} focused on what it means to process + * the events, and not linking itself too closely with the rest of the surrounding application. + * + *
+ * <p/>
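+ *
+ * A minimal illustrative implementation (hypothetical; the real implementations are
+ * {@code ApplicationEventProcessor} and the consumer's background event processor):
+ * <pre>{@code
+ * public class LoggingEventProcessor implements EventProcessor<BackgroundEvent> {
+ *     private final Logger log = LoggerFactory.getLogger(LoggingEventProcessor.class);
+ *
+ *     @Override
+ *     public void process(BackgroundEvent event) {
+ *         log.debug("Processing event {}", event);
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p/>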
    + * + * The {@code EventProcessor} is envisaged as a stateless service that acts as a conduit, receiving an event and + * dispatching to another block of code to process. The semantic meaning of each event is different, so the + * {@code EventProcessor} will need to interact with other parts of the system that maintain state. The + * implementation should not be concerned with the mechanism by which an event arrived for processing. While the + * events are shuffled around the consumer subsystem by means of {@link BlockingQueue shared queues}, it should + * be considered an anti-pattern to need to know how it arrived or what happens after its is processed. */ -public abstract class EventProcessor implements Closeable { - - private final Logger log; - private final BlockingQueue eventQueue; - private final IdempotentCloser closer; - - protected EventProcessor(final LogContext logContext, final BlockingQueue eventQueue) { - this.log = logContext.logger(EventProcessor.class); - this.eventQueue = eventQueue; - this.closer = new IdempotentCloser(); - } - - public abstract boolean process(); - - protected abstract void process(T event); - - @Override - public void close() { - closer.close(this::closeInternal, () -> log.warn("The event processor was already closed")); - } - - protected interface ProcessHandler { - - void onProcess(T event, Optional error); - } - - /** - * Drains all available events from the queue, and then processes them in order. If any errors are thrown while - * processing the individual events, these are submitted to the given {@link ProcessHandler}. - */ - protected boolean process(ProcessHandler processHandler) { - closer.assertOpen("The processor was previously closed, so no further processing can occur"); - - List events = drain(); - - if (events.isEmpty()) - return false; - - for (T event : events) { - try { - Objects.requireNonNull(event, "Attempted to process a null event"); - process(event); - processHandler.onProcess(event, Optional.empty()); - } catch (Throwable t) { - KafkaException error = ConsumerUtils.maybeWrapAsKafkaException(t); - processHandler.onProcess(event, Optional.of(error)); - } - } - - return true; - } - - /** - * It is possible for the consumer to close before complete processing all the events in the queue. In - * this case, we need to throw an exception to notify the user the consumer is closed. - */ - private void closeInternal() { - log.trace("Closing event processor"); - List incompleteEvents = drain(); - - if (incompleteEvents.isEmpty()) - return; - - KafkaException exception = new KafkaException("The consumer is closed"); - - // Check each of the events and if it has a Future that is incomplete, complete it exceptionally. - incompleteEvents - .stream() - .filter(e -> e instanceof CompletableEvent) - .map(e -> ((CompletableEvent) e).future()) - .filter(f -> !f.isDone()) - .forEach(f -> { - log.debug("Completing {} with exception {}", f, exception.getMessage()); - f.completeExceptionally(exception); - }); - - log.debug("Discarding {} events because the consumer is closing", incompleteEvents.size()); - } +public interface EventProcessor { /** - * Moves all the events from the queue to the returned list. + * Process an event that is received. 
*/ - private List drain() { - LinkedList events = new LinkedList<>(); - eventQueue.drainTo(events); - return events; - } + void process(T event); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java index 980a8f1104261..785736791a7c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Timer; import java.util.Collections; import java.util.Map; @@ -31,8 +30,8 @@ public class FetchCommittedOffsetsEvent extends CompletableApplicationEvent partitions; - public FetchCommittedOffsetsEvent(final Set partitions, final Timer timer) { - super(Type.FETCH_COMMITTED_OFFSETS, timer); + public FetchCommittedOffsetsEvent(final Set partitions, final long deadlineMs) { + super(Type.FETCH_COMMITTED_OFFSETS, deadlineMs); this.partitions = Collections.unmodifiableSet(partitions); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java index e77b4dfb2893c..647265a1500c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java @@ -16,11 +16,9 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.common.utils.Timer; - public class LeaveOnCloseEvent extends CompletableApplicationEvent { - public LeaveOnCloseEvent(final Timer timer) { - super(Type.LEAVE_ON_CLOSE, timer); + public LeaveOnCloseEvent(final long deadlineMs) { + super(Type.LEAVE_ON_CLOSE, deadlineMs); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java index 3df4719a7b065..8ae2f1ea57612 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Timer; import java.util.Collections; import java.util.HashMap; @@ -38,9 +37,9 @@ public class ListOffsetsEvent extends CompletableApplicationEvent timestampToSearch, - Timer timer, + long deadlineMs, boolean requireTimestamps) { - super(Type.LIST_OFFSETS, timer); + super(Type.LIST_OFFSETS, deadlineMs); this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch); this.requireTimestamps = requireTimestamps; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java index 65893b62ecaa5..86dbb80c0f0ac 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java @@ -17,8 +17,6 @@ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.common.utils.Timer; - /** * Event for resetting offsets for all assigned partitions that require it. This is an * asynchronous event that generates ListOffsets requests, and completes by updating in-memory @@ -26,7 +24,7 @@ */ public class ResetPositionsEvent extends CompletableApplicationEvent { - public ResetPositionsEvent(final Timer timer) { - super(Type.RESET_POSITIONS, timer); + public ResetPositionsEvent(final long deadlineMs) { + super(Type.RESET_POSITIONS, deadlineMs); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java index 87945616ea71b..7dc7a023a8d01 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Timer; import java.util.Map; @@ -28,7 +27,7 @@ */ public class SyncCommitEvent extends CommitEvent { - public SyncCommitEvent(final Map offsets, final Timer timer) { - super(Type.COMMIT_SYNC, offsets, timer); + public SyncCommitEvent(final Map offsets, final long deadlineMs) { + super(Type.COMMIT_SYNC, offsets, deadlineMs); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java index 33e1270ce6040..9758ae0efa0f0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java @@ -16,16 +16,14 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.common.utils.Timer; - import java.util.Objects; public class TopicMetadataEvent extends AbstractTopicMetadataEvent { private final String topic; - public TopicMetadataEvent(final String topic, final Timer timer) { - super(Type.TOPIC_METADATA, timer); + public TopicMetadataEvent(final String topic, final long deadlineMs) { + super(Type.TOPIC_METADATA, deadlineMs); this.topic = Objects.requireNonNull(topic); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java index 0b988370014a5..327feaa22f69b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java @@ -17,8 +17,6 @@ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.common.utils.Timer; - /** * Application event triggered when a user calls the unsubscribe API. This will make the consumer * release all its assignments and send a heartbeat request to leave the consumer group. 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
index 65893b62ecaa5..86dbb80c0f0ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
@@ -17,8 +17,6 @@

 package org.apache.kafka.clients.consumer.internals.events;

-import org.apache.kafka.common.utils.Timer;
-
 /**
  * Event for resetting offsets for all assigned partitions that require it. This is an
  * asynchronous event that generates ListOffsets requests, and completes by updating in-memory
@@ -26,7 +24,7 @@
  */
 public class ResetPositionsEvent extends CompletableApplicationEvent<Void> {

-    public ResetPositionsEvent(final Timer timer) {
-        super(Type.RESET_POSITIONS, timer);
+    public ResetPositionsEvent(final long deadlineMs) {
+        super(Type.RESET_POSITIONS, deadlineMs);
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
index 87945616ea71b..7dc7a023a8d01 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
@@ -18,7 +18,6 @@

 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Timer;

 import java.util.Map;

@@ -28,7 +27,7 @@
  */
 public class SyncCommitEvent extends CommitEvent {

-    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) {
-        super(Type.COMMIT_SYNC, offsets, timer);
+    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
+        super(Type.COMMIT_SYNC, offsets, deadlineMs);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
index 33e1270ce6040..9758ae0efa0f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
@@ -16,16 +16,14 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;

-import org.apache.kafka.common.utils.Timer;
-
 import java.util.Objects;

 public class TopicMetadataEvent extends AbstractTopicMetadataEvent {

     private final String topic;

-    public TopicMetadataEvent(final String topic, final Timer timer) {
-        super(Type.TOPIC_METADATA, timer);
+    public TopicMetadataEvent(final String topic, final long deadlineMs) {
+        super(Type.TOPIC_METADATA, deadlineMs);
         this.topic = Objects.requireNonNull(topic);
     }

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
index 0b988370014a5..327feaa22f69b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
@@ -17,8 +17,6 @@

 package org.apache.kafka.clients.consumer.internals.events;

-import org.apache.kafka.common.utils.Timer;
-
 /**
  * Application event triggered when a user calls the unsubscribe API. This will make the consumer
  * release all its assignments and send a heartbeat request to leave the consumer group.
@@ -28,8 +26,8 @@
  */
 public class UnsubscribeEvent extends CompletableApplicationEvent<Void> {

-    public UnsubscribeEvent(final Timer timer) {
-        super(Type.UNSUBSCRIBE, timer);
+    public UnsubscribeEvent(final long deadlineMs) {
+        super(Type.UNSUBSCRIBE, deadlineMs);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
index 21e7f3cf6eba1..a93ff9859a58e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
@@ -17,8 +17,6 @@

 package org.apache.kafka.clients.consumer.internals.events;

-import org.apache.kafka.common.utils.Timer;
-
 /**
  * Event for validating offsets for all assigned partitions for which a leader change has been
  * detected. This is an asynchronous event that generates OffsetForLeaderEpoch requests, and
@@ -26,7 +24,7 @@
  */
 public class ValidatePositionsEvent extends CompletableApplicationEvent<Void> {

-    public ValidatePositionsEvent(final Timer timer) {
-        super(Type.VALIDATE_POSITIONS, timer);
+    public ValidatePositionsEvent(final long deadlineMs) {
+        super(Type.VALIDATE_POSITIONS, deadlineMs);
     }
 }
\ No newline at end of file
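Every event above gets the same mechanical treatment: the mutable Timer, which would otherwise be shared between the application and background threads, is replaced by an immutable absolute deadline in epoch milliseconds. The helper imported as CompletableEvent.calculateDeadlineMs performs the conversion; the sketch below shows the arithmetic implied by the two overloads the tests call — the overflow clamp is an assumption, chosen so a huge timeout cannot wrap into an already-expired deadline:

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.common.utils.Time;

    public class DeadlineSketch {

        // Overload called as calculateDeadlineMs(time, 100) in ConsumerNetworkThreadTest.
        static long calculateDeadlineMs(Time time, long timeoutMs) {
            return calculateDeadlineMs(time.milliseconds(), timeoutMs);
        }

        // Overload called as calculateDeadlineMs(time.milliseconds(), timeoutMs) in CompletableEventReaperTest.
        static long calculateDeadlineMs(long currentTimeMs, long timeoutMs) {
            long deadlineMs = currentTimeMs + timeoutMs;
            return deadlineMs >= 0 ? deadlineMs : Long.MAX_VALUE; // clamp on overflow (assumed)
        }

        public static void main(String[] args) {
            Time time = new MockTime();
            // Replaces call sites like "new UnsubscribeEvent(time.timer(100))":
            long deadlineMs = calculateDeadlineMs(time, 100);
            System.out.println("UnsubscribeEvent deadline: " + deadlineMs);
        }
    }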
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index b9f64445dcfb3..eab9f3e5ac12d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -39,6 +39,7 @@
 import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
@@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest {
     private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+    private final CompletableEventReaper backgroundEventReaper = mock(CompletableEventReaper.class);

     @AfterEach
     public void resetAll() {
@@ -190,7 +192,8 @@ private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
             new StringDeserializer(),
             new StringDeserializer(),
             time,
-            (a, b, c, d, e, f) -> applicationEventHandler,
+            (a, b, c, d, e, f, g) -> applicationEventHandler,
+            a -> backgroundEventReaper,
             (a, b, c, d, e, f, g) -> fetchCollector,
             (a, b, c, d) -> metadata,
             backgroundEventQueue
@@ -218,6 +221,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(
             time,
             applicationEventHandler,
             backgroundEventQueue,
+            backgroundEventReaper,
             rebalanceListenerInvoker,
             new Metrics(),
             subscriptions,
@@ -318,6 +322,7 @@ private static Stream<Exception> commitExceptionSupplier() {
     @Test
     public void testCommitAsyncWithFencedException() {
         consumer = newConsumer();
+        completeCommitSyncApplicationEventSuccessfully();
         final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         MockCommitCallback callback = new MockCommitCallback();
@@ -339,7 +344,7 @@ public void testCommitted() {
         completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);

         assertEquals(topicPartitionOffsets, consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), any());
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
         final Metric metric = consumer.metrics()
             .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
         assertTrue((double) metric.metricValue() > 0);
@@ -361,7 +366,7 @@ public void testCommittedLeaderEpochUpdate() {

         verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), any());
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
     }

     @Test
@@ -369,7 +374,7 @@ public void testCommittedExceptionThrown() {
         consumer = newConsumer();
         Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         when(applicationEventHandler.addAndGet(
-            any(FetchCommittedOffsetsEvent.class), any())).thenAnswer(invocation -> {
+            any(FetchCommittedOffsetsEvent.class))).thenAnswer(invocation -> {
             CompletableApplicationEvent<?> event = invocation.getArgument(0);
             assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
             throw new KafkaException("Test exception");
@@ -387,6 +392,7 @@ public void testWakeupBeforeCallingPoll() {
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
@@ -408,6 +414,7 @@ public void testWakeupAfterEmptyFetch() {
         }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
@@ -431,6 +438,7 @@ public void testWakeupAfterNonEmptyFetch() {
         }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
@@ -486,6 +494,7 @@ public void testClearWakeupTriggerAfterPoll() {
             .when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        completeCommitSyncApplicationEventSuccessfully();
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
"group-id", "client-id"); + completeCommitSyncApplicationEventSuccessfully(); final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); HashMap topicPartitionOffsets = new HashMap<>(); @@ -759,9 +769,9 @@ public void testEnsureShutdownExecutedCommitAsyncCallbacks() { @Test public void testVerifyApplicationEventOnShutdown() { consumer = newConsumer(); - doReturn(null).when(applicationEventHandler).addAndGet(any(), any()); + doReturn(null).when(applicationEventHandler).addAndGet(any()); consumer.close(); - verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class), any()); + verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); verify(applicationEventHandler).add(any(CommitOnCloseEvent.class)); } @@ -804,7 +814,7 @@ public void testFailedPartitionRevocationOnClose() { subscriptions.assignFromSubscribed(singleton(tp)); doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp))); assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); - verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseEvent.class), any()); + verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseEvent.class)); verify(listener).onPartitionsRevoked(eq(singleton(tp))); assertEquals(emptySet(), subscriptions.assignedPartitions()); } @@ -827,6 +837,7 @@ public void testCompleteQuietly() { @Test public void testAutoCommitSyncEnabled() { + completeCommitSyncApplicationEventSuccessfully(); SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); consumer = newConsumer( mock(FetchBuffer.class), @@ -839,7 +850,7 @@ public void testAutoCommitSyncEnabled() { consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); subscriptions.seek(new TopicPartition("topic", 0), 100); - consumer.maybeAutoCommitSync(true, time.timer(100)); + consumer.autoCommitSync(time.timer(100)); verify(applicationEventHandler).add(any(SyncCommitEvent.class)); } @@ -857,7 +868,6 @@ public void testAutoCommitSyncDisabled() { consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); subscriptions.seek(new TopicPartition("topic", 0), 100); - consumer.maybeAutoCommitSync(false, time.timer(100)); verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class)); } @@ -936,8 +946,9 @@ public void testBeginningOffsets() { consumer = newConsumer(); Map expectedOffsets = mockOffsetAndTimestamp(); - when(applicationEventHandler.addAndGet(any(ListOffsetsEvent.class), any())).thenAnswer(invocation -> { - Timer timer = invocation.getArgument(1); + when(applicationEventHandler.addAndGet(any(ListOffsetsEvent.class))).thenAnswer(invocation -> { + ListOffsetsEvent event = invocation.getArgument(0); + Timer timer = time.timer(event.deadlineMs() - time.milliseconds()); if (timer.remainingMs() == 0) { fail("Timer duration should not be zero."); } @@ -950,7 +961,7 @@ public void testBeginningOffsets() { assertTrue(result.containsKey(key)); assertEquals(value.offset(), result.get(key)); }); - verify(applicationEventHandler).addAndGet(any(ListOffsetsEvent.class), any(Timer.class)); + verify(applicationEventHandler).addAndGet(any(ListOffsetsEvent.class)); } @Test @@ -960,26 +971,23 @@ public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailur Throwable 
@@ -960,26 +971,23 @@ public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailur
         Throwable eventProcessingFailure = new KafkaException("Unexpected failure " +
             "processing List Offsets event");

         doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(
-            any(ListOffsetsEvent.class),
-            any());
+            any(ListOffsetsEvent.class));

         Throwable consumerError = assertThrows(KafkaException.class,
             () -> consumer.beginningOffsets(partitions,
                 Duration.ofMillis(1)));

         assertEquals(eventProcessingFailure, consumerError);
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }

     @Test
     public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() {
         consumer = newConsumer();
-        doThrow(new TimeoutException()).when(applicationEventHandler).addAndGet(any(), any());
+        doThrow(new TimeoutException()).when(applicationEventHandler).addAndGet(any());
         assertThrows(TimeoutException.class,
             () -> consumer.beginningOffsets(
                 Collections.singletonList(new TopicPartition("t1", 0)),
                 Duration.ofMillis(1)));
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }

@@ -1014,15 +1022,14 @@ public void testOffsetsForTimes() {
         Map<TopicPartition, OffsetAndTimestampInternal> expectedResult = mockOffsetAndTimestamp();
         Map<TopicPartition, Long> timestampToSearch = mockTimestampToSearch();

-        doReturn(expectedResult).when(applicationEventHandler).addAndGet(any(), any());
+        doReturn(expectedResult).when(applicationEventHandler).addAndGet(any());
         Map<TopicPartition, OffsetAndTimestamp> result =
             assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1)));
         expectedResult.forEach((key, value) -> {
             OffsetAndTimestamp expected = value.buildOffsetAndTimestamp();
             assertEquals(expected, result.get(key));
         });
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }

     // This test ensures same behaviour as the current consumer when offsetsForTimes is called
@@ -1049,8 +1056,7 @@ public void testOffsetsForTimesWithZeroTimeout() {
         Map<TopicPartition, OffsetAndTimestamp> result =
             assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, Duration.ZERO));
         assertEquals(expectedResult, result);
-        verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }

@@ -1059,13 +1065,12 @@ public void testWakeupCommitted() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         doAnswer(invocation -> {
             CompletableApplicationEvent<?> event = invocation.getArgument(0);
-            Timer timer = invocation.getArgument(1);
             assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
             assertTrue(event.future().isCompletedExceptionally());
-            return ConsumerUtils.getResult(event.future(), timer);
+            return ConsumerUtils.getResult(event.future());
         })
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsEvent.class), any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class));

         consumer.wakeup();
         assertThrows(WakeupException.class, () -> consumer.committed(offsets.keySet()));
@@ -1216,6 +1221,7 @@ public void testNoInterceptorCommitAsyncFailed() {
     @Test
     public void testRefreshCommittedOffsetsSuccess() {
         consumer = newConsumer();
+        completeCommitSyncApplicationEventSuccessfully();
         TopicPartition partition = new TopicPartition("t1", 1);
         Set<TopicPartition> partitions = Collections.singleton(partition);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = Collections.singletonMap(partition, new OffsetAndMetadata(10L));
@@ -1661,20 +1667,20 @@ private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean co
         consumer.poll(Duration.ZERO);

         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class));

         if (committedOffsetsEnabled) {
             // Verify there was an FetchCommittedOffsets event and no ResetPositions event
             verify(applicationEventHandler, atLeast(1))
-                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
             verify(applicationEventHandler, never())
-                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
         } else {
             // Verify there was not any FetchCommittedOffsets event but there should be a ResetPositions
             verify(applicationEventHandler, never())
-                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
             verify(applicationEventHandler, atLeast(1))
-                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
         }
     }

@@ -1689,11 +1695,11 @@ private void testRefreshCommittedOffsetsSuccess(Set<TopicPartition> partitions,
         consumer.poll(Duration.ZERO);

         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class));
         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
     }

     @Test
@@ -1730,7 +1736,7 @@ public void testLongPollWaitIsLimited() {
     }

     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) processBackgroundEvents}
      * handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
      */
     @Test
@@ -1756,16 +1762,14 @@ public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
             return null;
         }).when(future).get(any(Long.class), any(TimeUnit.class));

-        try (EventProcessor<?> processor = mock(EventProcessor.class)) {
-            consumer.processBackgroundEvents(processor, future, timer);
+        consumer.processBackgroundEvents(future, timer);

-            // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
-            assertEquals(800, timer.remainingMs());
-        }
+        // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
+        assertEquals(800, timer.remainingMs());
     }

     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) processBackgroundEvents}
      * handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
      */
     @Test
@@ -1776,17 +1780,15 @@ public void testProcessBackgroundEventsWithoutDelay() {

         // Create a future that is already completed.
         CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

-        try (EventProcessor<?> processor = mock(EventProcessor.class)) {
-            consumer.processBackgroundEvents(processor, future, timer);
+        consumer.processBackgroundEvents(future, timer);

-            // Because we didn't need to perform a timed get, we should still have every last millisecond
-            // of our initial timeout.
-            assertEquals(1000, timer.remainingMs());
-        }
+        // Because we didn't need to perform a timed get, we should still have every last millisecond
+        // of our initial timeout.
+        assertEquals(1000, timer.remainingMs());
     }

     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) processBackgroundEvents}
      * handles the case where the {@link Future} does not complete within the timeout.
      */
     @Test
@@ -1801,12 +1803,10 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
             throw new java.util.concurrent.TimeoutException("Intentional timeout");
         }).when(future).get(any(Long.class), any(TimeUnit.class));

-        try (EventProcessor<?> processor = mock(EventProcessor.class)) {
-            assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(processor, future, timer));
+        assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer));

-            // Because we forced our mocked future to continuously time out, we should have no time remaining.
-            assertEquals(0, timer.remainingMs());
-        }
+        // Because we forced our mocked future to continuously time out, we should have no time remaining.
+        assertEquals(0, timer.remainingMs());
     }

     /**
@@ -1835,7 +1835,31 @@ public void testPollThrowsInterruptExceptionIfInterrupted() {
         }
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
-
+
+    @Test
+    void testReaperInvokedInClose() {
+        consumer = newConsumer();
+        consumer.close();
+        verify(backgroundEventReaper).reap(backgroundEventQueue);
+    }
+
+    @Test
+    void testReaperInvokedInUnsubscribe() {
+        consumer = newConsumer();
+        completeUnsubscribeApplicationEventSuccessfully();
+        consumer.unsubscribe();
+        verify(backgroundEventReaper).reap(time.milliseconds());
+    }
+
+    @Test
+    void testReaperInvokedInPoll() {
+        consumer = newConsumer();
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.subscribe(Collections.singletonList("topic"));
+        consumer.poll(Duration.ZERO);
+        verify(backgroundEventReaper).reap(time.milliseconds());
+    }
+
     private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
@@ -1898,13 +1922,13 @@ private void completeCommitSyncApplicationEventSuccessfully() {
     private void completeFetchedCommittedOffsetApplicationEventSuccessfully(final Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
         doReturn(committedOffsets)
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsEvent.class), any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class));
     }

     private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) {
         doThrow(ex)
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsEvent.class), any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class));
     }

     private void completeUnsubscribeApplicationEventSuccessfully() {
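The three new testReaperInvoked* tests pin down where AsyncKafkaConsumer is expected to drive the background-event reaper: with the current timestamp on the poll and unsubscribe paths, and with the entire queue on close, so that events that will never be processed still get completed. A hedged sketch of a drain loop consistent with those expectations — stand-in types only, not the real consumer internals:

    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BackgroundDrainSketch {

        interface BackgroundEvent { }

        interface CompletableBackgroundEvent extends BackgroundEvent {
            CompletableFuture<?> future();
        }

        interface Reaper {
            void add(CompletableBackgroundEvent event);
            void reap(long currentTimeMs);      // poll/unsubscribe path
            void reap(Collection<?> events);    // close path
        }

        // Poll path: drain, track completable events, dispatch, then reap by time.
        static void processBackgroundEvents(LinkedBlockingQueue<BackgroundEvent> queue,
                                            Reaper reaper,
                                            long currentTimeMs) {
            LinkedList<BackgroundEvent> events = new LinkedList<>();
            queue.drainTo(events);

            for (BackgroundEvent event : events) {
                if (event instanceof CompletableBackgroundEvent)
                    reaper.add((CompletableBackgroundEvent) event);
                // ... dispatch to the BackgroundEventProcessor here ...
            }

            reaper.reap(currentTimeMs);
        }

        // Close path: anything still queued never gets processed, so expire it wholesale.
        static void close(LinkedBlockingQueue<BackgroundEvent> queue, Reaper reaper) {
            reaper.reap(queue);
        }
    }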
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 0e68b5df95051..8c3f97dd64379 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -23,6 +23,8 @@
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
@@ -32,6 +34,7 @@
 import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -41,7 +44,6 @@
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;

 import org.junit.jupiter.api.AfterEach;
@@ -55,6 +57,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
@@ -62,6 +65,8 @@

 import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
 import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,7 +75,9 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -78,7 +85,7 @@

 public class ConsumerNetworkThreadTest {

-    private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder;
+    private ConsumerTestBuilder testBuilder;
     private Time time;
     private ConsumerMetadata metadata;
     private NetworkClientDelegate networkClient;
@@ -88,11 +95,12 @@ public class ConsumerNetworkThreadTest {
     private CommitRequestManager commitRequestManager;
     private CoordinatorRequestManager coordinatorRequestManager;
     private ConsumerNetworkThread consumerNetworkThread;
+    private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
     private MockClient client;

     @BeforeEach
     public void setup() {
-        testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder();
+        testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
         time = testBuilder.time;
         metadata = testBuilder.metadata;
         networkClient = testBuilder.networkClientDelegate;
@@ -102,14 +110,24 @@ public void setup() {
         commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
         offsetsRequestManager = testBuilder.offsetsRequestManager;
         coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-        consumerNetworkThread = testBuilder.consumerNetworkThread;
+        consumerNetworkThread = new ConsumerNetworkThread(
+            testBuilder.logContext,
+            time,
+            testBuilder.applicationEventQueue,
+            applicationEventReaper,
+            () -> applicationEventProcessor,
+            () -> testBuilder.networkClientDelegate,
+            () -> testBuilder.requestManagers
+        );
         consumerNetworkThread.initializeResources();
     }

     @AfterEach
     public void tearDown() {
-        if (testBuilder != null)
+        if (testBuilder != null) {
             testBuilder.close();
+            consumerNetworkThread.close(Duration.ZERO);
+        }
     }

     @Test
@@ -157,8 +175,7 @@ public void testAsyncCommitEvent() {

     @Test
     public void testSyncCommitEvent() {
-        Timer timer = time.timer(100);
-        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), timer);
+        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
     }

@@ -168,8 +185,7 @@
     @ValueSource(booleans = {true, false})
     public void testListOffsetsEventIsProcessed(boolean requireTimestamp) {
         Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L);
-        Timer timer = time.timer(100);
-        ApplicationEvent e = new ListOffsetsEvent(timestamps, timer, requireTimestamp);
+        ApplicationEvent e = new ListOffsetsEvent(timestamps, calculateDeadlineMs(time, 100), requireTimestamp);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
@@ -178,8 +194,7 @@

     @Test
     public void testResetPositionsEventIsProcessed() {
-        Timer timer = time.timer(100);
-        ResetPositionsEvent e = new ResetPositionsEvent(timer);
+        ResetPositionsEvent e = new ResetPositionsEvent(calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
@@ -190,8 +205,7 @@
     public void testResetPositionsProcessFailureIsIgnored() {
         doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();

-        Timer timer = time.timer(100);
-        ResetPositionsEvent event = new ResetPositionsEvent(timer);
+        ResetPositionsEvent event = new ResetPositionsEvent(calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(event);
         assertDoesNotThrow(() -> consumerNetworkThread.runOnce());

@@ -200,8 +214,7 @@

     @Test
     public void testValidatePositionsEventIsProcessed() {
-        Timer timer = time.timer(100);
-        ValidatePositionsEvent e = new ValidatePositionsEvent(timer);
+        ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100));
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
@@ -226,8 +239,7 @@ public void testAssignmentChangeEvent() {

     @Test
     void testFetchTopicMetadata() {
-        Timer timer = time.timer(Long.MAX_VALUE);
-        applicationEventsQueue.add(new TopicMetadataEvent("topic", timer));
+        applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE));
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
     }
@@ -282,6 +294,22 @@ void testEnsureMetadataUpdateOnPoll() {

     @Test
     void testEnsureEventsAreCompleted() {
+        // Mimic the logic of CompletableEventReaper.reap(Collection):
+        doAnswer(__ -> {
+            Iterator<ApplicationEvent> i = applicationEventsQueue.iterator();
+
+            while (i.hasNext()) {
+                ApplicationEvent event = i.next();
+
+                if (event instanceof CompletableEvent)
+                    ((CompletableEvent<?>) event).future().completeExceptionally(new TimeoutException());
+
+                i.remove();
+            }
+
+            return null;
+        }).when(applicationEventReaper).reap(any(Collection.class));
+
         Node node = metadata.fetch().nodes().get(0);
         coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds());
         client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node));
@@ -294,12 +322,23 @@ void testEnsureEventsAreCompleted() {
         applicationEventsQueue.add(event2);
         assertFalse(future.isDone());
         assertFalse(applicationEventsQueue.isEmpty());
-
         consumerNetworkThread.cleanup();
         assertTrue(future.isCompletedExceptionally());
         assertTrue(applicationEventsQueue.isEmpty());
     }

+    @Test
+    void testCleanupInvokesReaper() {
+        consumerNetworkThread.cleanup();
+        verify(applicationEventReaper).reap(applicationEventsQueue);
+    }
+
+    @Test
+    void testRunOnceInvokesReaper() {
+        consumerNetworkThread.runOnce();
+        verify(applicationEventReaper).reap(any(Long.class));
+    }
+
     private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets,
                                             final Errors error,
                                             final boolean disconnected) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 3095b0c6ed91f..9f6fd4a764b0a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -38,7 +38,6 @@
 import org.apache.kafka.common.utils.Timer;

 import java.io.Closeable;
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Optional;
 import java.util.Properties;
@@ -270,7 +269,6 @@ public ConsumerTestBuilder(Optional<GroupInformation> groupInfo, boolean enableA
         );
         this.applicationEventProcessor = spy(new ApplicationEventProcessor(
                 logContext,
-                applicationEventQueue,
                 requestManagers,
                 metadata
             )
@@ -287,32 +285,6 @@ public ConsumerTestBuilder(Optional<GroupInformation> groupInfo, boolean enableA
     @Override
     public void close() {
         closeQuietly(requestManagers, RequestManagers.class.getSimpleName());
-        closeQuietly(applicationEventProcessor, ApplicationEventProcessor.class.getSimpleName());
-    }
-
-    public static class ConsumerNetworkThreadTestBuilder extends ConsumerTestBuilder {
-
-        final ConsumerNetworkThread consumerNetworkThread;
-
-        public ConsumerNetworkThreadTestBuilder() {
-            this(createDefaultGroupInformation());
-        }
-
-        public ConsumerNetworkThreadTestBuilder(Optional<GroupInformation> groupInfo) {
-            super(groupInfo);
-            this.consumerNetworkThread = new ConsumerNetworkThread(
-                logContext,
-                time,
-                () -> applicationEventProcessor,
-                () -> networkClientDelegate,
-                () -> requestManagers
-            );
-        }
-
-        @Override
-        public void close() {
-            consumerNetworkThread.close(Duration.ZERO);
-        }
     }

     public static class GroupInformation {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index fdcb0cdc39953..451743ae2ad83 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -29,7 +29,6 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;

 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -39,6 +38,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;

+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -77,7 +77,6 @@ public void setup() {
         );
         processor = new ApplicationEventProcessor(
                 new LogContext(),
-                applicationEventQueue,
                 requestManagers,
                 metadata
         );
@@ -93,8 +92,7 @@ public void testPrepClosingCommitEvents() {

     @Test
     public void testPrepClosingLeaveGroupEvent() {
-        Timer timer = time.timer(100);
-        LeaveOnCloseEvent event = new LeaveOnCloseEvent(timer);
+        LeaveOnCloseEvent event = new LeaveOnCloseEvent(calculateDeadlineMs(time, 100));
         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
         when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
         processor.process(event);
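One detail the new reaper tests below lean on: an expired event surfaces to a blocked caller as a TimeoutException because getResult unwraps the ExecutionException thrown by the future. The following is a sketch of that assumed unwrapping behavior — ConsumerUtils.getResult's real implementation may differ in its exception mapping:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class GetResultSketch {

        // Stand-in for org.apache.kafka.common.errors.TimeoutException.
        static class TimeoutException extends RuntimeException {
            TimeoutException(String message) { super(message); }
        }

        // Assumed shape of ConsumerUtils.getResult(Future): block indefinitely (the
        // reaper guarantees eventual completion) and rethrow the future's cause.
        static <T> T getResult(CompletableFuture<T> future) {
            try {
                return future.get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RuntimeException)
                    throw (RuntimeException) e.getCause();
                throw new RuntimeException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public static void main(String[] args) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.completeExceptionally(new TimeoutException("deadline passed"));
            try {
                getResult(future);
            } catch (TimeoutException e) {
                System.out.println("caller sees: " + e.getMessage());
            }
        }
    }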
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
new file mode 100644
index 0000000000000..460b7368fb9e6
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompletableEventReaperTest {
+
+    private final LogContext logContext = new LogContext();
+    private final Time time = new MockTime();
+    private final CompletableEventReaper reaper = new CompletableEventReaper(logContext);
+
+    @Test
+    public void testExpired() {
+        // Add a new event to the reaper.
+        long timeoutMs = 100;
+        UnsubscribeEvent event = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event);
+
+        // Without any time passing, we check the reaper and verify that the event is not done and is still
+        // being tracked.
+        reaper.reap(time.milliseconds());
+        assertFalse(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // Sleep for at least 1 ms *more* than the timeout so that the event is considered expired.
+        time.sleep(timeoutMs + 1);
+
+        // However, until we actually invoke the reaper, the event isn't complete and is still being tracked.
+        assertFalse(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // Call the reaper and validate that the event is now "done" (expired), the correct exception type is
+        // thrown, and the event is no longer tracked.
+        reaper.reap(time.milliseconds());
+        assertTrue(event.future().isDone());
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future()));
+        assertEquals(0, reaper.size());
+    }
+
+    @Test
+    public void testCompleted() {
+        // Add a new event to the reaper.
+        long timeoutMs = 100;
+        UnsubscribeEvent event = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event);
+
+        // Without any time passing, we check the reaper and verify that the event is not done and is still
+        // being tracked.
+        reaper.reap(time.milliseconds());
+        assertFalse(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // We'll cause the event to be completed normally. Note that because we haven't called the reaper, the
+        // event is still being tracked.
+        event.future().complete(null);
+        assertTrue(event.future().isDone());
+        assertEquals(1, reaper.size());
+
+        // To ensure we don't accidentally expire an event that completed normally, sleep past the timeout.
+        time.sleep(timeoutMs + 1);
+
+        // Call the reaper and validate that the event is not considered expired, but is no longer tracked.
+        reaper.reap(time.milliseconds());
+        assertTrue(event.future().isDone());
+        assertNull(ConsumerUtils.getResult(event.future()));
+        assertEquals(0, reaper.size());
+    }
+
+    @Test
+    public void testCompletedAndExpired() {
+        // Add two events to the reaper. One event will be completed, the other we will let expire.
+        long timeoutMs = 100;
+        UnsubscribeEvent event1 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        UnsubscribeEvent event2 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event1);
+        reaper.add(event2);
+
+        // Without any time passing, we check the reaper and verify that the events are not done and are still
+        // being tracked.
+        reaper.reap(time.milliseconds());
+        assertFalse(event1.future().isDone());
+        assertFalse(event2.future().isDone());
+        assertEquals(2, reaper.size());
+
+        // We'll cause the first event to be completed normally, but then sleep past the timer deadline.
+        event1.future().complete(null);
+        assertTrue(event1.future().isDone());
+
+        time.sleep(timeoutMs + 1);
+
+        // Though the first event is completed, it's still being tracked, along with the second expired event.
+        assertEquals(2, reaper.size());
+
+        // Validate that the first (completed) event is not expired, but the second one is expired. In either case,
+        // both should be completed and neither should be tracked anymore.
+        reaper.reap(time.milliseconds());
+        assertTrue(event1.future().isDone());
+        assertTrue(event2.future().isDone());
+        assertNull(ConsumerUtils.getResult(event1.future()));
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future()));
+        assertEquals(0, reaper.size());
+    }
+
+    @Test
+    public void testIncompleteQueue() {
+        long timeoutMs = 100;
+        UnsubscribeEvent event1 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        UnsubscribeEvent event2 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+
+        Collection<CompletableEvent<?>> queue = new ArrayList<>();
+        queue.add(event1);
+        queue.add(event2);
+
+        // Complete one of our events, just to make sure it isn't inadvertently expired.
+        event1.future().complete(null);
+
+        // In this test, our events aren't tracked in the reaper, just in the queue.
+        assertEquals(0, reaper.size());
+        assertEquals(2, queue.size());
+
+        // Go ahead and reap the incomplete events from the queue.
+        reaper.reap(queue);
+
+        // The first event was completed, so we didn't expire it in the reaper.
+        assertTrue(event1.future().isDone());
+        assertFalse(event1.future().isCancelled());
+        assertNull(ConsumerUtils.getResult(event1.future()));
+
+        // The second event was incomplete, so it was expired.
+        assertTrue(event2.future().isCompletedExceptionally());
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future()));
+
+        // Because the events aren't tracked in the reaper *and* the queue is cleared as part of the
+        // reap, both data structures should now be empty.
+        assertEquals(0, reaper.size());
+        assertEquals(0, queue.size());
+    }
+
+    @Test
+    public void testIncompleteTracked() {
+        // This queue is just here to test the case where the queue is empty.
+        Collection<CompletableEvent<?>> queue = new ArrayList<>();
+
+        // Add two events for the reaper to track.
+        long timeoutMs = 100;
+        UnsubscribeEvent event1 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        UnsubscribeEvent event2 = new UnsubscribeEvent(calculateDeadlineMs(time.milliseconds(), timeoutMs));
+        reaper.add(event1);
+        reaper.add(event2);
+
+        // Complete one of our events, just to make sure it isn't inadvertently expired.
+        event1.future().complete(null);
+
+        // In this test, our events are tracked exclusively in the reaper, not the queue.
+        assertEquals(2, reaper.size());
+
+        // Go ahead and reap the incomplete events. Both collections should be empty after that.
+        reaper.reap(queue);
+        assertEquals(0, reaper.size());
+        assertEquals(0, queue.size());
+
+        // The first event was completed, so we didn't expire it in the reaper.
+        assertTrue(event1.future().isDone());
+        assertNull(ConsumerUtils.getResult(event1.future()));
+
+        // The second event was incomplete, so it was expired.
+        assertTrue(event2.future().isCompletedExceptionally());
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future()));
+    }
+}
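Taken together, these tests imply a fairly small contract for CompletableEventReaper: track added events, expire overdue ones on the timed reap, and expire everything outstanding — tracked or still queued — on the terminal reap. The following self-contained model passes the assertions above; the stand-in CompletableEvent and TimeoutException types and the method bodies are inferred from the tests, not copied from the real class:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class ReaperSketch {

        interface CompletableEvent<T> {
            CompletableFuture<T> future();
            long deadlineMs();
        }

        static class TimeoutException extends RuntimeException {
            TimeoutException(String message) { super(message); }
        }

        private final List<CompletableEvent<?>> tracked = new ArrayList<>();

        public void add(CompletableEvent<?> event) { tracked.add(event); }

        public int size() { return tracked.size(); }

        // Timed pass: expire tracked events past their deadline, then forget
        // everything that is done, whether it completed normally or expired.
        public void reap(long currentTimeMs) {
            for (CompletableEvent<?> event : tracked) {
                if (!event.future().isDone() && currentTimeMs >= event.deadlineMs())
                    event.future().completeExceptionally(new TimeoutException("deadline passed"));
            }
            tracked.removeIf(e -> e.future().isDone());
        }

        // Terminal pass: expire every incomplete event, both tracked and still
        // sitting unprocessed in the given queue, then clear both collections.
        public void reap(Collection<?> events) {
            expireIncomplete(tracked);
            tracked.clear();
            List<CompletableEvent<?>> queued = new ArrayList<>();
            for (Object o : events)
                if (o instanceof CompletableEvent)
                    queued.add((CompletableEvent<?>) o);
            expireIncomplete(queued);
            events.clear();
        }

        private void expireIncomplete(List<CompletableEvent<?>> events) {
            for (CompletableEvent<?> event : events)
                if (!event.future().isDone())
                    event.future().completeExceptionally(new TimeoutException("closing"));
        }

        public static void main(String[] args) {
            ReaperSketch reaper = new ReaperSketch();
            CompletableFuture<Void> future = new CompletableFuture<>();
            CompletableEvent<Void> event = new CompletableEvent<Void>() {
                public CompletableFuture<Void> future() { return future; }
                public long deadlineMs() { return 100L; }
            };
            reaper.add(event);
            reaper.reap(101L); // past the deadline => completes exceptionally
            System.out.println("expired: " + future.isCompletedExceptionally());
        }
    }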