Skip to content

Commit

Permalink
KAFKA-15853: Move general configs out of KafkaConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
OmniaGM committed May 23, 2024
1 parent 14b5c4d commit f61b34a
Show file tree
Hide file tree
Showing 72 changed files with 558 additions and 539 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.KafkaServerConfigs;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.slf4j.Logger;
Expand Down Expand Up @@ -163,7 +164,7 @@ public void start() {
private void doStart() {
brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString());

putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) brokers.length);
putIfAbsent(brokerConfig, AUTO_CREATE_TOPICS_ENABLE_CONFIG, false);
Expand All @@ -178,7 +179,7 @@ private void doStart() {
listenerName = new ListenerName(listenerConfig.toString());

for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
brokerConfig.put(KafkaServerConfigs.BROKER_ID_CONFIG, i);
currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
brokerConfig.put(LOG_DIR_CONFIG, currentBrokerLogDirs[i]);
if (!hasListenerConfig)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
Expand All @@ -44,6 +43,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.config.KafkaServerConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
Expand Down Expand Up @@ -252,7 +252,7 @@ public RemoteStorageManager run() {

private void configureRSM() {
final Map<String, Object> rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps());
rsmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rsmProps.put(KafkaServerConfigs.BROKER_ID_CONFIG, brokerId);
remoteLogStorageManager.configure(rsmProps);
}

Expand Down Expand Up @@ -286,7 +286,7 @@ private void configureRLMM() {
// update the remoteLogMetadataProps here to override endpoint config if any
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rlmmProps.put(KafkaServerConfigs.BROKER_ID_CONFIG, brokerId);
rlmmProps.put(LOG_DIR_CONFIG, logDir);
rlmmProps.put("cluster.id", clusterId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.config.KafkaServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation}
Expand Down Expand Up @@ -812,7 +812,7 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I
transactionLogMinInsyncReplicas: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
requestTimeoutMs: Int = Defaults.REQUEST_TIMEOUT_MS)
requestTimeoutMs: Int = KafkaServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT)

case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short) {
override def toString: String = {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.config.KafkaServerConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
Expand Down Expand Up @@ -499,7 +500,7 @@ object LogCleaner {
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
KafkaConfig.MessageMaxBytesProp,
KafkaServerConfigs.MESSAGE_MAX_BYTES_CONFIG,
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP
)
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.config.QuotaConfigs
import org.apache.kafka.server.config.{KafkaServerConfigs, QuotaConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.FutureUtils
import org.slf4j.event.Level
Expand Down Expand Up @@ -425,7 +425,7 @@ object SocketServer {
object DataPlaneAcceptor {
val ThreadPrefix: String = "data-plane"
val MetricPrefix: String = ""
val ListenerReconfigurableConfigs: Set[String] = Set(KafkaConfig.NumNetworkThreadsProp)
val ListenerReconfigurableConfigs: Set[String] = Set(KafkaServerConfigs.NUM_NETWORK_THREADS_CONFIG)
}

class DataPlaneAcceptor(socketServer: SocketServer,
Expand Down Expand Up @@ -506,7 +506,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* the configs have passed validation using [[validateReconfiguration( Map )]].
*/
override def reconfigure(configs: util.Map[String, _]): Unit = {
val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int]
val newNumNetworkThreads = configs.get(KafkaServerConfigs.NUM_NETWORK_THREADS_CONFIG).asInstanceOf[Int]

if (newNumNetworkThreads != processors.length) {
info(s"Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads")
Expand All @@ -522,7 +522,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* Configure this class with the given key-value pairs
*/
override def configure(configs: util.Map[String, _]): Unit = {
addProcessors(configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int])
addProcessors(configs.get(KafkaServerConfigs.NUM_NETWORK_THREADS_CONFIG).asInstanceOf[Int])
}
}

Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ReplicationConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, KafkaServerConfigs, ReplicationConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.ClientTelemetry
Expand Down Expand Up @@ -98,7 +98,7 @@ object DynamicBrokerConfig {
DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs

private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, KafkaConfig.NumNetworkThreadsProp)
private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, KafkaServerConfigs.NUM_NETWORK_THREADS_CONFIG)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
ClusterLevelListenerConfigs)
private val ListenerMechanismConfigs = Set(KafkaSecurityConfigs.SASL_JAAS_CONFIG,
Expand Down Expand Up @@ -769,10 +769,10 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok

object DynamicThreadPool {
val ReconfigurableConfigs = Set(
KafkaConfig.NumIoThreadsProp,
KafkaServerConfigs.NUM_IO_THREADS_CONFIG,
ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG,
ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG,
KafkaConfig.BackgroundThreadsProp)
KafkaServerConfigs.BACKGROUND_THREADS_CONFIG)

def validateReconfiguration(currentConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
Expand All @@ -794,10 +794,10 @@ object DynamicThreadPool {

def getValue(config: KafkaConfig, name: String): Int = {
name match {
case KafkaConfig.NumIoThreadsProp => config.numIoThreads
case KafkaServerConfigs.NUM_IO_THREADS_CONFIG => config.numIoThreads
case ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG => config.numReplicaFetchers
case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG => config.numRecoveryThreadsPerDataDir
case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
case KafkaServerConfigs.BACKGROUND_THREADS_CONFIG => config.backgroundThreads
case n => throw new IllegalStateException(s"Unexpected config $n")
}
}
Expand All @@ -806,7 +806,7 @@ object DynamicThreadPool {
class ControllerDynamicThreadPool(controller: ControllerServer) extends BrokerReconfigurable {

override def reconfigurableConfigs: Set[String] = {
Set(KafkaConfig.NumIoThreadsProp)
Set(KafkaServerConfigs.NUM_IO_THREADS_CONFIG)
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
Expand Down Expand Up @@ -899,7 +899,7 @@ class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metri

class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) {
private[server] val dynamicConfig = config.dynamicConfig
private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
private val propsOverride = Map[String, AnyRef](KafkaServerConfigs.BROKER_ID_CONFIG -> brokerId.toString)
private[server] val currentReporters = mutable.Map[String, MetricsReporter]()
createReporters(config, clusterId, metricsReporterClasses(dynamicConfig.currentKafkaConfig.values()).asJava,
Collections.emptyMap[String, Object])
Expand Down Expand Up @@ -1009,7 +1009,7 @@ object DynamicListenerConfig {
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,

// Network threads
KafkaConfig.NumNetworkThreadsProp
KafkaServerConfigs.NUM_NETWORK_THREADS_CONFIG
)
}

Expand Down
Loading

0 comments on commit f61b34a

Please sign in to comment.