Skip to content

Commit

Permalink
Allow configuring advertised.listeners for controllers with Kafka ver…
Browse files Browse the repository at this point in the history
…sion 3.9

Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge committed Sep 3, 2024
1 parent e815d6b commit 306f08c
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class KafkaBrokerConfigurationBuilder {
private final static String CONTROL_PLANE_LISTENER_NAME = "CONTROLPLANE-9090";
private final static String REPLICATION_LISTENER_NAME = "REPLICATION-9091";

// Kafka version that requires advertised.listeners configuration for controllers, previous versions do not allow this configuration for controllers.
private final static KafkaVersion KAFKA_VERSION_3_9 = new KafkaVersion("3.9.0", "", "", "", "", false, false, "");

// Names of environment variables placeholders replaced only in the running container
private final static String PLACEHOLDER_CERT_STORE_PASSWORD = "${CERTS_STORE_PASSWORD}";
private final static String PLACEHOLDER_RACK_ID = "${STRIMZI_RACK_ID}";
Expand Down Expand Up @@ -254,18 +257,19 @@ public KafkaBrokerConfigurationBuilder withKRaft(String clusterName, String name
* generate the per-broker configuration which uses actual broker IDs and addresses instead of just placeholders.
*
* @param clusterName Name of the cluster (important for the advertised hostnames)
* @param kafkaVersion Kafka version of the cluster
* @param namespace Namespace (important for generating the advertised hostname)
* @param kafkaListeners The listeners configuration from the Kafka CR
* @param advertisedHostnameProvider Lambda method which provides the advertised hostname for given listener and
* broker. This is used to configure the user-configurable listeners.
* @param advertisedPortProvider Lambda method which provides the advertised port for given listener and broker.
* This is used to configure the user-configurable listeners.
*
* @return Returns the builder instance
*/
@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
public KafkaBrokerConfigurationBuilder withListeners(
String clusterName,
KafkaVersion kafkaVersion,
String namespace,
List<GenericKafkaListener> kafkaListeners,
Function<String, String> advertisedHostnameProvider,
Expand All @@ -281,8 +285,8 @@ public KafkaBrokerConfigurationBuilder withListeners(
if (node.controller() || (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration())) {
listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");

// Control Plane listener to be advertised only with broker in ZooKeeper-based or migration but NOT when full KRaft only or mixed
if (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) {
// Control Plane listener to be advertised with broker in ZooKeeper-based or migration OR KRaft controller or mixed node with version 3.9+
if ((node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) || (kafkaVersion.compareTo(KAFKA_VERSION_3_9) >= 0)) {
advertisedListeners.add(String.format("%s://%s:9090",
CONTROL_PLANE_LISTENER_NAME,
// Pod name constructed to be templatable for each individual ordinal
Expand Down Expand Up @@ -350,13 +354,18 @@ public KafkaBrokerConfigurationBuilder withListeners(
writer.println("listener.security.protocol.map=" + String.join(",", securityProtocol));
writer.println("listeners=" + String.join(",", listeners));

// Advertised listeners are not allowed on KRaft nodes with controller only role
if (!isKraftControllerOnly) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
} else if (node.controller() && kafkaMetadataConfigState.isZooKeeperToPostMigration()) {
// needed for KRaft controller only as well until post-migration because it needs to contact brokers
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
} else if (node.controller()) {
if (advertisedListeners.size() > 0) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
}

if (kafkaMetadataConfigState.isZooKeeperToPostMigration()) {
// needed for KRaft controller only as well until post-migration because it needs to contact brokers
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
}
}

// Control plane listener is on all ZooKeeper based brokers, needed during migration as well, when broker still using ZooKeeper but KRaft controllers are ready
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,7 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
.withRackId(rack)
.withLogDirs(VolumeUtils.createVolumeMounts(pool.storage, false))
.withListeners(cluster,
kafkaVersion,
namespace,
listeners,
listenerId -> advertisedHostnames.get(node.nodeId()).get(listenerId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class KafkaVersionTestUtils {
public static final String LATEST_ZOOKEEPER_VERSION = "3.8.4";
public static final String LATEST_CHECKSUM = "ABCD1234";
public static final String LATEST_THIRD_PARTY_VERSION = "3.8.x";
public static final String KAFKA_390_VERSION = "3.9.0";
public static final String LATEST_KAFKA_IMAGE = KAFKA_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + LATEST_KAFKA_VERSION;
Expand Down
Loading

0 comments on commit 306f08c

Please sign in to comment.