Skip to content

Commit

Permalink
KAFKA-18360 Remove zookeeper configurations (#18566)
Browse files Browse the repository at this point in the history
Remove broker.id.generation.enable and reserved.broker.max.id, which are not used in KRaft mode.
Remove inter.broker.protocol.version, which is not used in KRaft mode.

Reviewers: PoAn Yang <[email protected]>, Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
cmccabe authored Feb 6, 2025
1 parent a3d9d88 commit b2b2408
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 161 deletions.
31 changes: 3 additions & 28 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -389,28 +389,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val uncleanLeaderElectionCheckIntervalMs: Long = getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG)
def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)

// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val interBrokerProtocolVersionString = getString(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
val interBrokerProtocolVersion = if (processRoles.isEmpty) {
MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
} else {
if (originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)) {
// A user-supplied IBP was given
val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
if (!configuredVersion.isKRaftSupported) {
throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}. " +
s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
} else {
warn(s"${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG} is deprecated in KRaft mode as of 3.3 and will only " +
s"be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting " +
s"the metadata.version for a new KRaft cluster.")
}
}
// In KRaft mode, we pin this value to the minimum KRaft-supported version. This prevents inadvertent usage of
// the static IBP config in broker components running in KRaft mode
MetadataVersion.MINIMUM_KRAFT_VERSION
}
// This will be removed soon. See KAFKA-18366.
val interBrokerProtocolVersion = MetadataVersion.MINIMUM_KRAFT_VERSION

/** ********* Controlled shutdown configuration ***********/
val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
Expand Down Expand Up @@ -713,15 +693,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
validateAdvertisedControllerListenersNonEmptyForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
} else {
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
}

val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. broker-only and co-located)
if (processRoles.contains(ProcessRole.BrokerRole)) {
validateAdvertisedBrokerListenersNonEmptyForBroker()
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ object KafkaRaftServer {
}

// Load the BootstrapMetadata.
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir,
Optional.ofNullable(config.interBrokerProtocolVersionString))
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir, Optional.empty())
val bootstrapMetadata = bootstrapDirectory.read()
(metaPropsEnsemble, bootstrapMetadata)
}
Expand Down
9 changes: 3 additions & 6 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.ReplicationConfigs

import java.util
import scala.collection.mutable
Expand Down Expand Up @@ -129,11 +128,9 @@ object StorageTool extends Logging {
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
setControllerListenerName(config.controllerListenerNames.head).
setMetadataLogDirectory(config.metadataLogDir)
Option(namespace.getString("release_version")) match {
case Some(releaseVersion) => formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
}
Option(namespace.getString("release_version")).foreach(
releaseVersion => formatter.
setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)))
Option(namespace.getList[String]("feature")).foreach(
featureNamesAndLevels(_).foreachEntry {
(k, v) => formatter.setFeatureLevel(k, v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.KRaftConfigs;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -545,7 +544,6 @@ KafkaConfig createKafkaDefaultConfig() {
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, voterId + "@localhost:9093");
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL");
properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL");
TestUtils.setIbpVersion(properties, MetadataVersion.latestProduction());
return new KafkaConfig(properties);
}
}
12 changes: 6 additions & 6 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,12 @@ class KafkaApisTest extends Logging {
metrics.close()
}

def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting,
authorizer: Option[Authorizer] = None,
configRepository: ConfigRepository = new MockConfigRepository(),
overrideProperties: Map[String, String] = Map.empty,
featureVersions: Seq[FeatureVersion] = Seq.empty): KafkaApis = {
def createKafkaApis(
authorizer: Option[Authorizer] = None,
configRepository: ConfigRepository = new MockConfigRepository(),
overrideProperties: Map[String, String] = Map.empty,
featureVersions: Seq[FeatureVersion] = Seq.empty
): KafkaApis = {

val properties = TestUtils.createBrokerConfig(brokerId)
properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
Expand All @@ -171,7 +172,6 @@ class KafkaApisTest extends Logging {
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093")

overrideProperties.foreach( p => properties.put(p._1, p._2))
TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
val config = new KafkaConfig(properties)

val listenerType = ListenerType.BROKER
Expand Down
64 changes: 16 additions & 48 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ class KafkaConfigTest {
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props1))
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props2))
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props3))

}

@Test
Expand Down Expand Up @@ -187,7 +186,7 @@ class KafkaConfigTest {
val advertisedHostName = "routable-host"
val advertisedPort = 1234

val props = TestUtils.createBrokerConfig(0)
val props = createDefaultConfig()
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"PLAINTEXT://$advertisedHostName:$advertisedPort")

val serverConfig = KafkaConfig.fromProps(props)
Expand Down Expand Up @@ -617,29 +616,6 @@ class KafkaConfigTest {
assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
}

@Test
def testVersionConfiguration(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
val conf = KafkaConfig.fromProps(props)
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, conf.interBrokerProtocolVersion)

props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.0-IV1")
val conf2 = KafkaConfig.fromProps(props)
assertEquals(MetadataVersion.IBP_3_0_IV1, conf2.interBrokerProtocolVersion)

// check that patch version doesn't affect equality
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.1-IV1")
val conf3 = KafkaConfig.fromProps(props)
assertEquals(MetadataVersion.IBP_3_0_IV1, conf3.interBrokerProtocolVersion)

//check that latest is newer than 3.0.1-IV0
assertTrue(MetadataVersion.latestTesting.isAtLeast(conf3.interBrokerProtocolVersion))
}

