Skip to content

Commit

Permalink
[improve broker] Stop dispatch messages if the individual acks will b…
Browse files Browse the repository at this point in the history
…e lost in the persistent storage
  • Loading branch information
poorbarcode committed Dec 20, 2023
1 parent 181b20b commit 223cb55
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
* @return whether this cursor is closed.
*/
boolean isClosed();

default boolean isMetadataTooLargeToPersist() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public boolean isMetadataTooLargeToPersist() {
return individualDeletedMessages.size() > config.getMaxUnackedRangesToPersist();
}

@Override
public Map<String, String> getCursorProperties() {
return cursorProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
this.subscriptionPolicies = data.getSubscriptionPolicies();

updateEntryFilters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ default boolean checkAndUnblockIfStuck() {
return false;
}

/**
* A callback hook after acknowledge messages.
* If acknowledge successfully, {@param position} will not be null, and {@param position} and {@param ctx} will be
* null.
* If acknowledge failed. {@param position} will be null, and {@param position} and {@param ctx} will not be null.
*/
default void afterAckMessages(Object position, Throwable error, Object ctx){}

default long getFilterProcessedMsgCount() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,17 @@ public synchronized void readMoreEntries() {
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (shouldPauseDeliveryForDelayTracker()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to pause delivery for delay tracker.",
topic.getName(), getSubscriptionName());
}
return;
}
if (topic.isTransferring()) {
Expand Down Expand Up @@ -322,6 +330,13 @@ public synchronized void readMoreEntries() {
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead) {
if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
consumerList.size());
Expand Down Expand Up @@ -359,6 +374,19 @@ public synchronized void readMoreEntries() {
}
}

private boolean shouldPauseOnAckStatePersist(ReadType readType) {
if (readType != ReadType.Normal) {
return false;
}
if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
return false;
}
if (cursor == null) {
return true;
}
return cursor.isMetadataTooLargeToPersist();
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down Expand Up @@ -996,6 +1024,13 @@ public void addUnAckedMessages(int numberOfMessages) {
topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
}

@Override
public void afterAckMessages(Object position, Throwable error, Object ctx) {
if (!cursor.isMetadataTooLargeToPersist()) {
readMoreEntriesAsync();
}
}

public boolean isBlockedDispatcherOnUnackedMsgs() {
return blockedDispatcherOnUnackedMsgs == TRUE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.collect.Range;
import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
Expand All @@ -36,6 +37,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
Expand Down Expand Up @@ -319,13 +321,22 @@ private void readMoreEntries(Consumer consumer) {
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to the current consumer is null", topic.getName(),
getSubscriptionName());
}
return;
}
if (havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to we have pending read.", topic.getName(),
getSubscriptionName());
}
return;
}
if (shouldPauseOnAckStatePersist()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
topic.getName(), getSubscriptionName());
}
return;
}
Expand Down Expand Up @@ -378,6 +389,30 @@ private void readMoreEntries(Consumer consumer) {
}
}

private boolean shouldPauseOnAckStatePersist() {
if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
return false;
}
if (cursor == null) {
return true;
}
if (!cursor.isMetadataTooLargeToPersist()) {
return false;
}
// The cursor state is too large to persist, let us check whether the read is a replay read.
Range<PositionImpl> lastIndividualDeletedRange = cursor.getLastIndividualDeletedRange();
if (lastIndividualDeletedRange == null) {
// lastIndividualDeletedRange is null means the read is not replay read.
return true;
}
// If read position is less than the last acked position, it means the read is a replay read.
PositionImpl lastAckedPosition = lastIndividualDeletedRange.upperEndpoint();
Position readPosition = cursor.getReadPosition();
boolean readPositionIsSmall =
lastAckedPosition.compareTo(readPosition.getLedgerId(), readPosition.getEntryId()) > 0;
return !readPositionIsSmall;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down Expand Up @@ -549,6 +584,13 @@ public void addUnAckedMessages(int unAckMessages) {
// No-op
}

@Override
public void afterAckMessages(Object position, Throwable error, Object ctx) {
if (!shouldPauseOnAckStatePersist()) {
readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
}
}

@Override
public RedeliveryTracker getRedeliveryTracker() {
return redeliveryTracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,14 @@ public void deleteComplete(Object position) {
log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
}
// Signal the dispatchers to give chance to take extra actions
dispatcher.afterAckMessages(position, null, null);
notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position);
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception);
dispatcher.afterAckMessages(null, exception, ctx);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3728,6 +3728,11 @@ public int getMaxUnackedMessagesOnSubscription() {
return topicPolicies.getMaxUnackedMessagesOnSubscription().get();
}

public boolean isDispatcherPauseOnAckStatePersistentEnabled() {
Boolean b = topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().get();
return b == null ? false : b.booleanValue();
}

@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
Expand Down
Loading

0 comments on commit 223cb55

Please sign in to comment.