Skip to content

Commit

Permalink
Fix Issue apache#12885, Unordered consuming case in Key_Shared subscr…
Browse files Browse the repository at this point in the history
…iption (apache#12890)

(cherry picked from commit 73ef162)
  • Loading branch information
Jason918 authored and eolivelli committed Feb 3, 2022
1 parent 5d25a53 commit a0354a7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul

protected volatile boolean havePendingRead = false;
protected volatile boolean havePendingReplayRead = false;
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;

Expand Down Expand Up @@ -243,6 +244,7 @@ public synchronized void readMoreEntries() {
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
Expand All @@ -266,8 +268,8 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
this,
minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,37 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
return;
}

// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
// If relayPosition is a new entry wither smaller position is inserted for redelivery during this async
// read, it is possible that this relayPosition should dispatch to consumer first. So in order to
// preserver order delivery, we need to discard this read result, and try to trigger a replay read,
// that containing "relayPosition", by calling readMoreEntries.
if (relayPosition.compareTo(minReplayedPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
+ "read and retry with readMoreEntries.",
name, relayPosition, minReplayedPosition, readType);
}
if (readType == ReadType.Normal) {
entries.forEach(entry -> {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
readMoreEntries();
return;
}
}
}

nextStuckConsumers.clear();

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
Expand Down

0 comments on commit a0354a7

Please sign in to comment.