Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage #21423

Merged
merged 22 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
* @return whether this cursor is closed.
*/
boolean isClosed();

default boolean isCursorDataFullyPersistable() {
return true;
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

/**
 * Reports whether the number of entries held in {@code individualDeletedMessages} is within the limit
 * returned by {@code config.getMaxUnackedRangesToPersist()}.
 *
 * @return {@code false} once the size exceeds the configured limit, {@code true} otherwise.
 */
@Override
public boolean isCursorDataFullyPersistable() {
    if (individualDeletedMessages.size() > config.getMaxUnackedRangesToPersist()) {
        return false;
    }
    return true;
}

@Override
public Map<String, String> getCursorProperties() {
return cursorProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
this.subscriptionPolicies = data.getSubscriptionPolicies();

updateEntryFilters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
Expand Down Expand Up @@ -141,6 +142,14 @@ default boolean checkAndUnblockIfStuck() {
return false;
}

/**
 * A callback hook invoked after messages have been acknowledged. The default implementation does nothing.
 * @param exOfDeletion the exception reported by {@link ManagedCursor#asyncDelete},
 * {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries};
 * callers pass {@code null} when the operation completed without an error.
 * @param ctxOfDeletion the {@code ctx} argument that was handed to {@link ManagedCursor#asyncDelete},
 * {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries}.
 */
default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){}

default long getFilterProcessedMsgCount() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

/**
 * Holder of the two flags that accompany the check "dispatching is paused because the cursor data can not be
 * fully persisted".
 *
 * <p>Note: these flags alone do not say whether delivery should be paused; to get that answer call
 * {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
 */
public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {

    /**
     * Records that dispatching has been paused at least once before, because the cursor data could not be
     * fully persisted.
     *
     * <p>The flag exists so that
     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} does not have to call
     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} on every acknowledgement, which would
     * waste CPU cycles. A new read is only triggered after dispatching was paused at least once earlier.
     */
    private volatile boolean markerPausedAtLeastOnce;

    /**
     * Records that some acknowledgement has been executed.
     *
     * <p>Without it, the following race could leave dispatching stuck:
     * <ol>
     *   <li>{@link #markerPausedAtLeastOnce} is {@code false}.</li>
     *   <li>Reading thread: there are too many ack holes, so it starts to pause dispatching.</li>
     *   <li>Ack thread: all messages get acknowledged.</li>
     *   <li>Ack thread: {@link #markerPausedAtLeastOnce} is still {@code false}, so no new read is
     *       triggered.</li>
     *   <li>Reading thread: sets {@link #markerPausedAtLeastOnce} to {@code true} and discards the read.</li>
     *   <li>Nothing triggers a new read any more.</li>
     * </ol>
     *
     * <p>This flag closes the gap: the ack holes in
     * {@link org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#individualDeletedMessages} are checked, and
     * if a new acknowledgement arrived while the check was running, the check is done again.
     */
    private volatile boolean markerNewAcknowledged;

    // ---- "paused at least once" flag ----

    public boolean hasPausedAtLeastOnce() {
        return markerPausedAtLeastOnce;
    }

    /** Called when a dispatch was discarded because the cursor data can not be fully persisted. **/
    public void markPaused() {
        // A plain write is used instead of a compare-and-swap on purpose: if this write is overridden by
        // clearMarkerPaused(), then PersistentDispatcherMultipleConsumers#afterAckMessages is about to
        // trigger a new read, and that read sets the flag again.
        markerPausedAtLeastOnce = true;
    }

    public void clearMarkerPaused() {
        // A plain write is used instead of a compare-and-swap on purpose: if this write is overridden by
        // markPaused(), the only effect is that the next
        // PersistentDispatcherMultipleConsumers#afterAckMessages triggers a new read based on a stale flag,
        // which is not important.
        markerPausedAtLeastOnce = false;
    }

    // ---- "new acknowledgement" flag ----

    public boolean hasNewAcknowledged() {
        return markerNewAcknowledged;
    }

    /** Called whenever any messages have been acknowledged. **/
    public void markNewAcknowledged() {
        // A plain write is used instead of a compare-and-swap on purpose: if this write is overridden by
        // clearMakerNewAcknowledged(), then the check ManagedCursor#isCursorDataFullyPersistable() runs after
        // PersistentDispatcherMultipleConsumers#afterAckMessages, so the reading thread sees the newest
        // ManagedCursorImpl#individualDeletedMessages and
        // PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist still returns a correct result.
        markerNewAcknowledged = true;
    }

    public void clearMakerNewAcknowledged() {
        // A plain write is used instead of a compare-and-swap on purpose: if this write is overridden by
        // markNewAcknowledged(), the worst case is one more round of the check in
        // PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist, which is harmless.
        markerNewAcknowledged = false;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"totalUnackedMessages");
protected volatile int totalUnackedMessages = 0;
/**
* Flags that relate to the check "dispatching is paused because the cursor data can not be fully persisted".
* Note: do not use this field to decide whether the delivery should be paused;
* call {@link #shouldPauseOnAckStatePersist} instead.
*/
private BlockDispatcherSignatureOnCursorDataCanNotFullyPersist blockSignatureOnCursorDataCanNotFullyPersist =
new BlockDispatcherSignatureOnCursorDataCanNotFullyPersist();
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
Expand All @@ -123,6 +130,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;


/** The kinds of read distinguished by this dispatcher. */
protected enum ReadType {
    Normal,
    Replay
}
Expand Down Expand Up @@ -271,9 +279,17 @@ public synchronized void readMoreEntries() {
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (shouldPauseDeliveryForDelayTracker()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to pause delivery for delay tracker.",
topic.getName(), getSubscriptionName());
}
return;
}
if (topic.isTransferring()) {
Expand Down Expand Up @@ -322,6 +338,13 @@ public synchronized void readMoreEntries() {
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead) {
if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
consumerList.size());
Expand Down Expand Up @@ -359,6 +382,41 @@ public synchronized void readMoreEntries() {
}
}

private boolean shouldPauseOnAckStatePersist(ReadType readType) {
// Allows new consumers to consume redelivered messages caused by the just-closed consumer.
if (readType != ReadType.Normal) {
return false;
}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
return false;
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
}
if (cursor == null) {
return true;
}

/**
* Double check for "Cursor data can be fully persistent".
* - Clear the marker that represent some acknowledgements were executed.
* - Check whether dispatching should be paused due to cursor data is too large to persistent.
* - If dispatching should be paused, but some acknowledgements have been executed, re-calculate the result.
* - Mark delivery was paused at least once.
*/
// Clear the marker that represent some acknowledgements were executed.
blockSignatureOnCursorDataCanNotFullyPersist.clearMakerNewAcknowledged();
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
// Check whether dispatching should be paused due to cursor data is too large to persistent.
if (cursor.isCursorDataFullyPersistable()) {
return false;
}
// Mark delivery was paused at least once.
blockSignatureOnCursorDataCanNotFullyPersist.markPaused();
// At this pause, dispatching should be paused, but some acknowledgements have been executed.
// We should re-calculate the result.
if (blockSignatureOnCursorDataCanNotFullyPersist.hasNewAcknowledged()) {
return shouldPauseOnAckStatePersist(readType);
}
return true;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down Expand Up @@ -996,6 +1054,23 @@ public void addUnAckedMessages(int numberOfMessages) {
topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
}

/**
 * Records that an acknowledgement was executed and, if dispatching had been paused earlier and the cursor
 * data is fully persistable now, schedules a new read. Neither argument is inspected here.
 *
 * @param exOfDeletion the error of the deletion, if any.
 * @param ctxOfDeletion the context object of the deletion.
 */
@Override
public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
    // Always note that an acknowledgement was executed.
    blockSignatureOnCursorDataCanNotFullyPersist.markNewAcknowledged();

    // If there was no previous pause caused by the cursor data being too large to persist, there is no need
    // to manually trigger a new read. This avoids spending CPU cycles on every acknowledgement.
    if (!blockSignatureOnCursorDataCanNotFullyPersist.hasPausedAtLeastOnce()) {
        return;
    }
    // Still too much cursor data to persist: stay paused.
    if (!cursor.isCursorDataFullyPersistable()) {
        return;
    }

    // Clear the marker that delivery was paused earlier, and trigger a new reading.
    blockSignatureOnCursorDataCanNotFullyPersist.clearMarkerPaused();
    readMoreEntriesAsync();
}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

