From 340a19655ab949c39d1d888bc782fe5ece566b84 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 22 Jul 2024 08:45:23 +0800 Subject: [PATCH] [improve] [broker] high CPU usage caused by list topics under namespace (#23049) (cherry picked from commit 3e4f338e91877fb2e4592aa9abc3aced6d4e50c7) --- .../broker/namespace/NamespaceService.java | 23 +++++++++++++++++++ .../pulsar/broker/service/ServerCnx.java | 4 ++-- .../pulsar/broker/service/ServerCnxTest.java | 2 ++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index e9eacf570e381..8c97555205da4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -50,6 +51,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -92,6 +94,7 @@ import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataCache; @@ -164,6 +167,9 @@ public enum AddressType { .register(); + private ConcurrentHashMap>> inProgressQueryUserTopics = + new ConcurrentHashMap<>(); + /** * Default constructor. * @@ -1206,6 +1212,23 @@ public CompletableFuture> getListOfTopics(NamespaceName namespaceNa } } + public CompletableFuture> getListOfUserTopics(NamespaceName namespaceName, Mode mode) { + String key = String.format("%s://%s", mode, namespaceName); + final MutableBoolean initializedByCurrentThread = new MutableBoolean(); + CompletableFuture> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> { + initializedByCurrentThread.setTrue(); + return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> { + return TopicList.filterSystemTopic(list); + }, pulsar.getExecutor()); + }); + if (initializedByCurrentThread.getValue()) { + queryRes.whenComplete((ignore, ex) -> { + inProgressQueryUserTopics.remove(key, queryRes); + }); + } + return queryRes; + } + public CompletableFuture> getAllPartitions(NamespaceName namespaceName) { return getPartitions(namespaceName, TopicDomain.persistent) .thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 473dbecd0c7fd..1d8dbe6ae90f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2125,11 +2125,11 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet if (lookupSemaphore.tryAcquire()) { isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { if (isAuthorized) { - getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode) .thenAccept(topics -> { boolean filterTopics = false; // filter system topic - List filteredTopics = TopicList.filterSystemTopic(topics); + List filteredTopics = topics; if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 3b4f6a6f2e499..525f375db8ec2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -264,6 +264,8 @@ public void setup() throws Exception { doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); + doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics( + NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics( NamespaceName.get("use", "ns-abc"));