Skip to content

Commit

Permalink
Fix Issue #12885, Unordered consuming case in Key_Shared subscription (
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored Nov 30, 2021
1 parent 1b20a97 commit 73ef162
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul

protected volatile boolean havePendingRead = false;
protected volatile boolean havePendingReplayRead = false;
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;

Expand Down Expand Up @@ -244,6 +245,7 @@ public synchronized void readMoreEntries() {
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
Expand All @@ -267,6 +269,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,37 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
return;
}

// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
// If relayPosition is a new entry wither smaller position is inserted for redelivery during this async
// read, it is possible that this relayPosition should dispatch to consumer first. So in order to
// preserver order delivery, we need to discard this read result, and try to trigger a replay read,
// that containing "relayPosition", by calling readMoreEntries.
if (relayPosition.compareTo(minReplayedPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
+ "read and retry with readMoreEntries.",
name, relayPosition, minReplayedPosition, readType);
}
if (readType == ReadType.Normal) {
entries.forEach(entry -> {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
readMoreEntries();
return;
}
}
}

nextStuckConsumers.clear();

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
Expand Down

0 comments on commit 73ef162

Please sign in to comment.