diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 20442d9ad393a..9a88471ee900b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -82,6 +82,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile boolean havePendingRead = false; protected volatile boolean havePendingReplayRead = false; + protected volatile PositionImpl minReplayedPosition = null; protected boolean shouldRewindBeforeReadingOrReplaying = false; protected final String name; @@ -243,6 +244,7 @@ public synchronized void readMoreEntries() { } havePendingReplayRead = true; + minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null); Set deletedMessages = topic.isDelayedDeliveryEnabled() ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket @@ -266,8 +268,8 @@ public synchronized void readMoreEntries() { consumerList.size()); } havePendingRead = true; - cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), - this, + minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null); + cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this, ReadType.Normal, topic.getMaxReadPosition()); } else { log.debug("[{}] Cannot schedule next read until previous one is done", name); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 5fc62246b46b5..ee3a8b7ba10e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -169,6 +169,37 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { return; } + // A corner case that we have to retry a readMoreEntries in order to preserver order delivery. + // This may happen when consumer closed. See issue #12885 for details. + if (!allowOutOfOrderDelivery) { + Set messagesToReplayNow = this.getMessagesToReplayNow(1); + if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) { + PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get(); + // If relayPosition is a new entry wither smaller position is inserted for redelivery during this async + // read, it is possible that this relayPosition should dispatch to consumer first. So in order to + // preserver order delivery, we need to discard this read result, and try to trigger a replay read, + // that containing "relayPosition", by calling readMoreEntries. + if (relayPosition.compareTo(minReplayedPosition) < 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this " + + "read and retry with readMoreEntries.", + name, relayPosition, minReplayedPosition, readType); + } + if (readType == ReadType.Normal) { + entries.forEach(entry -> { + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); + entry.release(); + }); + } else if (readType == ReadType.Replay) { + entries.forEach(Entry::release); + } + readMoreEntries(); + return; + } + } + } + nextStuckConsumers.clear(); final Map> groupedEntries = localGroupedEntries.get();