private def isValidKafkaConfig(props: Properties): Boolean = {
try {
KafkaConfig.fromProps(props)
Expand Down Expand Up @@ -1406,27 +1382,30 @@ class KafkaConfigTest {
}

@Test
def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
// -1 is the default for both node.id and broker.id
def testAcceptsLargeId(): Unit = {
val largeBrokerId = 2000
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
assertFalse(isValidKafkaConfig(props))
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, largeBrokerId.toString)
KafkaConfig.fromProps(props)
}

@Test
def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
def testRejectsNegativeNodeId(): Unit = {
val props = createDefaultConfig()
props.remove(ServerConfigs.BROKER_ID_CONFIG)
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "-1")
assertFalse(isValidKafkaConfig(props))
}

@Test
def testRejectsNegativeNodeId(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
def testRejectsNegativeBrokerId(): Unit = {
val props = createDefaultConfig()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "-1")
props.remove(KRaftConfigs.NODE_ID_CONFIG)
assertFalse(isValidKafkaConfig(props))
}

Expand Down Expand Up @@ -1613,17 +1592,6 @@ class KafkaConfigTest {
assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
}

@Test
def testIgnoreUserInterBrokerProtocolVersionKRaft(): Unit = {
for (ibp <- Seq("3.0", "3.1", "3.2")) {
val props = new Properties()
props.putAll(kraftProps())
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp)
val config = new KafkaConfig(props)
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
}
}

@Test
def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
val props = new Properties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -262,7 +262,7 @@ class KafkaRaftServerTest {
}

@Test
def testKRaftUpdateWithIBP(): Unit = {
def testKRaftUpdateAt3_3_IV1(): Unit = {
val clusterId = clusterIdBase64
val nodeId = 0
val metaProperties = new MetaProperties.Builder().
Expand All @@ -278,10 +278,9 @@ class KafkaRaftServerTest {
configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
configProperties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.3-IV1")

val (metaPropertiesEnsemble, bootstrapMetadata) =
invokeLoadMetaProperties(metaProperties, configProperties, None)
invokeLoadMetaProperties(metaProperties, configProperties, Some(MetadataVersion.IBP_3_3_IV1))

assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
Expand All @@ -290,7 +289,7 @@ class KafkaRaftServerTest {
}

@Test
def testKRaftUpdateWithoutIBP(): Unit = {
def testKRaftUpdate(): Unit = {
val clusterId = clusterIdBase64
val nodeId = 0
val metaProperties = new MetaProperties.Builder().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.storage.internals.log.LogAppendInfo
Expand Down Expand Up @@ -78,13 +77,16 @@ class ReplicaFetcherThreadTest {
TestUtils.clearYammerMetrics()
}

private def createReplicaFetcherThread(name: String,
fetcherId: Int,
brokerConfig: KafkaConfig,
failedPartitions: FailedPartitions,
replicaMgr: ReplicaManager,
quota: ReplicaQuota,
leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = {
private def createReplicaFetcherThread(
name: String,
fetcherId: Int,
brokerConfig: KafkaConfig,
failedPartitions: FailedPartitions,
replicaMgr: ReplicaManager,
quota: ReplicaQuota,
leaderEndpointBlockingSend: BlockingSend,
metadataVersion: MetadataVersion = MetadataVersion.latestTesting()
): ReplicaFetcherThread = {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
Expand All @@ -96,7 +98,7 @@ class ReplicaFetcherThreadTest {
replicaMgr,
quota,
logContext.logPrefix,
() => brokerConfig.interBrokerProtocolVersion)
() => metadataVersion)
}

@Test
Expand Down Expand Up @@ -179,9 +181,8 @@ class ReplicaFetcherThreadTest {
verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting, epochFetchCount = 0)
}

private def verifyFetchLeaderEpochOnFirstFetch(ibp: MetadataVersion, epochFetchCount: Int): Unit = {
private def verifyFetchLeaderEpochOnFirstFetch(metadataVersion: MetadataVersion, epochFetchCount: Int): Unit = {
val props = TestUtils.createBrokerConfig(1)
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.version)
val config = KafkaConfig.fromProps(props)

//Setup all dependencies
Expand Down Expand Up @@ -219,7 +220,8 @@ class ReplicaFetcherThreadTest {
failedPartitions,
replicaManager,
UNBOUNDED_QUOTA,
mockNetwork
mockNetwork,
metadataVersion
)
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))

Expand Down
14 changes: 0 additions & 14 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -357,26 +357,12 @@ Found problem:
"Failed to find content in output: " + stream.toString())
}

@Test
def testFormatWithReleaseVersionDefault(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
properties.setProperty("inter.broker.protocol.version", "3.7")
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties))
assertTrue(stream.toString().contains("3.7-IV4"),
"Failed to find content in output: " + stream.toString())
}

@Test
def testFormatWithReleaseVersionDefaultAndReleaseVersion(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
properties.setProperty("inter.broker.protocol.version", "3.7")
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties, Seq(
"--release-version", "3.6-IV0",
Expand Down
4 changes: 0 additions & 4 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,6 @@ object TestUtils extends Logging {
props
}

def setIbpVersion(config: Properties, version: MetadataVersion): Unit = {
config.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, version.version)
}

def createAdminClient[B <: KafkaBroker](
brokers: Seq[B],
listenerName: ListenerName,
Expand Down
Loading

0 comments on commit b2b2408

Please sign in to comment.