diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 106fd4422598f..5531ab1dbc330 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -165,8 +165,10 @@ class KafkaServer( var kafkaScheduler: KafkaScheduler = _ - var kraftControllerNodes: Seq[Node] = _ @volatile var metadataCache: ZkMetadataCache = _ + + @volatile var quorumControllerNodeProvider: RaftControllerNodeProvider = _ + var quotaManagers: QuotaFactory.QuotaManagers = _ val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config) @@ -324,20 +326,13 @@ class KafkaServer( remoteLogManagerOpt = createRemoteLogManager() - if (config.migrationEnabled) { - kraftControllerNodes = QuorumConfig.voterConnectionsToNodes( - QuorumConfig.parseVoterConnections(config.quorumVoters) - ).asScala - } else { - kraftControllerNodes = Seq.empty - } metadataCache = MetadataCache.zkMetadataCache( config.brokerId, config.interBrokerProtocolVersion, brokerFeatures, - kraftControllerNodes, config.migrationEnabled) - val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config) + val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config, + () => Option(quorumControllerNodeProvider).map(_.getControllerInfo())) /* initialize feature change listener */ _featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient) @@ -1075,6 +1070,8 @@ class KafkaServer( } _brokerState = BrokerState.NOT_RUNNING + quorumControllerNodeProvider = null + startupComplete.set(false) isShuttingDown.set(false) CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 015e46a76523d..b8eda3fe4dc34 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -116,10 +116,9 @@ object MetadataCache { def zkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(), - kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node], zkMigrationEnabled: Boolean = false) : ZkMetadataCache = { - new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes, zkMigrationEnabled) + new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, zkMigrationEnabled) } def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = { diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index 36997a4ea49e8..0017a5876af13 100644 --- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -55,22 +55,15 @@ trait ControllerNodeProvider { class MetadataCacheControllerNodeProvider( val metadataCache: ZkMetadataCache, - val config: KafkaConfig + val config: KafkaConfig, + val quorumControllerNodeProvider: () => Option[ControllerInformation] ) extends ControllerNodeProvider { private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol - private val kraftControllerListenerName = if (config.controllerListenerNames.nonEmpty) - new ListenerName(config.controllerListenerNames.head) else null - private val kraftControllerSecurityProtocol = Option(kraftControllerListenerName) - .map( listener => config.effectiveListenerSecurityProtocolMap.getOrElse( - listener, SecurityProtocol.forName(kraftControllerListenerName.value()))) - .orNull - private val kraftControllerSaslMechanism = config.saslMechanismControllerProtocol - - private val emptyZkControllerInfo = ControllerInformation( + val emptyZkControllerInfo = ControllerInformation( None, zkControllerListenerName, zkControllerSecurityProtocol, @@ -85,12 +78,8 @@ class MetadataCacheControllerNodeProvider( zkControllerSecurityProtocol, zkControllerSaslMechanism, isZkController = true) - case KRaftCachedControllerId(id) => ControllerInformation( - metadataCache.getAliveBrokerNode(id, kraftControllerListenerName), - kraftControllerListenerName, - kraftControllerSecurityProtocol, - kraftControllerSaslMechanism, - isZkController = false) + case KRaftCachedControllerId(_) => + quorumControllerNodeProvider.apply().getOrElse(emptyZkControllerInfo) }.getOrElse(emptyZkControllerInfo) } } diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d13f9e0b7f141..9616f059a0eb9 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -158,7 +158,6 @@ class ZkMetadataCache( brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures, - kraftControllerNodes: Seq[Node] = Seq.empty, zkMigrationEnabled: Boolean = false) extends MetadataCache with ZkFinalizedFeatureCache with Logging { @@ -182,8 +181,6 @@ class ZkMetadataCache( private val featureLock = new ReentrantLock() private val featureCond = featureLock.newCondition() - private val kraftControllerNodeMap = kraftControllerNodes.map(node => node.id() -> node).toMap - // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. Relatedly, `brokers` is // `List[Integer]` instead of `List[Int]` to avoid a collection copy. @@ -350,11 +347,7 @@ class ZkMetadataCache( override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = { val snapshot = metadataSnapshot - brokerId match { - case id if snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id == id) => - kraftControllerNodeMap.get(id) - case _ => snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) - } + snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) } override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] = { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index ecd644b56dd3a..48ff022a79a13 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -217,7 +217,7 @@ public RecordsSend toSend() { // TODO: fix to support raft ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0, - config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null, false); + config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), false); metadataCache.updateMetadata(0, updateMetadataRequest); replicaManager = new ReplicaManagerBuilder(). diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 1c5e499145fcf..38933e7e1d31a 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -109,7 +109,7 @@ public class MetadataRequestBenchmark { private final Metrics metrics = new Metrics(); private final int brokerId = 1; private final ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId, - MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), null, false); + MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), false); private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); private final ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index fb76bab02cfa1..3c4e84c6a6de5 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -116,7 +116,7 @@ public void setup() { final MetadataCache metadataCache = MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), - BrokerFeatures.createEmpty(), null, false); + BrokerFeatures.createEmpty(), false); this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 86db11d4e36b4..c38e4b489e264 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -162,7 +162,7 @@ public Properties getEntityConfigs(String rootEntityType, String sanitizedEntity setBrokerTopicStats(brokerTopicStats). setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), - null, false)). + false)). setLogDirFailureChannel(failureChannel). setAlterPartitionManager(alterPartitionManager). build();