Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage #21423

Merged
merged 22 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
* @return whether this cursor is closed.
*/
boolean isClosed();

default boolean isCursorDataFullyPersistable() {
return true;
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public boolean isCursorDataFullyPersistable() {
return individualDeletedMessages.size() <= config.getMaxUnackedRangesToPersist();
}

@Override
public Map<String, String> getCursorProperties() {
return cursorProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
this.subscriptionPolicies = data.getSubscriptionPolicies();

updateEntryFilters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
Expand Down Expand Up @@ -141,6 +142,14 @@ default boolean checkAndUnblockIfStuck() {
return false;
}

/**
* A callback hook after acknowledge messages.
* @param exOfDeletion the ex of {@link org.apache.bookkeeper.mledger.ManagedCursor#asyncDelete},
* {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries)}.
* @param ctxOfDeletion the param ctx of calling {@link org.apache.bookkeeper.mledger.ManagedCursor#asyncDelete},
* {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries)}.
*/
default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){}

default long getFilterProcessedMsgCount() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"totalUnackedMessages");
protected volatile int totalUnackedMessages = 0;
/**
* A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
* Note: It is a tool that helps determine whether it should trigger a new reading after acknowledgments to avoid
* too many CPU circles, see {@link #afterAckMessages(Throwable, Object)} for more details. Do not use this
* to confirm whether the delivery should be paused, please call {@link #shouldPauseOnAckStatePersist}.
*/
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnCursorDataCanNotFullyPersist");
private volatile int blockedDispatcherOnCursorDataCanNotFullyPersist = FALSE;
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
Expand All @@ -123,6 +134,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;


protected enum ReadType {
Normal, Replay
}
Expand Down Expand Up @@ -271,9 +283,17 @@ public synchronized void readMoreEntries() {
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (shouldPauseDeliveryForDelayTracker()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to pause delivery for delay tracker.",
topic.getName(), getSubscriptionName());
}
return;
}
if (topic.isTransferring()) {
Expand Down Expand Up @@ -322,6 +342,13 @@ public synchronized void readMoreEntries() {
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead) {
if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
consumerList.size());
Expand Down Expand Up @@ -359,6 +386,20 @@ public synchronized void readMoreEntries() {
}
}

private boolean shouldPauseOnAckStatePersist(ReadType readType) {
// Allows new consumers to consume redelivered messages caused by the just-closed consumer.
if (readType != ReadType.Normal) {
return false;
}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
return false;
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
}
if (cursor == null) {
return true;
}
return blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down Expand Up @@ -996,6 +1037,29 @@ public void addUnAckedMessages(int numberOfMessages) {
topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
}

@Override
public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
if (blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE) {
if (cursor.isCursorDataFullyPersistable()) {
// If there was no previous pause due to cursor data is too large to persist, we don't need to manually
// trigger a new read. This can avoid too many CPU circles.
if (BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this, TRUE, FALSE)) {
readMoreEntriesAsync();
} else {
// Retry due to conflict update.
afterAckMessages(exOfDeletion, ctxOfDeletion);
}
}
} else {
if (!cursor.isCursorDataFullyPersistable()) {
if (BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this, FALSE, TRUE)) {
// Retry due to conflict update.
afterAckMessages(exOfDeletion, ctxOfDeletion);
}
}
}
}

public boolean isBlockedDispatcherOnUnackedMsgs() {
return blockedDispatcherOnUnackedMsgs == TRUE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,15 @@ private void readMoreEntries(Consumer consumer) {
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to the current consumer is null", topic.getName(),
getSubscriptionName());
}
return;
}
if (havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to we have pending read.", topic.getName(),
getSubscriptionName());
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ public void markDeleteComplete(Object ctx) {
topicName, subName, newMD, oldMD);
}
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
// Signal the dispatchers to give chance to take extra actions
notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD);
}

Expand All @@ -451,22 +455,34 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to mark delete for position {}: {}", topicName, subName, ctx, exception);
}
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
}
};

private final DeleteCallback deleteCallback = new DeleteCallback() {
@Override
public void deleteComplete(Object position) {
public void deleteComplete(Object context) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
// The value of the param "context" is a position.
log.debug("[{}][{}] Deleted message at {}", topicName, subName, context);
}
// Signal the dispatchers to give chance to take extra actions
notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position);
if (dispatcher != null) {
dispatcher.afterAckMessages(null, context);
}
notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) context);
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception);
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
};

Expand Down Expand Up @@ -645,6 +661,7 @@ public void clearBacklogComplete(Object ctx) {
future.complete(null);
}
});
dispatcher.afterAckMessages(null, ctx);
} else {
future.complete(null);
}
Expand All @@ -654,6 +671,9 @@ public void clearBacklogComplete(Object ctx) {
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to clear backlog", topicName, subName, exception);
future.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
}, null);

Expand All @@ -677,13 +697,19 @@ public void skipEntriesComplete(Object ctx) {
numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
}
future.complete(null);
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
}

@Override
public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to skip {} messages", topicName, subName, numMessagesToSkip,
exception);
future.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
}, null);

Expand Down Expand Up @@ -808,6 +834,7 @@ public void resetComplete(Object ctx) {
}
if (dispatcher != null) {
dispatcher.cursorIsReset();
dispatcher.afterAckMessages(null, finalPosition);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3728,6 +3728,11 @@ public int getMaxUnackedMessagesOnSubscription() {
return topicPolicies.getMaxUnackedMessagesOnSubscription().get();
}

public boolean isDispatcherPauseOnAckStatePersistentEnabled() {
Boolean b = topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().get();
return b == null ? false : b.booleanValue();
}

@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -104,12 +105,17 @@ public void setup() throws Exception {
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doReturn(false).when(configMock).isAllowOverrideEntryFilters();

pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();

EntryFilterProvider mockEntryFilterProvider = mock(EntryFilterProvider.class);
when(mockEntryFilterProvider.getBrokerEntryFilters()).thenReturn(Collections.emptyList());

brokerMock = mock(BrokerService.class);
doReturn(pulsarMock).when(brokerMock).pulsar();
when(brokerMock.getEntryFilterProvider()).thenReturn(mockEntryFilterProvider);

HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);
Expand Down Expand Up @@ -149,6 +155,7 @@ public void setup() throws Exception {
);

subscriptionMock = mock(PersistentSubscription.class);
when(subscriptionMock.getTopic()).thenReturn(topicMock);
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
Expand Down
Loading
Loading