diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6ee35ad295fb5..8fb383cca4427 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1920,6 +1920,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception return new BrokerService(pulsar, ioEventLoopGroup); } + @VisibleForTesting + public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) { + this.transactionBufferProvider = transactionBufferProvider; + } + private CompactionServiceFactory loadCompactionServiceFactory() { String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName(); var compactionServiceFactory = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6603e240ee7d9..6d422db2ec397 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1004,33 +1004,6 @@ public CompletableFuture> getTopic(final String topic, boolean c public CompletableFuture> getTopic(final TopicName topicName, boolean createIfMissing, Map properties) { try { - CompletableFuture> topicFuture = topics.get(topicName.toString()); - if (topicFuture != null) { - if (topicFuture.isCompletedExceptionally() - || (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) { - // Exceptional topics should be recreated. - topics.remove(topicName.toString(), topicFuture); - } else { - // a non-existing topic in the cache shouldn't prevent creating a topic - if (createIfMissing) { - if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) { - return topicFuture; - } else { - return topicFuture.thenCompose(value -> { - if (!value.isPresent()) { - // retry and create topic - return getTopic(topicName, createIfMissing, properties); - } else { - // in-progress future completed successfully - return CompletableFuture.completedFuture(value); - } - }); - } - } else { - return topicFuture; - } - } - } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName) @@ -1283,7 +1256,7 @@ private CompletableFuture> createNonPersistentTopic(String topic }).exceptionally(ex -> { log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause()); nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(ex); }); return null; @@ -1299,7 +1272,7 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.complete(Optional.of(nonPersistentTopic)); // after get metadata return success, we should delete this topic from this broker, because this topic not // owner by this broker and it don't initialize and checkReplication - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); return null; }); @@ -1534,10 +1507,12 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); + log.info("create topic future: {} class: {}", topic, topicFuture + "@" + System.identityHashCode(topicFuture)); if (!pulsar.getConfiguration().isEnablePersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load persistent topic {}", topic); } + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new NotAllowedException( "Broker is unable to load persistent topic")); return topicFuture; @@ -1556,6 +1531,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S // do not recreate topic if topic is already migrated and deleted by broker // so, avoid creating a new topic if migration is already started if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(ex.getCause()); return null; } @@ -1570,6 +1546,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S } } }).exceptionally(ex -> { + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(ex.getCause()); return null; }); @@ -1623,7 +1600,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean finalProperties, topicPolicies) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(throwable); return null; }); @@ -1631,7 +1608,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } }).exceptionally(ex -> { @@ -1662,7 +1639,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new NotAllowedException(msg)); return; } @@ -1744,6 +1721,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " topic", topic, FutureUtil.getException(topicFuture)); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, ex) -> { + topics.remove(topic); if (ex != null) { log.warn("[{}] Get an error when closing topic.", topic, ex); @@ -1760,6 +1738,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, closeEx) -> { + topics.remove(topic); if (closeEx != null) { log.warn("[{}] Get an error when closing topic.", topic, closeEx); @@ -1771,7 +1750,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }); } catch (PulsarServerException e) { log.warn("Failed to create topic {}: {}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(e); } } @@ -1784,7 +1763,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new PersistenceException(exception)); } } @@ -1794,7 +1773,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(exception); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 765727aeac319..6084ab06b48d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -19,12 +19,10 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -1434,13 +1432,6 @@ public void testCleanupTopic() throws Exception { // Ok } - final CompletableFuture> timedOutTopicFuture = topicFuture; - // timeout topic future should be removed from cache - retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5, - 1000); - - assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName)); - try { Consumer consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); @@ -1452,6 +1443,12 @@ public void testCleanupTopic() throws Exception { ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2"); mlFuture.complete(ml); + // Once ml is created, topic should be removed due timeout. + Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertNull(pulsar1.getBrokerService().getTopics().get(topicName)); + }); + + // Re-create topic will success. Consumer consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") .subscriptionType(SubscriptionType.Shared).subscribeAsync() .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 7cd9da7574dbb..a399a6fd9f024 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.List; @@ -27,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -34,6 +37,9 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.compaction.CompactionServiceFactory; @@ -108,6 +114,67 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); } + @Test + public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception { + // Make the topic loading timeout faster. + long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); + int topicLoadTimeoutSeconds = 1; + pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(true); + pulsar.getConfig().setTransactionCoordinatorEnabled(true); + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); + + // Mock message deduplication recovery speed topicLoadTimeoutSeconds + String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; + mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> { + if (mlPath.equals(path)) { + log.info("Topic load timeout: " + path); + return true; + } + return false; + }); + + // First load topic will trigger timeout + // The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close. + // Here, we simulate a sleep to ensure that the ledger is not immediately closed. + TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() { + @Override + public TransactionBuffer newTransactionBuffer(Topic originTopic) { + return new TransactionBufferDisable(originTopic) { + @SneakyThrows + @Override + public CompletableFuture closeAsync() { + Thread.sleep(500); + return super.closeAsync(); + } + }; + } + }; + TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider(); + pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider); + CompletableFuture> firstLoad = pulsar.getBrokerService().getTopic(tpName, true); + Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + // assert first create topic timeout + .untilAsserted(() -> { + assertTrue(firstLoad.isCompletedExceptionally()); + }); + + // Once the first load topic times out, immediately to load the topic again. + Producer producer = pulsarClient.newProducer().topic(tpName).create(); + for (int i = 0; i < 100; i++) { + MessageId send = producer.send("msg".getBytes()); + assertNotNull(send); + } + + // set to back + pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider); + pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(false); + pulsar.getConfig().setTransactionCoordinatorEnabled(false); + } + @Test public void testNoOrphanTopicIfInitFailed() throws Exception { String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");