/**
 * @return whether {@code blockedDispatcherOnUnackedMsgs} currently equals {@code TRUE}.
 */
public boolean isBlockedDispatcherOnUnackedMsgs() {
    final int blockedFlag = blockedDispatcherOnUnackedMsgs;
    return blockedFlag == TRUE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,15 @@ private void readMoreEntries(Consumer consumer) {
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to the current consumer is null", topic.getName(),
getSubscriptionName());
}
return;
}
if (havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to we have pending read.", topic.getName(),
getSubscriptionName());
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,17 +456,24 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {

private final DeleteCallback deleteCallback = new DeleteCallback() {
@Override
public void deleteComplete(Object position) {
public void deleteComplete(Object context) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
// The value of the param "context" is a position.
log.debug("[{}][{}] Deleted message at {}", topicName, subName, context);
}
// Signal the dispatchers to give chance to take extra actions
notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position);
if (dispatcher != null) {
dispatcher.afterAckMessages(null, context);
}
notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) context);
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception);
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
};

Expand Down Expand Up @@ -645,6 +652,7 @@ public void clearBacklogComplete(Object ctx) {
future.complete(null);
}
});
dispatcher.afterAckMessages(null, ctx);
} else {
future.complete(null);
}
Expand All @@ -654,6 +662,9 @@ public void clearBacklogComplete(Object ctx) {
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to clear backlog", topicName, subName, exception);
future.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
}, null);

Expand All @@ -677,13 +688,19 @@ public void skipEntriesComplete(Object ctx) {
numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
}
future.complete(null);
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
}

@Override
public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to skip {} messages", topicName, subName, numMessagesToSkip,
exception);
future.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
}, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3728,6 +3728,11 @@ public int getMaxUnackedMessagesOnSubscription() {
return topicPolicies.getMaxUnackedMessagesOnSubscription().get();
}

/**
 * @return the value of the topic policy "dispatcherPauseOnAckStatePersistentEnabled"; a missing
 *         ({@code null}) value counts as {@code false}.
 */
public boolean isDispatcherPauseOnAckStatePersistentEnabled() {
    // Boolean.TRUE.equals(...) is null-safe, so no explicit null check is required.
    return Boolean.TRUE.equals(topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().get());
}

@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
Expand Down
Loading
Loading