From 4862fee1275c4773211a56656e3b3c7b673a3fdb Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 12 Sep 2024 18:20:33 +0800 Subject: [PATCH 1/6] Revert "[improve][broker] Add retry for start service unit state channel (ExtensibleLoadManagerImpl only) (#23230)" This reverts commit 8bb30a1106e8bbe5a76c14932a59805a278b9dd4. --- .../extensions/ExtensibleLoadManagerImpl.java | 59 ++----------------- 1 file changed, 6 insertions(+), 53 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 40efa6390a78a..95882cfb21b3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -99,10 +99,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -125,10 +122,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; - public static final int STARTUP_TIMEOUT_SECONDS = 30; - - public static final int MAX_RETRY = 5; - private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; public static final Set INTERNAL_TOPICS = @@ -408,43 +401,10 @@ public void start() throws PulsarServerException { this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); pulsar.runWhenReadyForIncomingRequests(() -> { - Backoff backoff = new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS) - .create(); - int retry = 0; - while (!Thread.currentThread().isInterrupted()) { - try { - brokerRegistry.register(); - this.serviceUnitStateChannel.start(); - break; - } catch (Exception e) { - log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", - pulsar.getBrokerId(), ++retry, e); - try { - Thread.sleep(backoff.next()); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - // preserve thread's interrupt status - Thread.currentThread().interrupt(); - try { - pulsar.close(); - } catch (PulsarServerException exc) { - log.error("Failed to close pulsar service.", exc); - } - return; - } - failStarting(e); - if (retry >= MAX_RETRY) { - log.error("Failed to start the service unit state channel after retry {} th. " - + "Closing pulsar service.", retry, e); - try { - pulsar.close(); - } catch (PulsarServerException ex) { - log.error("Failed to close pulsar service.", ex); - } - } - } + try { + this.serviceUnitStateChannel.start(); + } catch (Exception e) { + failStarting(e); } }); this.antiAffinityGroupPolicyHelper = @@ -538,15 +498,8 @@ private void failStarting(Exception ex) { this.brokerRegistry, ex); if (this.brokerRegistry != null) { try { - brokerRegistry.unregister(); - } catch (MetadataStoreException e) { - // ignore - } - } - if (this.serviceUnitStateChannel != null) { - try { - serviceUnitStateChannel.close(); - } catch (IOException e) { + brokerRegistry.close(); + } catch (PulsarServerException e) { // ignore } } From 2a8ecc0ea849d6dce0a61b5c8c33c2a455651a2c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 12 Sep 2024 21:39:50 +0800 Subject: [PATCH 2/6] Fail fast when starting the extensible load manager --- .../pulsar/broker/PulsarServerException.java | 17 +++++++ .../apache/pulsar/broker/PulsarService.java | 2 +- .../extensions/ExtensibleLoadManagerImpl.java | 46 +++++++++---------- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java index 2235b9a7128b8..d7c0d0adb3afc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; import java.io.IOException; +import java.util.concurrent.CompletionException; public class PulsarServerException extends IOException { private static final long serialVersionUID = 1; @@ -44,4 +45,20 @@ public NotFoundException(Throwable t) { super(t); } } + + public static PulsarServerException from(Throwable throwable) { + if (throwable instanceof CompletionException) { + return from(throwable.getCause()); + } + if (throwable instanceof PulsarServerException pulsarServerException) { + return pulsarServerException; + } else { + return new PulsarServerException(throwable); + } + } + + // Wrap this checked exception into a specific unchecked exception + public static CompletionException toUncheckedException(PulsarServerException e) { + return new CompletionException(e); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b2e67bf4883dd..425e7dafa1bf8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1080,7 +1080,7 @@ public void start() throws PulsarServerException { state = State.Started; } catch (Exception e) { LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e); - PulsarServerException startException = new PulsarServerException(e); + PulsarServerException startException = PulsarServerException.from(e); readyForIncomingRequestsFuture.completeExceptionally(startException); throw startException; } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 95882cfb21b3c..a1157360ab147 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -81,7 +81,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; -import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory; @@ -400,13 +399,7 @@ public void start() throws PulsarServerException { this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); - pulsar.runWhenReadyForIncomingRequests(() -> { - try { - this.serviceUnitStateChannel.start(); - } catch (Exception e) { - failStarting(e); - } - }); + this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); @@ -415,15 +408,10 @@ public void start() throws PulsarServerException { SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); - - try { - this.brokerLoadDataStore = LoadDataStoreFactory - .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); - this.topBundlesLoadDataStore = LoadDataStoreFactory - .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); - } catch (LoadDataStoreException e) { - throw new PulsarServerException(e); - } + this.brokerLoadDataStore = LoadDataStoreFactory + .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); + this.topBundlesLoadDataStore = LoadDataStoreFactory + .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); this.context = LoadManagerContextImpl.builder() .configuration(conf) @@ -447,6 +435,7 @@ public void start() throws PulsarServerException { pulsar.runWhenReadyForIncomingRequests(() -> { try { + this.serviceUnitStateChannel.start(); var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() @@ -484,28 +473,30 @@ public void start() throws PulsarServerException { this.initWaiter.complete(null); this.started = true; log.info("Started load manager."); - } catch (Exception ex) { - failStarting(ex); + } catch (Throwable e) { + failStarting(e); } }); - } catch (Exception ex) { + } catch (Throwable ex) { failStarting(ex); } } - private void failStarting(Exception ex) { - log.error("Failed to start the extensible load balance and close broker registry {}.", - this.brokerRegistry, ex); + private void failStarting(Throwable throwable) { if (this.brokerRegistry != null) { try { brokerRegistry.close(); } catch (PulsarServerException e) { - // ignore + // If close failed, this broker might still exist in the metadata store. Then it could be found by other + // brokers as an available broker. Hence, print a warning log for it. + log.warn("Failed to close the broker registry: {}", e.getMessage()); } } - initWaiter.completeExceptionally(ex); + initWaiter.completeExceptionally(new InterruptedException()); // exit the background thread gracefully + throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable)); } + @Override public void initialize(PulsarService pulsar) { this.pulsar = pulsar; @@ -865,6 +856,8 @@ synchronized void playLeader() { unloadScheduler.start(); serviceUnitStateChannel.scheduleOwnershipMonitor(); break; + } catch (InterruptedException ignored) { + return; } catch (Throwable e) { log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); @@ -912,6 +905,8 @@ synchronized void playFollower() { topBundlesLoadDataStore.close(); topBundlesLoadDataStore.startProducer(); break; + } catch (InterruptedException ignored) { + return; } catch (Throwable e) { log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); @@ -994,6 +989,7 @@ protected void monitor() { playFollower(); } } + } catch (InterruptedException ignored) { } catch (Throwable e) { log.error("Failed to get the channel ownership.", e); } From 9a97599047e34ebe2104aeb054244ca880d09da1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 12 Sep 2024 22:35:16 +0800 Subject: [PATCH 3/6] Add LoadManagerFailFastTest --- .../extensions/ExtensibleLoadManagerImpl.java | 36 ++++-- .../extensions/LoadManagerFailFastTest.java | 114 ++++++++++++++++++ 2 files changed, 139 insertions(+), 11 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index a1157360ab147..b95661f14b28d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -204,7 +205,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private final ConcurrentHashMap>> lookupRequests = new ConcurrentHashMap<>(); - private final CompletableFuture initWaiter = new CompletableFuture<>(); + private final CompletableFuture initWaiter = new CompletableFuture<>(); /** * Get all the bundles that are owned by this broker. @@ -377,7 +378,7 @@ public void start() throws PulsarServerException { return; } try { - this.brokerRegistry = new BrokerRegistryImpl(pulsar); + this.brokerRegistry = createBrokerRegistry(pulsar); this.leaderElectionService = new LeaderElectionService( pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, @@ -392,7 +393,7 @@ public void start() throws PulsarServerException { }); }); }); - this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); + this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar); this.brokerRegistry.start(); this.splitManager = new SplitManager(splitCounter); this.unloadManager = new UnloadManager(unloadCounter, pulsar.getBrokerId()); @@ -470,7 +471,7 @@ public void start() throws PulsarServerException { MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); this.splitScheduler.start(); - this.initWaiter.complete(null); + this.initWaiter.complete(true); this.started = true; log.info("Started load manager."); } catch (Throwable e) { @@ -492,7 +493,7 @@ private void failStarting(Throwable throwable) { log.warn("Failed to close the broker registry: {}", e.getMessage()); } } - initWaiter.completeExceptionally(new InterruptedException()); // exit the background thread gracefully + initWaiter.complete(false); // exit the background thread gracefully throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable)); } @@ -841,7 +842,9 @@ synchronized void playLeader() { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; @@ -856,8 +859,6 @@ synchronized void playLeader() { unloadScheduler.start(); serviceUnitStateChannel.scheduleOwnershipMonitor(); break; - } catch (InterruptedException ignored) { - return; } catch (Throwable e) { log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); @@ -893,7 +894,9 @@ synchronized void playFollower() { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; @@ -966,7 +969,9 @@ private List getIgnoredCommandMetrics(String advertisedBrokerAddress) { @VisibleForTesting protected void monitor() { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } // Monitor role // Periodically check the role in case ZK watcher fails. @@ -989,7 +994,6 @@ protected void monitor() { playFollower(); } } - } catch (InterruptedException ignored) { } catch (Throwable e) { log.error("Failed to get the channel ownership.", e); } @@ -1022,4 +1026,14 @@ private void closeInternalTopics() { log.warn("Failed to wait for closing internal topics", e); } } + + @VisibleForTesting + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + return new BrokerRegistryImpl(pulsar); + } + + @VisibleForTesting + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + return new ServiceUnitStateChannelImpl(pulsar); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java new file mode 100644 index 0000000000000..f389ac070583e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.common.util.PortManager; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class LoadManagerFailFastTest { + + private static final String cluster = "test"; + private final int zkPort = PortManager.nextLockedFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private final ServiceConfiguration config = new ServiceConfiguration(); + + @BeforeClass + protected void setup() throws Exception { + bk.start(); + config.setClusterName(cluster); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:localhost:" + zkPort); + } + + @AfterClass + protected void cleanup() throws Exception { + bk.stop(); + } + + @Test(timeOut = 30000) + public void testBrokerRegistryFailure() throws Exception { + config.setLoadManagerClassName(BrokerRegistryLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry"); + } + } + + @Test(timeOut = 30000) + public void testServiceUnitStateChannelFailure() throws Exception { + config.setLoadManagerClassName(ChannelLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel"); + } + } + + private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + final var mockBrokerRegistry = Mockito.mock(BrokerRegistryImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(mockBrokerRegistry) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + return mockBrokerRegistry; + } + } + + private static class ChannelLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + Mockito.doAnswer(__ -> null).when(channel).listen(Mockito.any()); + return channel; + } + } +} From 8cb9f21048c3c994ebe0eadaace76dfa92cc00c0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 12 Sep 2024 22:40:13 +0800 Subject: [PATCH 4/6] Assert the broker is unregistered --- .../loadbalance/extensions/LoadManagerFailFastTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java index f389ac070583e..41f7161fd315f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -23,11 +23,13 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.mockito.Mockito; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -66,6 +68,8 @@ public void testBrokerRegistryFailure() throws Exception { Assert.assertNull(e.getCause()); Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry"); } + Assert.assertTrue(pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get() + .isEmpty()); } @Test(timeOut = 30000) @@ -79,6 +83,8 @@ public void testServiceUnitStateChannelFailure() throws Exception { Assert.assertNull(e.getCause()); Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel"); } + Awaitility.await().untilAsserted(() -> Assert.assertTrue(pulsar.getLocalMetadataStore() + .getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty())); } private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl { From 19e47afe24cccf60c5d77d1d2fb1d892ce493991 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 12 Sep 2024 22:56:27 +0800 Subject: [PATCH 5/6] Remove outdated code --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index b95661f14b28d..59ee4a802e1e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -908,8 +908,6 @@ synchronized void playFollower() { topBundlesLoadDataStore.close(); topBundlesLoadDataStore.startProducer(); break; - } catch (InterruptedException ignored) { - return; } catch (Throwable e) { log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); From db8cf29e35aa8900950ffc64987b8c17da3f5b34 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 12 Sep 2024 10:10:57 -0700 Subject: [PATCH 6/6] Fix checkstyle --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 1 - .../broker/loadbalance/extensions/LoadManagerFailFastTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 59ee4a802e1e4..8e34f2f697fb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -36,7 +36,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java index 41f7161fd315f..a400bf733e557 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -28,8 +28,8 @@ import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; import org.mockito.Mockito; -import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass;