From 8a910b8ac9940f8739e79ce588ac42844c8642d4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 16 Jan 2025 22:08:44 -0800 Subject: [PATCH] [fix][broker] Revert "[fix][broker] Cancel possible pending replay read in cancelPendingRead (#23384)" (#23855) (cherry picked from commit ea56ada4f3985c93b93c64d1361b3111cd98a37f) (cherry picked from commit 7387653d48cf70cc9d8eab97294e5dec25f768da) --- .../broker/service/AbstractDispatcherMultipleConsumers.java | 4 ++++ .../persistent/PersistentDispatcherMultipleConsumers.java | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java index 9fc6b9581a3ac..60908a013c1ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java @@ -71,6 +71,10 @@ public SubType getType() { public abstract boolean isConsumerAvailable(Consumer consumer); + /** + * Cancel a possible pending read that is a Managed Cursor waiting to be notified for more entries. + * This won't cancel any other pending reads that are currently in progress. + */ protected void cancelPendingRead() {} /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 9e8483be1a701..ae844b5784456 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -553,9 +553,8 @@ public synchronized CompletableFuture disconnectAllConsumers(boolean isRes @Override protected void cancelPendingRead() { - if ((havePendingRead || havePendingReplayRead) && cursor.cancelPendingReadRequest()) { + if (havePendingRead && cursor.cancelPendingReadRequest()) { havePendingRead = false; - havePendingReplayRead = false; } }