Skip to content

Commit

Permalink
Properly implement reusing TestZKServer in another context
Browse files Browse the repository at this point in the history
- ensures that it has a separate session
  • Loading branch information
lhotari committed Feb 17, 2025
1 parent a631efa commit 2c7b042
Showing 1 changed file with 37 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -162,6 +163,10 @@ public class PulsarTestContext implements AutoCloseable {

private final MockZooKeeper mockZooKeeperGlobal;

private final TestZKServer testZKServer;

private final TestZKServer testZKServerGlobal;

private final SpyConfig spyConfig;

private final boolean startable;
Expand Down Expand Up @@ -410,6 +415,11 @@ public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext otherConte
if (otherContext.getMockZooKeeperGlobal() != null) {
mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal());
}
} else if (otherContext.getTestZKServer() != null) {
testZKServer(otherContext.getTestZKServer());
if (otherContext.getTestZKServerGlobal() != null) {
testZKServerGlobal(otherContext.getTestZKServerGlobal());
}
} else {
localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(),
MetadataStoreExtended.class
Expand Down Expand Up @@ -509,20 +519,10 @@ public Builder withTestZookeeper() {
*/
public Builder withTestZookeeper(boolean useSeparateGlobalZk) {
try {
TestZKServer localZk = createTestZookeeper();
MetadataStoreExtended localStore =
createTestZookeeperMetadataStore(localZk, MetadataStoreConfig.METADATA_STORE);
localMetadataStore(localStore);
MetadataStoreExtended configStore;
testZKServer(createTestZookeeper());
if (useSeparateGlobalZk) {
TestZKServer globalZk = createTestZookeeper();
configStore = createTestZookeeperMetadataStore(globalZk,
MetadataStoreConfig.CONFIGURATION_METADATA_STORE);
} else {
configStore =
createTestZookeeperMetadataStore(localZk, MetadataStoreConfig.CONFIGURATION_METADATA_STORE);
testZKServerGlobal(createTestZookeeper());
}
configurationMetadataStore(configStore);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -539,18 +539,6 @@ private TestZKServer createTestZookeeper() throws Exception {
return testZKServer;
}

private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer,
String metadataStoreName) {
try {
MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(),
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
registerCloseable(store);
return store;
} catch (MetadataStoreException e) {
throw new RuntimeException(e);
}
}

/**
* Applicable only when PulsarTestContext is not startable. This will configure mocks
* for PulsarTestResources and related classes.
Expand Down Expand Up @@ -738,6 +726,20 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
configurationMetadataStore(mockZookeeperMetadataStore);
}
}
} else if (super.testZKServer != null) {
MetadataStoreExtended testZookeeperMetadataStore =
createTestZookeeperMetadataStore(super.testZKServer, MetadataStoreConfig.METADATA_STORE);
if (super.localMetadataStore == null) {
localMetadataStore(testZookeeperMetadataStore);
}
if (super.configurationMetadataStore == null) {
if (super.testZKServerGlobal != null) {
configurationMetadataStore(createTestZookeeperMetadataStore(super.testZKServerGlobal,
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
} else {
configurationMetadataStore(testZookeeperMetadataStore);
}
}
} else {
try {
MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local",
Expand Down Expand Up @@ -782,6 +784,17 @@ private MetadataStoreExtended createMockZookeeperMetadataStore(MockZooKeeper moc
return nonClosingProxy;
}

@SneakyThrows
private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer,
String metadataStoreName) {
MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(),
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
registerCloseable(store);
MetadataStoreExtended nonClosingProxy =
NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class);
return nonClosingProxy;
}

protected abstract void initializePulsarServices(SpyConfig spyConfig, Builder builder);
}

Expand Down

0 comments on commit 2c7b042

Please sign in to comment.