Skip to content

Commit

Permalink
[fix][meta] Fix ephemeral handling of ZK nodes and fix MockZooKeeper …
Browse files Browse the repository at this point in the history
…ephemeral and ZK stat handling (#23988)

(cherry picked from commit df51972)
  • Loading branch information
lhotari committed Feb 17, 2025
1 parent b56dfc7 commit 67ae209
Show file tree
Hide file tree
Showing 22 changed files with 1,202 additions and 755 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,15 @@ public static void fillNamespaceToBundlesMap(final Set<String> bundles,
public static String getBundleRangeFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf("/");
checkArgument(pos != -1);
checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
return bundleName.substring(pos + 1);
}

// From a full bundle name, extract the namespace name.
public static String getNamespaceNameFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf('/');
checkArgument(pos != -1);
checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
return bundleName.substring(0, pos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ private void updateBundleData() {
for (String bundle : bundleData.keySet()) {
if (!activeBundles.contains(bundle)){
bundleData.remove(bundle);
if (pulsar.getLeaderElectionService().isLeader()){
if (pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader()){
deleteBundleDataFromMetadataStore(bundle);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ protected static boolean isLeaderBroker(PulsarService pulsar) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return true;
}
return pulsar.getLeaderElectionService().isLeader();
return pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader();
}

public void validateTenantOperation(String tenant, TenantOperation operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ public int size() {

public void validateBundle(NamespaceBundle nsBundle) throws Exception {
int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
checkArgument(idx >= 0, "Cannot find bundle in the bundles list");
checkArgument(nsBundle.getUpperEndpoint().equals(bundles.get(idx).getUpperEndpoint()),
"Invalid upper boundary for bundle");
checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle);
NamespaceBundle foundBundle = bundles.get(idx);
Long upperEndpoint = foundBundle.getUpperEndpoint();
checkArgument(nsBundle.getUpperEndpoint().equals(upperEndpoint),
"Invalid upper boundary for bundle %s. Expected upper boundary of %s", nsBundle, foundBundle);
}

public NamespaceBundle getFullBundle() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ public static String getTlsFileForClient(String name) {

protected boolean enableBrokerInterceptor = false;

// Set to true in test's constructor to use a real Zookeeper (TestZKServer)
protected boolean useTestZookeeper;

public MockedPulsarServiceBaseTest() {
resetConfig();
}
Expand Down Expand Up @@ -309,7 +312,14 @@ protected void afterPulsarStart(PulsarService pulsar) throws Exception {
* @throws Exception if an error occurs
*/
protected void restartBroker() throws Exception {
restartBroker(null);
}

protected void restartBroker(Consumer<ServiceConfiguration> configurationChanger) throws Exception {
stopBroker();
if (configurationChanger != null) {
configurationChanger.accept(conf);
}
startBroker();
}

Expand Down Expand Up @@ -400,7 +410,6 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
PulsarTestContext.Builder builder = PulsarTestContext.builder()
.spyByDefault()
.config(conf)
.withMockZookeeper(true)
.pulsarServiceCustomizer(pulsarService -> {
try {
beforePulsarStart(pulsarService);
Expand All @@ -409,9 +418,25 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
}
})
.brokerServiceCustomizer(this::customizeNewBrokerService);
configureMetadataStores(builder);
return builder;
}

/**
* Configures the metadata stores for the PulsarTestContext.Builder instance.
* Set useTestZookeeper to true in the test's constructor to use TestZKServer which is a real ZooKeeper
* implementation.
*
* @param builder the PulsarTestContext.Builder instance to configure
*/
protected void configureMetadataStores(PulsarTestContext.Builder builder) {
if (useTestZookeeper) {
builder.withTestZookeeper();
} else {
builder.withMockZookeeper(true);
}
}

/**
* This method can be used in test classes for creating additional PulsarTestContext instances
* that share the same mock ZooKeeper and BookKeeper instances as the main PulsarTestContext instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

package org.apache.pulsar.broker.testcontext;

import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
Expand All @@ -33,6 +31,7 @@
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
Expand All @@ -56,16 +55,19 @@
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.TestZKServer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;
Expand Down Expand Up @@ -150,6 +152,10 @@ public class PulsarTestContext implements AutoCloseable {

private final MockZooKeeper mockZooKeeperGlobal;

private final TestZKServer testZKServer;

private final TestZKServer testZKServerGlobal;

private final SpyConfig spyConfig;

private final boolean startable;
Expand Down Expand Up @@ -377,6 +383,11 @@ public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext otherConte
if (otherContext.getMockZooKeeperGlobal() != null) {
mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal());
}
} else if (otherContext.getTestZKServer() != null) {
testZKServer(otherContext.getTestZKServer());
if (otherContext.getTestZKServerGlobal() != null) {
testZKServerGlobal(otherContext.getTestZKServerGlobal());
}
} else {
localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(),
MetadataStoreExtended.class
Expand Down Expand Up @@ -436,17 +447,56 @@ public Builder withMockZookeeper(boolean useSeparateGlobalZk) {
}

private MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
List<ACL> dummyAclList = new ArrayList<>(0);
MockZooKeeper zk = MockZooKeeper.newInstance();
initializeZookeeper(zk);
registerCloseable(zk::shutdown);
return zk;
}

private static void initializeZookeeper(ZooKeeper zk) throws KeeperException, InterruptedException {
ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
"".getBytes(StandardCharsets.UTF_8), dummyAclList, CreateMode.PERSISTENT);
"".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}

registerCloseable(zk::shutdown);
return zk;
/**
* Configure this PulsarTestContext to use a test ZooKeeper instance which is
* shared for both the local and configuration metadata stores.
*
* @return the builder
*/
public Builder withTestZookeeper() {
return withTestZookeeper(false);
}

/**
* Configure this PulsarTestContext to use a test ZooKeeper instance.
*
* @param useSeparateGlobalZk if true, the global (configuration) zookeeper will be a separate instance
* @return the builder
*/
public Builder withTestZookeeper(boolean useSeparateGlobalZk) {
try {
testZKServer(createTestZookeeper());
if (useSeparateGlobalZk) {
testZKServerGlobal(createTestZookeeper());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return this;
}

private TestZKServer createTestZookeeper() throws Exception {
TestZKServer testZKServer = new TestZKServer();
try (ZooKeeper zkc = new ZooKeeper(testZKServer.getConnectionString(), 5000, event -> {
})) {
initializeZookeeper(zkc);
}
registerCloseable(testZKServer);
return testZKServer;
}

/**
Expand Down Expand Up @@ -628,6 +678,20 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
configurationMetadataStore(mockZookeeperMetadataStore);
}
}
} else if (super.testZKServer != null) {
MetadataStoreExtended testZookeeperMetadataStore =
createTestZookeeperMetadataStore(super.testZKServer, MetadataStoreConfig.METADATA_STORE);
if (super.localMetadataStore == null) {
localMetadataStore(testZookeeperMetadataStore);
}
if (super.configurationMetadataStore == null) {
if (super.testZKServerGlobal != null) {
configurationMetadataStore(createTestZookeeperMetadataStore(super.testZKServerGlobal,
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
} else {
configurationMetadataStore(testZookeeperMetadataStore);
}
}
} else {
try {
MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local",
Expand Down Expand Up @@ -672,6 +736,17 @@ private MetadataStoreExtended createMockZookeeperMetadataStore(MockZooKeeper moc
return nonClosingProxy;
}

@SneakyThrows
private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer,
String metadataStoreName) {
MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(),
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
registerCloseable(store);
MetadataStoreExtended nonClosingProxy =
NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class);
return nonClosingProxy;
}

protected abstract void initializePulsarServices(SpyConfig spyConfig, Builder builder);
}

Expand Down
Loading

0 comments on commit 67ae209

Please sign in to comment.