Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix the broker registery cannot recover from the metadata node deletion #23359

Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
import java.util.function.BiConsumer;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;

/**
* Responsible for registering the current Broker lookup info to
* the distributed store (e.g. Zookeeper) for broker discovery.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Unstable
public interface BrokerRegistry extends AutoCloseable {

/**
Expand All @@ -47,7 +51,7 @@ public interface BrokerRegistry extends AutoCloseable {
/**
* Register local broker to metadata store.
*/
void register() throws MetadataStoreException;
CompletableFuture<Void> registerAsync();

/**
* Unregister the broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -69,17 +70,26 @@ protected enum State {
Init,
Started,
Registered,
Unregistering,
Closed
}

private State state;
@VisibleForTesting
final AtomicReference<State> state = new AtomicReference<>(State.Init);

public BrokerRegistryImpl(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
this.listeners.add((broker, notificationType) -> {
if (notificationType == NotificationType.Deleted && getBrokerId().equals(broker)) {
registerAsync();
}
});
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
Expand All @@ -94,44 +104,45 @@ public BrokerRegistryImpl(PulsarService pulsar) {
System.currentTimeMillis(),
pulsar.getBrokerVersion(),
pulsar.getConfig().lookupProperties());
this.state = State.Init;
}

@Override
public synchronized void start() throws PulsarServerException {
if (this.state != State.Init) {
return;
if (!this.state.compareAndSet(State.Init, State.Started)) {
throw new PulsarServerException("Cannot start the broker registry in state " + state.get());
}
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
try {
this.state = State.Started;
this.register();
} catch (MetadataStoreException e) {
throw new PulsarServerException(e);
this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw PulsarServerException.from(e);
}
}

@Override
public boolean isStarted() {
return this.state == State.Started || this.state == State.Registered;
final var state = this.state.get();
return state == State.Started || state == State.Registered;
}

@Override
public synchronized void register() throws MetadataStoreException {
if (this.state == State.Started) {
try {
brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
this.state = State.Registered;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw MetadataStoreException.unwrap(e);
}
public CompletableFuture<Void> registerAsync() {
final var state = this.state.get();
if (state != State.Started && state != State.Registered) {
log.info("[{}] Skip registering self because the state is {}", getBrokerId(), state);
return CompletableFuture.completedFuture(null);
}
log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state);
return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.thenAccept(__ -> {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
});
}

@Override
public synchronized void unregister() throws MetadataStoreException {
if (this.state == State.Registered) {
if (state.compareAndSet(State.Registered, State.Unregistering)) {
try {
brokerLookupDataMetadataCache.delete(brokerIdKeyPath)
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
Expand All @@ -144,7 +155,7 @@ public synchronized void unregister() throws MetadataStoreException {
} catch (InterruptedException | TimeoutException e) {
throw MetadataStoreException.unwrap(e);
} finally {
this.state = State.Started;
state.set(State.Started);
}
}
}
Expand Down Expand Up @@ -191,7 +202,7 @@ public synchronized void addListener(BiConsumer<String, NotificationType> listen

@Override
public synchronized void close() throws PulsarServerException {
if (this.state == State.Closed) {
if (this.state.get() == State.Closed) {
return;
}
try {
Expand All @@ -200,7 +211,7 @@ public synchronized void close() throws PulsarServerException {
} catch (Exception ex) {
log.error("Unexpected error when unregistering the broker registry", ex);
} finally {
this.state = State.Closed;
this.state.set(State.Closed);
}
}

Expand Down Expand Up @@ -238,7 +249,7 @@ protected static String keyPath(String brokerId) {
}

private void checkState() throws IllegalStateException {
if (this.state == State.Closed) {
if (this.state.get() == State.Closed) {
throw new IllegalStateException("The registry already closed.");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class BrokerRegistryIntegrationTest {

private static final String clusterName = "test";
private final int zkPort = PortManager.nextFreePort();
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort);
private PulsarService pulsar;
private BrokerRegistry brokerRegistry;
private String brokerMetadataPath;

@BeforeClass
protected void setup() throws Exception {
bk.start();
pulsar = new PulsarService(brokerConfig());
pulsar.start();
final var admin = pulsar.getAdminClient();
admin.clusters().createCluster(clusterName, ClusterData.builder().build());
admin.tenants().createTenant("public", TenantInfo.builder()
.allowedClusters(Collections.singleton(clusterName)).build());
admin.namespaces().createNamespace("public/default");
brokerRegistry = ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get()).get().getBrokerRegistry();
brokerMetadataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId();
}

@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
if (pulsar != null) {
pulsar.close();
}
bk.stop();
}

@Test
public void testRecoverFromNodeDeletion() throws Exception {
// Simulate the case that the node was somehow deleted (e.g. by session timeout)
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));
pulsar.getLocalMetadataStore().delete(brokerMetadataPath, Optional.empty());
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));

// If the node is deleted by unregister(), it should not recreate the path
brokerRegistry.unregister();
Thread.sleep(3000);
Assert.assertTrue(brokerRegistry.getAvailableBrokersAsync().get().isEmpty());
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved

// Restore the normal state
brokerRegistry.registerAsync().get();
Assert.assertEquals(brokerRegistry.getAvailableBrokersAsync().get(), List.of(pulsar.getBrokerId()));
}

@Test
public void testRegisterAgain() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));
final var metadataStore = pulsar.getLocalMetadataStore();
final var oldResult = metadataStore.get(brokerMetadataPath).get().orElseThrow();
log.info("Old result: {} {}", new String(oldResult.getValue()), oldResult.getStat().getVersion());
brokerRegistry.registerAsync().get();

Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
final var newResult = metadataStore.get(brokerMetadataPath).get().orElseThrow();
log.info("New result: {} {}", new String(newResult.getValue()), newResult.getStat().getVersion());
Assert.assertTrue(newResult.getStat().getVersion() > oldResult.getStat().getVersion());
Assert.assertEquals(newResult.getValue(), oldResult.getValue());
});
}

private ServiceConfiguration brokerConfig() {
final var config = new ServiceConfiguration();
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort());
config.setManagedLedgerDefaultWriteQuorum(1);
config.setManagedLedgerDefaultAckQuorum(1);
config.setManagedLedgerDefaultEnsembleSize(1);
config.setDefaultNumberOfNamespaceBundles(16);
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
config.setLoadBalancerDebugModeEnabled(true);
config.setBrokerShutdownTimeoutMs(100);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void testCloseRegister() throws Exception {
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Started);

// Check state after re-register.
brokerRegistry.register();
brokerRegistry.registerAsync().get();
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered);

// Check state after close.
Expand Down Expand Up @@ -396,8 +396,8 @@ public void testKeyPath() {
assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId");
}

public BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) {
return WhiteboxImpl.getInternalState(brokerRegistry, BrokerRegistryImpl.State.class);
private static BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) {
return brokerRegistry.state.get();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ public void testDeployAndRollbackLoadManager() throws Exception {
producer.send("t1");

// Test re-register broker and check the lookup result
loadManager4.getBrokerRegistry().register();
loadManager4.getBrokerRegistry().registerAsync().get();

result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
Expand Down Expand Up @@ -1423,7 +1423,7 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception {
producer.send("t1");

// Test re-register broker and check the lookup result
loadManager4.getBrokerRegistry().register();
loadManager4.getBrokerRegistry().registerAsync().get();

result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
Expand Down
Loading