Skip to content

Commit

Permalink
add dynamic port support
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Feb 2, 2025
1 parent 0516650 commit 9bf75ac
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 24 deletions.
6 changes: 3 additions & 3 deletions core/src/test/java/kafka/server/KraftVoterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ public void createController() throws Exception {
props.put("process.roles", "controller");
props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, bootstrapControllers);
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
props.put("controller.quorum.bootstrap.servers", "localhost:9000");
props.put("controller.listener.names", "CONTROLLER");
props.put("listeners", "CONTROLLER://localhost:3999");
props.put("advertised.listeners", "PLAINTEXT://localhost:4000");
props.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
System.err.println("KKK listeners " + props.get("listeners") + " advertised.listeners: " + props.get("advertised.listeners"));
System.err.println("KKK Bootstrap server " + bootstrapControllers);
props.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
cluster.createIsolatedController(props);
Collection<Integer> res = new ArrayList<>();
res.add(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.ServerSocketFactory;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.config.KRaftConfigs;
Expand Down Expand Up @@ -175,7 +176,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
append("@").
append("localhost").
append(":").
append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName, false));
prefix = ",";
}
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
Expand Down Expand Up @@ -234,7 +235,7 @@ public KafkaClusterTestKit build() throws Exception {
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName);
socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName, false);
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
Expand All @@ -247,7 +248,7 @@ public KafkaClusterTestKit build() throws Exception {
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
Collections.emptyList(),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
socketFactoryManager.getOrCreateSocketFactory(node.id(), false)
);
ControllerServer controller = null;
try {
Expand Down Expand Up @@ -275,7 +276,7 @@ public KafkaClusterTestKit build() throws Exception {
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
Collections.emptyList(),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
socketFactoryManager.getOrCreateSocketFactory(node.id(), false)
);
jointServers.put(node.id(), sharedServer);
}
Expand Down Expand Up @@ -390,14 +391,14 @@ public void format() throws Exception {
try {
for (ControllerServer controller : controllers.values()) {
futures.add(executorService.submit(() -> {
formatNode(controller.sharedServer().metaPropsEnsemble(), true);
formatNode(controller.sharedServer().metaPropsEnsemble(), true, false);
}));
}
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
BrokerServer broker = entry.getValue();
futures.add(executorService.submit(() -> {
formatNode(broker.sharedServer().metaPropsEnsemble(),
!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()));
!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()), false);
}));
}
for (Future<?> future: futures) {
Expand All @@ -411,14 +412,30 @@ public void format() throws Exception {
}
}

public ControllerServer createIsolatedController(Map<String, String> props) {
public ControllerServer createIsolatedController(Map<String, String> props) throws IOException {
props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
Long.toString(TimeUnit.MINUTES.toMillis(10)));
props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller");
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");

if (props.containsKey(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG) ||
props.containsKey(SocketServerConfigs.LISTENERS_CONFIG) ||
props.containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)) {
throw new RuntimeException("");
}

int nodeId = Integer.parseInt(props.get(KRaftConfigs.NODE_ID_CONFIG));
ServerSocketFactory serverSocketFactor = socketFactoryManager.getOrCreateSocketFactory(nodeId, true);
int controllerPort = socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER", true);
int plaintextPort = socketFactoryManager.getOrCreatePortForListener(nodeId, "PLAINTEXT", true);
props.put("controller.listener.names", "CONTROLLER");
props.put("listeners", String.format("CONTROLLER://localhost:%d", controllerPort));
props.put("advertised.listeners", String.format("PLAINTEXT://localhost:%d", plaintextPort));

KafkaConfig config = new KafkaConfig(props);
TestKitNode node = nodes.createControllerNode(config, false);
MetaPropertiesEnsemble metaPropsEnsemble = node.initialMetaPropertiesEnsemble();
formatNode(metaPropsEnsemble, true);
formatNode(metaPropsEnsemble, true, true);

