Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Try fix topic link a closed ledger #38

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
return new BrokerService(pulsar, ioEventLoopGroup);
}

/**
 * Replaces the {@link TransactionBufferProvider} held by this service.
 *
 * <p>Marked {@code @VisibleForTesting}: intended for tests that need to substitute their own provider.
 *
 * <p>NOTE(review): the method name says "Executor" but the value it stores is the transaction
 * <em>buffer</em> provider; consider renaming in a follow-up if no caller depends on the current name.
 *
 * @param transactionBufferProvider the provider to store; assigned as-is, no validation is performed here
 */
@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}

private CompactionServiceFactory loadCompactionServiceFactory() {
String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
var compactionServiceFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,33 +1004,6 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, boolean createIfMissing,
Map<String, String> properties) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topicName.toString(), topicFuture);
} else {
// a non-existing topic in the cache shouldn't prevent creating a topic
if (createIfMissing) {
if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
return topicFuture;
} else {
return topicFuture.thenCompose(value -> {
if (!value.isPresent()) {
// retry and create topic
return getTopic(topicName, createIfMissing, properties);
} else {
// in-progress future completed successfully
return CompletableFuture.completedFuture(value);
}
});
}
} else {
return topicFuture;
}
}
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
Expand Down Expand Up @@ -1283,7 +1256,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
}).exceptionally(ex -> {
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(ex);
});
return null;
Expand All @@ -1299,7 +1272,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
topicFuture.complete(Optional.of(nonPersistentTopic));
// After the metadata lookup returns successfully, we should remove this topic from this broker, because
// the topic is not owned by this broker and it has not been initialized or gone through checkReplication.
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
return null;
});

Expand Down Expand Up @@ -1534,10 +1507,12 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
log.info("create topic future: {} class: {}", topic, topicFuture + "@" + System.identityHashCode(topicFuture));
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
}
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new NotAllowedException(
"Broker is unable to load persistent topic"));
return topicFuture;
Expand All @@ -1556,6 +1531,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
// do not recreate topic if topic is already migrated and deleted by broker
// so, avoid creating a new topic if migration is already started
if (ex != null && (ex.getCause() instanceof TopicMigratedException)) {
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(ex.getCause());
return null;
}
Expand All @@ -1570,6 +1546,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
}
}
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(ex.getCause());
return null;
});
Expand Down Expand Up @@ -1623,15 +1600,15 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
finalProperties, topicPolicies)
).exceptionally(throwable -> {
log.warn("[{}] Read topic property failed", topic, throwable);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(throwable);
return null;
});
} else {
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
}).exceptionally(ex -> {
Expand Down Expand Up @@ -1662,7 +1639,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new NotAllowedException(msg));
return;
}
Expand Down Expand Up @@ -1744,6 +1721,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
Expand All @@ -1760,6 +1738,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " Removing topic from topics list {}, {}", topic, ex);
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
topics.remove(topic);
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
Expand All @@ -1771,7 +1750,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
});
} catch (PulsarServerException e) {
log.warn("Failed to create topic {}: {}", topic, e.getMessage());
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(e);
}
}
Expand All @@ -1784,7 +1763,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
topicFuture.complete(Optional.empty());
} else {
log.warn("Failed to create topic {}", topic, exception);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new PersistenceException(exception));
}
}
Expand All @@ -1794,7 +1773,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
// remove topic from topics-map in different thread to avoid possible deadlock if
// createPersistentTopic-thread only tries to handle this future-result
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(exception);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -1434,13 +1432,6 @@ public void testCleanupTopic() throws Exception {
// Ok
}

final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
// timeout topic future should be removed from cache
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
1000);

assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
Expand All @@ -1452,6 +1443,12 @@ public void testCleanupTopic() throws Exception {
ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
mlFuture.complete(ml);

// Once the ml is created, the topic should be removed due to the timeout.
Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertNull(pulsar1.getBrokerService().getTopics().get(topicName));
});

// Re-creating the topic should succeed.
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.List;
Expand All @@ -27,13 +29,17 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.compaction.CompactionServiceFactory;
Expand Down Expand Up @@ -108,6 +114,67 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception {
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
}

/**
 * Loads a topic whose first load is forced to time out, then immediately loads it again through a
 * producer and checks that messages can still be published.
 *
 * <p>The first load is slowed down by delaying the ZooKeeper operation on the deduplication cursor
 * path, and the transaction buffer's {@code closeAsync()} is made slow so that the timed-out topic is
 * not closed immediately while the second load is in flight.
 *
 * <p>Every broker setting changed here is restored in a {@code finally} block, so a failing assertion
 * cannot leak the shortened timeout or the mocked provider into other tests.
 */
@Test
public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
    // Capture everything this test overrides so it can be put back exactly as it was.
    final long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds();
    final boolean originalDeduplicationEnabled = pulsar.getConfig().isBrokerDeduplicationEnabled();
    final boolean originalTransactionCoordinatorEnabled = pulsar.getConfig().isTransactionCoordinatorEnabled();
    final TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();

    try {
        // Make the topic loading timeout faster.
        int topicLoadTimeoutSeconds = 1;
        pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
        pulsar.getConfig().setBrokerDeduplicationEnabled(true);
        pulsar.getConfig().setTransactionCoordinatorEnabled(true);
        String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

        // Delay the metadata operation on the deduplication cursor path by the full load timeout, so
        // that the first topic load cannot finish in time.
        String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/"
                + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
        mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
            if (mlPath.equals(path)) {
                log.info("Topic load timeout: " + path);
                return true;
            }
            return false;
        });

        // The first topic load will trigger a timeout. When the topic closes, it will call
        // transactionBuffer.close. Here, we simulate a sleep to ensure that the ledger is not
        // immediately closed.
        TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() {
            @Override
            public TransactionBuffer newTransactionBuffer(Topic originTopic) {
                return new TransactionBufferDisable(originTopic) {
                    @SneakyThrows
                    @Override
                    public CompletableFuture<Void> closeAsync() {
                        Thread.sleep(500);
                        return super.closeAsync();
                    }
                };
            }
        };
        pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider);

        CompletableFuture<Optional<Topic>> firstLoad = pulsar.getBrokerService().getTopic(tpName, true);
        Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS)
                .pollInterval(100, TimeUnit.MILLISECONDS)
                // assert that the first topic creation timed out
                .untilAsserted(() -> {
                    assertTrue(firstLoad.isCompletedExceptionally());
                });

        // Once the first topic load has timed out, immediately load the topic again. The producer is
        // closed when the block exits, whether or not the sends succeed.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(tpName).create()) {
            for (int i = 0; i < 100; i++) {
                MessageId send = producer.send("msg".getBytes());
                assertNotNull(send);
            }
        }
    } finally {
        // Restore the original state even when an assertion above fails.
        pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider);
        pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
        pulsar.getConfig().setBrokerDeduplicationEnabled(originalDeduplicationEnabled);
        pulsar.getConfig().setTransactionCoordinatorEnabled(originalTransactionCoordinatorEnabled);
    }
}

@Test
public void testNoOrphanTopicIfInitFailed() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Expand Down
Loading