[fix][broker] The topic might reference a closed ledger #22739

Closed
wants to merge 2 commits into from
@@ -1920,6 +1920,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
return new BrokerService(pulsar, ioEventLoopGroup);
}

@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}

private CompactionServiceFactory loadCompactionServiceFactory() {
String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
var compactionServiceFactory =
@@ -1007,6 +1007,9 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
&& FutureUtil.getException(topicFuture).get().equals(FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION)) {
return FutureUtil.failedFuture(FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
dao-jun marked this conversation as resolved.
Comment on lines +1010 to +1011
Contributor

I suggest following the approach from PR #22283:

  1. The broker should eventually clean up every topicFuture that fails, whatever the exception.
  2. A new get-topic operation should not remove a topicFuture that was created by a previous get-topic operation.
  3. A topicFuture should only be created if there is no previous topicFuture. If the previous topicFuture has not been removed from the map yet, the broker should always use the existing topicFuture (either waiting for it to complete or returning its error directly to the client).

The existing code mixes these responsibilities, which introduces contention in the get-topic operation. This change checks for the topic load timeout exception, but we don't know how many other exceptions would also need to be checked.
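
The three rules above can be sketched as a small stand-alone cache. This is only an illustration under stated assumptions, not the code from #22283: `TopicFutureCache`, `getTopic(name, loader)`, and `isCached` are hypothetical names, and a `String` stands in for the real `Topic`. It relies on `ConcurrentHashMap.putIfAbsent` so that a future is created only when none is cached, and on the two-argument `remove(key, value)` so that the creator only ever removes its own future.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for the broker's topic cache; not Pulsar's BrokerService.
class TopicFutureCache {
    private final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> topics =
            new ConcurrentHashMap<>();

    CompletableFuture<Optional<String>> getTopic(
            String name, Function<String, CompletableFuture<Optional<String>>> loader) {
        CompletableFuture<Optional<String>> created = new CompletableFuture<>();
        // Rule 3: only install a new future when no previous one is cached.
        CompletableFuture<Optional<String>> existing = topics.putIfAbsent(name, created);
        if (existing != null) {
            // Rule 2: reuse the previous future (pending or failed); never remove it here.
            return existing;
        }
        // Rule 1: the creator cleans up its own future on any exception.
        created.whenComplete((topic, ex) -> {
            if (ex != null) {
                // Conditional remove: only drops the entry if it is still this future.
                topics.remove(name, created);
            }
        });
        try {
            loader.apply(name).whenComplete((topic, ex) -> {
                if (ex != null) {
                    created.completeExceptionally(ex);
                } else {
                    created.complete(topic);
                }
            });
        } catch (RuntimeException e) {
            // A loader that throws synchronously must still fail the cached future.
            created.completeExceptionally(e);
        }
        return created;
    }

    boolean isCached(String name) {
        return topics.containsKey(name);
    }

    public static void main(String[] args) {
        TopicFutureCache cache = new TopicFutureCache();
        CompletableFuture<Optional<String>> pending = new CompletableFuture<>();
        CompletableFuture<Optional<String>> first = cache.getTopic("t", n -> pending);
        CompletableFuture<Optional<String>> second =
                cache.getTopic("t", n -> CompletableFuture.completedFuture(Optional.of("other")));
        System.out.println("same future reused: " + (first == second));
        pending.completeExceptionally(new RuntimeException("load failed"));
        System.out.println("still cached after failure: " + cache.isCached("t"));
    }
}
```

With this shape, `getTopic` never has to inspect which exception a cached future failed with, because a failed future is removed by the operation that created it.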

Member Author

I agree with these points. I actually tried to do that: https://github.com/shibd/pulsar/pull/38/files

However, some tests did not pass, so I used this short-term solution.

I'm going to continue to look into it.

Member Author

For a new solution, refer to PR #22860.

} else if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topicName.toString(), topicFuture);
@@ -1744,6 +1747,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
@@ -1760,6 +1764,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " Removing topic from topics list {}, {}", topic, ex);
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
topics.remove(topic, topicFuture);
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
@@ -19,12 +19,10 @@
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -1434,13 +1432,6 @@
// Ok
}

final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
// timeout topic future should be removed from cache
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
Member Author

This unit test assumes a managed ledger that never completes in order to mock a topic creation timeout, which does not make sense in a production scenario.

This assertion is removed, and a new assertion is added afterward: once the ml is completed, the topic should be removed due to the timeout.
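
The behavior that the new assertion expects can be sketched in isolation. This is a minimal illustration, not the broker code: `LateOpenCleanup`, `FakeTopic`, and `onLedgerOpened` are hypothetical names, and the sketch uses the return value of `CompletableFuture.complete` to detect that the topic future was already completed (for example by a load timeout), which is an assumption and not necessarily the check the broker performs. When the ledger opens late, the topic is closed first and the cache entry is dropped only after the close finishes.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical model of "ledger opens after the topic load already timed out".
class LateOpenCleanup {
    // Stand-in for a topic that owns a managed ledger.
    static final class FakeTopic {
        volatile boolean closed;

        CompletableFuture<Void> close() {
            closed = true;
            return CompletableFuture.completedFuture(null);
        }
    }

    final ConcurrentHashMap<String, CompletableFuture<Optional<FakeTopic>>> topics =
            new ConcurrentHashMap<>();

    // Called when the backing ledger finally finishes opening.
    void onLedgerOpened(String name, CompletableFuture<Optional<FakeTopic>> topicFuture,
                        FakeTopic topic) {
        if (topicFuture.complete(Optional.of(topic))) {
            return; // The load finished in time; keep the cache entry.
        }
        // The future was already completed (e.g. timed out): close the late topic,
        // then remove the entry, but only if it still maps to this same future.
        topic.close().whenComplete((ignore, ex) -> topics.remove(name, topicFuture));
    }

    public static void main(String[] args) {
        LateOpenCleanup broker = new LateOpenCleanup();
        CompletableFuture<Optional<FakeTopic>> topicFuture = new CompletableFuture<>();
        broker.topics.put("t", topicFuture);
        topicFuture.completeExceptionally(new TimeoutException("topic load timed out"));

        FakeTopic topic = new FakeTopic();
        broker.onLedgerOpened("t", topicFuture, topic);
        System.out.println("closed=" + topic.closed + ", cached=" + broker.topics.containsKey("t"));
    }
}
```

Removing the entry inside the close callback means a later get-topic call cannot observe an empty cache slot while the old ledger is still being closed.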

1000);

assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
@@ -1452,6 +1443,12 @@
ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
mlFuture.complete(ml);

// Once ml is created, the topic should be removed due to the timeout.
Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertNull(pulsar1.getBrokerService().getTopics().get(topicName));

Check failure on line 1448 in pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

GitHub Actions / CI - Unit - Brokers - Broker Group 1

ReplicatorTest.testCleanupTopic

Assertion condition defined as a org.apache.pulsar.broker.service.ReplicatorTest expected [null] but found [java.util.concurrent.CompletableFuture@5c4d00d8[Completed normally]] within 5 seconds.
});

// Re-creating the topic will succeed.
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.List;
@@ -27,13 +29,17 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.compaction.CompactionServiceFactory;
@@ -108,6 +114,67 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception {
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
}

@Test
public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
// Make the topic loading timeout faster.
long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds();
int topicLoadTimeoutSeconds = 1;
pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
pulsar.getConfig().setBrokerDeduplicationEnabled(true);
pulsar.getConfig().setTransactionCoordinatorEnabled(true);
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

// Delay message deduplication recovery by topicLoadTimeoutSeconds to mock a slow recovery.
String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
if (mlPath.equals(path)) {
log.info("Topic load timeout: " + path);
return true;
}
return false;
});

// The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close.
// Here, we simulate a sleep to ensure that the ledger is not immediately closed.
TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() {
@Override
public TransactionBuffer newTransactionBuffer(Topic originTopic) {
return new TransactionBufferDisable(originTopic) {
@SneakyThrows
@Override
public CompletableFuture<Void> closeAsync() {
Thread.sleep(500);
return super.closeAsync();
}
};
}
};
TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();
pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider);
CompletableFuture<Optional<Topic>> firstLoad = pulsar.getBrokerService().getTopic(tpName, true);
Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
// Assert that the first topic creation timed out.
.untilAsserted(() -> {
assertTrue(firstLoad.isCompletedExceptionally());
});

// Once the first topic load times out, immediately load the topic again.
Producer<byte[]> producer = pulsarClient.newProducer().topic(tpName).create();
for (int i = 0; i < 100; i++) {
MessageId send = producer.send("msg".getBytes());
assertNotNull(send);
}

// Restore the original settings.
pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
pulsar.getConfig().setBrokerDeduplicationEnabled(false);
pulsar.getConfig().setTransactionCoordinatorEnabled(false);
}

@Test
public void testNoOrphanTopicIfInitFailed() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");