SharedServer sharedServer = new SharedServer(
config,
Expand All @@ -428,7 +445,7 @@ public ControllerServer createIsolatedController(Map<String, String> props) {
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(new ArrayList<>())),
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
serverSocketFactor
);
ControllerServer controller = null;
try {
Expand All @@ -450,7 +467,8 @@ public ControllerServer createIsolatedController(Map<String, String> props) {

private void formatNode(
MetaPropertiesEnsemble ensemble,
boolean writeMetadataDirectory
boolean writeMetadataDirectory,
boolean isDynamic
) {
try {
Formatter formatter = new Formatter();
Expand Down Expand Up @@ -482,7 +500,7 @@ private void formatNode(
String prefix = "";
for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
int port = socketFactoryManager.
getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
getOrCreatePortForListener(controllerNode.id(), controllerListenerName, isDynamic);
dynamicVotersBuilder.append(prefix);
prefix = ",";
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,28 @@ public ServerSocketChannel openServerSocket(

/**
* Maps node IDs to socket factory objects.
* Protected by the object lock.
*/
private final Map<Integer, PreboundSocketFactory> factories = new HashMap<>();

/**
* Maps node IDs to socket factory objects after initial flow.
*/
private final Map<Integer, PreboundSocketFactory> dunamicFactories = new HashMap<>();

/**
* Maps node IDs to maps of listener names to ports.
* Protected by the object lock.
*/
private final Map<Integer, Map<String, ServerSocketChannel>> sockets = new HashMap<>();

/**
* Maps node IDs to maps of listener names to ports after initialization.
* The node id maybe same with sockets.
* Protected by the object lock.
*/
private final Map<Integer, Map<String, ServerSocketChannel>> dynamicSockets = new HashMap<>();


/**
* Maps node IDs to set of the listeners that were used.
* Protected by the object lock.
Expand Down Expand Up @@ -129,8 +141,14 @@ public synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
*
* @return The socket factory.
*/
public synchronized ServerSocketFactory getOrCreateSocketFactory(int nodeId) {
return factories.computeIfAbsent(nodeId, __ -> new PreboundSocketFactory(nodeId));
public synchronized ServerSocketFactory getOrCreateSocketFactory(int nodeId, boolean isDynamic) {
Map<Integer, PreboundSocketFactory> checkedFactories = factories;

if (isDynamic) {
checkedFactories = dunamicFactories;
}

return checkedFactories.computeIfAbsent(nodeId, __ -> new PreboundSocketFactory(nodeId));
}

/**
Expand All @@ -143,10 +161,17 @@ public synchronized ServerSocketFactory getOrCreateSocketFactory(int nodeId) {
*/
public synchronized int getOrCreatePortForListener(
int nodeId,
String listener
String listener,
boolean isDynamic
) throws IOException {
Map<Integer, Map<String, ServerSocketChannel>> checkedsockets = sockets;

if (isDynamic) {
checkedsockets = dynamicSockets;
}

Map<String, ServerSocketChannel> socketsForNode =
sockets.computeIfAbsent(nodeId, __ -> new HashMap<>());
checkedsockets.computeIfAbsent(nodeId, __ -> new HashMap<>());
ServerSocketChannel socketChannel = socketsForNode.get(listener);
if (socketChannel == null) {
if (closed) {
Expand All @@ -172,9 +197,14 @@ public synchronized void close() throws Exception {
// Close all sockets that haven't been used by a SocketServer. (We don't want to close the
// ones that have been used by a SocketServer because that is the responsibility of that
// SocketServer.)
closeSocket(sockets);
closeSocket(dynamicSockets);
}

private void closeSocket(Map<Integer, Map<String, ServerSocketChannel>> sockets) {
for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry : sockets.entrySet()) {
Set<String> usedListeners = usedSockets.getOrDefault(
socketsEntry.getKey(), Collections.emptySet());
socketsEntry.getKey(), Collections.emptySet());
for (Entry<String, ServerSocketChannel> entry : socketsEntry.getValue().entrySet()) {
if (!usedListeners.contains(entry.getKey())) {
Utils.closeQuietly(entry.getValue(), "serverSocketChannel");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ public void testCreateOutOfClusterController() throws Exception {
Map<String, String> props = new HashMap<>();
props.put("node.id", "2");
props.put("controller.quorum.bootstrap.servers", "localhost:9000");
props.put("controller.listener.names", "CONTROLLER");
props.put("listeners", "CONTROLLER://localhost:3999");
props.put("advertised.listeners", "PLAINTEXT://localhost:4000");
props.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
assertDoesNotThrow(() -> cluster.createIsolatedController(props));
}
Expand Down

0 comments on commit 9bf75ac

Please sign in to comment.