diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index dbae6ef8507dd..f835475fa9efe 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -40,10 +40,10 @@ import scala.io.StdIn object AclCommand extends Logging { - val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " + + private val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " + "through the authorizer is deprecated and will be removed in a future release. Please use " + "--bootstrap-server instead to set ACLs through the admin client." - val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL) + private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL) private val Newline = scala.util.Properties.lineSeparator @@ -89,7 +89,7 @@ object AclCommand extends Logging { def listAcls(): Unit } - class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging { + private class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging { private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = { val props = if (opts.options.has(opts.commandConfigOpt)) @@ -487,32 +487,32 @@ object AclCommand extends Logging { for ((resource, acls) <- resourceToAcls) { val validOps = AclEntry.supportedOperations(resource.resourceType) + AclOperation.ALL if ((acls.map(_.operation) -- validOps).nonEmpty) - CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName(_)).mkString(", ")}") + CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName).mkString(", ")}") } } class AclCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { val CommandConfigDoc = "A property file containing configs to be passed to Admin Client." - val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." + + val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." + " This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.") .withRequiredArg .describedAs("server to connect to") .ofType(classOf[String]) - val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) + val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc) .withOptionalArg() .describedAs("command-config") .ofType(classOf[String]) - val authorizerOpt = parser.accepts("authorizer", "DEPRECATED: Fully qualified class name of " + + val authorizerOpt: OptionSpec[String] = parser.accepts("authorizer", "DEPRECATED: Fully qualified class name of " + "the authorizer, which defaults to kafka.security.authorizer.AclAuthorizer if --bootstrap-server is not provided. 
" + AclCommand.AuthorizerDeprecationMessage) .withRequiredArg .describedAs("authorizer") .ofType(classOf[String]) - val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "DEPRECATED: The " + + val authorizerPropertiesOpt: OptionSpec[String] = parser.accepts("authorizer-properties", "DEPRECATED: The " + "properties required to configure an instance of the Authorizer specified by --authorizer. " + "These are key=val pairs. For the default authorizer, example values are: zookeeper.connect=localhost:2181. " + AclCommand.AuthorizerDeprecationMessage) @@ -520,36 +520,36 @@ object AclCommand extends Logging { .describedAs("authorizer-properties") .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "topic to which ACLs should be added or removed. " + + val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which ACLs should be added or removed. " + "A value of '*' indicates ACL should apply to all topics.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.") - val groupOpt = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " + + val clusterOpt: OptionSpecBuilder = parser.accepts("cluster", "Add/Remove cluster ACLs.") + val groupOpt: OptionSpec[String] = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " + "A value of '*' indicates the ACLs should apply to all groups.") .withRequiredArg .describedAs("group") .ofType(classOf[String]) - val transactionalIdOpt = parser.accepts("transactional-id", "The transactionalId to which ACLs should " + + val transactionalIdOpt: OptionSpec[String] = parser.accepts("transactional-id", "The transactionalId to which ACLs should " + "be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds.") .withRequiredArg .describedAs("transactional-id") .ofType(classOf[String]) - val idempotentOpt = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " + + val idempotentOpt: OptionSpecBuilder = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " + "used in combination with the --producer option. Note that idempotence is enabled automatically if " + "the producer is authorized to a particular transactional-id.") - val delegationTokenOpt = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " + + val delegationTokenOpt: OptionSpec[String] = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " + "A value of '*' indicates ACL should apply to all tokens.") .withRequiredArg .describedAs("delegation-token") .ofType(classOf[String]) - val resourcePatternType = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " + + val resourcePatternType: OptionSpec[PatternType] = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " + "When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. 
" + "When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " + "or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " + @@ -560,24 +560,24 @@ object AclCommand extends Logging { .withValuesConvertedBy(new PatternTypeConverter()) .defaultsTo(PatternType.LITERAL) - val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.") - val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.") - val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic or --group or --cluster to specify a resource.") + val addOpt: OptionSpecBuilder = parser.accepts("add", "Indicates you are trying to add ACLs.") + val removeOpt: OptionSpecBuilder = parser.accepts("remove", "Indicates you are trying to remove ACLs.") + val listOpt: OptionSpecBuilder = parser.accepts("list", "List ACLs for the specified resource, use --topic or --group or --cluster to specify a resource.") - val operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline + + val operationsOpt: OptionSpec[String] = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline + AclEntry.AclOperations.map("\t" + JSecurityUtils.operationName(_)).mkString(Newline) + Newline) .withRequiredArg .ofType(classOf[String]) .defaultsTo(JSecurityUtils.operationName(AclOperation.ALL)) - val allowPrincipalsOpt = parser.accepts("allow-principal", "principal is in principalType:name format." + + val allowPrincipalsOpt: OptionSpec[String] = parser.accepts("allow-principal", "principal is in principalType:name format." + " Note that principalType must be supported by the Authorizer being used." + " For example, User:'*' is the wild card indicating all users.") .withRequiredArg .describedAs("allow-principal") .ofType(classOf[String]) - val denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType:name format. " + + val denyPrincipalsOpt: OptionSpec[String] = parser.accepts("deny-principal", "principal is in principalType:name format. " + "By default anyone not added through --allow-principal is denied access. " + "You only need to use this option as negation to already allowed set. " + "Note that principalType must be supported by the Authorizer being used. " + @@ -588,33 +588,33 @@ object AclCommand extends Logging { .describedAs("deny-principal") .ofType(classOf[String]) - val listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." + + val listPrincipalsOpt: OptionSpec[String] = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." + " Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.") .withOptionalArg() .describedAs("principal") .ofType(classOf[String]) - val allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " + + val allowHostsOpt: OptionSpec[String] = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. 
" + "If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts.") .withRequiredArg .describedAs("allow-host") .ofType(classOf[String]) - val denyHostsOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " + + val denyHostsOpt: OptionSpec[String] = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " + "If you have specified --deny-principal then the default for this option will be set to '*' which denies access from all hosts.") .withRequiredArg .describedAs("deny-host") .ofType(classOf[String]) - val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " + + val producerOpt: OptionSpecBuilder = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " + "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.") - val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " + + val consumerOpt: OptionSpecBuilder = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " + "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.") - val forceOpt = parser.accepts("force", "Assume Yes to all queries and do not prompt.") + val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to all queries and do not prompt.") - val zkTlsConfigFile = parser.accepts("zk-tls-config-file", + val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file", "DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity properties are defined for" + " the default authorizer kafka.security.authorizer.AclAuthorizer." + " Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " + @@ -624,7 +624,7 @@ object AclCommand extends Logging { AclCommand.AuthorizerDeprecationMessage) .withRequiredArg().describedAs("Authorizer ZooKeeper TLS configuration").ofType(classOf[String]) - val userPrincipalOpt = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " + + val userPrincipalOpt: OptionSpec[String] = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. 
For instance " + "one could grant CreateTokens or DescribeTokens permission on a given user principal.") .withRequiredArg() .describedAs("user-principal") diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 6cf8beca2755a..45df36f7a1f87 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -22,6 +22,7 @@ import java.io.IOException import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit} +import joptsimple.OptionSpec import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.common.utils.Utils @@ -61,8 +62,8 @@ object BrokerApiVersionsCommand { val brokerMap = adminClient.listAllBrokerVersionInfo() brokerMap.forKeyValue { (broker, versionInfoOrError) => versionInfoOrError match { - case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n") - case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n") + case Success(v) => out.print(s"$broker -> ${v.toString(true)}\n") + case Failure(v) => out.print(s"$broker -> ERROR: $v\n") } } adminClient.close() @@ -77,22 +78,22 @@ object BrokerApiVersionsCommand { AdminClient.create(props) } - class BrokerVersionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val BootstrapServerDoc = "REQUIRED: The server to connect to." - val CommandConfigDoc = "A property file containing configs to be passed to Admin Client." + private class BrokerVersionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { + private val BootstrapServerDoc = "REQUIRED: The server to connect to." + private val CommandConfigDoc = "A property file containing configs to be passed to Admin Client." 
- val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) + val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc) .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) - val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc) + val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", BootstrapServerDoc) .withRequiredArg .describedAs("server(s) to use for bootstrapping") .ofType(classOf[String]) options = parser.parse(args : _*) checkArgs() - def checkArgs(): Unit = { + private def checkArgs(): Unit = { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to retrieve broker version information.") // check required args CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) @@ -105,10 +106,10 @@ object BrokerApiVersionsCommand { val client: ConsumerNetworkClient, val bootstrapBrokers: List[Node]) extends Logging { - @volatile var running = true - val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]() + @volatile private var running = true + private val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]() - val networkThread = new KafkaThread("admin-client-network-thread", () => { + private val networkThread = new KafkaThread("admin-client-network-thread", () => { try { while (running) client.poll(time.timer(Long.MaxValue)) @@ -200,17 +201,17 @@ object BrokerApiVersionsCommand { } private object AdminClient { - val DefaultConnectionMaxIdleMs = 9 * 60 * 1000 - val DefaultRequestTimeoutMs = 5000 - val DefaultMaxInFlightRequestsPerConnection = 100 - val DefaultReconnectBackoffMs = 50 - val DefaultReconnectBackoffMax = 50 - val DefaultSendBufferBytes = 128 * 1024 - val DefaultReceiveBufferBytes = 32 * 1024 - val DefaultRetryBackoffMs = 100 - - val AdminClientIdSequence = new AtomicInteger(1) - val AdminConfigDef = { + private val DefaultConnectionMaxIdleMs = 9 * 60 * 1000 + private val DefaultRequestTimeoutMs = 5000 + private val DefaultMaxInFlightRequestsPerConnection = 100 + private val DefaultReconnectBackoffMs = 50 + private val DefaultReconnectBackoffMax = 50 + private val DefaultSendBufferBytes = 128 * 1024 + private val DefaultReceiveBufferBytes = 32 * 1024 + private val DefaultRetryBackoffMs = 100 + + private val AdminClientIdSequence = new AtomicInteger(1) + private val AdminConfigDef = { val config = new ConfigDef() .define( CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index a8ca5960528e6..3f75b7d7cebb8 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -78,9 +78,9 @@ object ConfigCommand extends Logging { val BrokerDefaultEntityName = "" val BrokerLoggerConfigType = "broker-loggers" - val BrokerSupportedConfigTypes = ConfigType.ALL.asScala :+ BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS - val ZkSupportedConfigTypes = Seq(ConfigType.USER, ConfigType.BROKER) - val DefaultScramIterations = 4096 + private val BrokerSupportedConfigTypes = ConfigType.ALL.asScala :+ BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS + private val ZkSupportedConfigTypes = Seq(ConfigType.USER, ConfigType.BROKER) + private val DefaultScramIterations = 4096 def main(args: Array[String]): Unit = { try { @@ -316,7 +316,7 @@ object ConfigCommand extends Logging { props.keySet.forEach { propsKey => if 
(!propsKey.toString.matches("[a-zA-Z0-9._-]*")) { throw new IllegalArgumentException( - s"Invalid character found for config key: ${propsKey}" + s"Invalid character found for config key: $propsKey" ) } } @@ -684,7 +684,7 @@ object ConfigCommand extends Logging { } case class Entity(entityType: String, sanitizedName: Option[String]) { - val entityPath = sanitizedName match { + val entityPath: String = sanitizedName match { case Some(n) => entityType + "/" + n case None => entityType } @@ -706,7 +706,7 @@ object ConfigCommand extends Logging { } case class ConfigEntity(root: Entity, child: Option[Entity]) { - val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("") + val fullSanitizedName: String = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("") def getAllEntities(zkClient: KafkaZkClient) : Seq[ConfigEntity] = { // Describe option examples: @@ -789,39 +789,39 @@ object ConfigCommand extends Logging { class ConfigCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val zkConnectOpt = parser.accepts("zookeeper", "DEPRECATED. The connection string for the zookeeper connection in the form host:port. " + + val zkConnectOpt: OptionSpec[String] = parser.accepts("zookeeper", "DEPRECATED. The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over. Required when configuring SCRAM credentials for users or " + "dynamic broker configs when the relevant broker(s) are down. Not allowed otherwise.") .withRequiredArg .describedAs("urls") .ofType(classOf[String]) - val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka servers to connect to.") + val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "The Kafka servers to connect to.") .withRequiredArg .describedAs("server to connect to") .ofType(classOf[String]) - val bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The Kafka controllers to connect to.") + val bootstrapControllerOpt: OptionSpec[String] = parser.accepts("bootstrap-controller", "The Kafka controllers to connect to.") .withRequiredArg .describedAs("controller to connect to") .ofType(classOf[String]) - val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " + + val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. 
" + "This is used only with --bootstrap-server option for describing and altering broker configs.") .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) - val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") - val describeOpt = parser.accepts("describe", "List configs for the given entity.") - val allOpt = parser.accepts("all", "List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers)") + val alterOpt: OptionSpecBuilder = parser.accepts("alter", "Alter the configuration for the entity.") + val describeOpt: OptionSpecBuilder = parser.accepts("describe", "List configs for the given entity.") + val allOpt: OptionSpecBuilder = parser.accepts("all", "List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers)") - val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers/ips/client-metrics)") + val entityType: OptionSpec[String] = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers/ips/client-metrics)") .withRequiredArg .ofType(classOf[String]) - val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id/ip/client metrics)") + val entityName: OptionSpec[String] = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id/ip/client metrics)") .withRequiredArg .ofType(classOf[String]) - val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line)") + private val entityDefault: OptionSpecBuilder = parser.accepts("entity-default", "Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line)") - val nl = System.getProperty("line.separator") - val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " + + val nl: String = System.getProperty("line.separator") + val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " + "For entity-type '" + ConfigType.TOPIC + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + ConfigType.BROKER + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + ConfigType.USER + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + @@ -831,37 +831,37 @@ object ConfigCommand extends Logging { s"Entity types '${ConfigType.USER}' and '${ConfigType.CLIENT}' may be specified together to update config for clients of a specific user.") .withRequiredArg .ofType(classOf[String]) - val addConfigFile = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.") + val addConfigFile: OptionSpec[String] = parser.accepts("add-config-file", "Path to a properties file with configs to add. 
See add-config for a list of valid configurations.") .withRequiredArg .ofType(classOf[String]) - val deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'") + val deleteConfig: OptionSpec[String] = parser.accepts("delete-config", "config keys to remove 'k1,k2'") .withRequiredArg .ofType(classOf[String]) .withValuesSeparatedBy(',') - val forceOpt = parser.accepts("force", "Suppress console prompts") - val topic = parser.accepts("topic", "The topic's name.") + val forceOpt: OptionSpecBuilder = parser.accepts("force", "Suppress console prompts") + val topic: OptionSpec[String] = parser.accepts("topic", "The topic's name.") .withRequiredArg .ofType(classOf[String]) - val client = parser.accepts("client", "The client's ID.") + val client: OptionSpec[String] = parser.accepts("client", "The client's ID.") .withRequiredArg .ofType(classOf[String]) - val clientDefaults = parser.accepts("client-defaults", "The config defaults for all clients.") - val user = parser.accepts("user", "The user's principal name.") + private val clientDefaults = parser.accepts("client-defaults", "The config defaults for all clients.") + val user: OptionSpec[String] = parser.accepts("user", "The user's principal name.") .withRequiredArg .ofType(classOf[String]) - val userDefaults = parser.accepts("user-defaults", "The config defaults for all users.") - val broker = parser.accepts("broker", "The broker's ID.") + private val userDefaults = parser.accepts("user-defaults", "The config defaults for all users.") + val broker: OptionSpec[String] = parser.accepts("broker", "The broker's ID.") .withRequiredArg .ofType(classOf[String]) - val brokerDefaults = parser.accepts("broker-defaults", "The config defaults for all brokers.") - val brokerLogger = parser.accepts("broker-logger", "The broker's ID for its logger config.") + private val brokerDefaults = parser.accepts("broker-defaults", "The config defaults for all brokers.") + private val brokerLogger = parser.accepts("broker-logger", "The broker's ID for its logger config.") .withRequiredArg .ofType(classOf[String]) - val ipDefaults = parser.accepts("ip-defaults", "The config defaults for all IPs.") - val ip = parser.accepts("ip", "The IP address.") + private val ipDefaults = parser.accepts("ip-defaults", "The config defaults for all IPs.") + val ip: OptionSpec[String] = parser.accepts("ip", "The IP address.") .withRequiredArg .ofType(classOf[String]) - val zkTlsConfigFile = parser.accepts("zk-tls-config-file", + val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file", "Identifies the file where ZooKeeper client TLS connectivity properties are defined. 
Any properties other than " + KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") + " are ignored.") .withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String]) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 7db2707e287e3..243021ba8af45 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -17,6 +17,8 @@ package kafka.admin +import com.fasterxml.jackson.databind.{ObjectReader, ObjectWriter} + import java.time.{Duration, Instant} import java.util.{Collections, Properties} import com.fasterxml.jackson.dataformat.csv.CsvMapper @@ -34,7 +36,7 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer import scala.collection.{Map, Seq, immutable, mutable} import scala.util.{Failure, Success, Try} -import joptsimple.{OptionException, OptionSpec} +import joptsimple.{OptionException, OptionSpec, OptionSpecBuilder} import org.apache.kafka.common.protocol.Errors import scala.collection.immutable.TreeMap @@ -104,12 +106,12 @@ object ConsumerGroupCommand extends Logging { val MISSING_COLUMN_VALUE = "-" - def printError(msg: String, e: Option[Throwable] = None): Unit = { + private def printError(msg: String, e: Option[Throwable] = None): Unit = { println(s"\nError: $msg") e.foreach(_.printStackTrace()) } - def printOffsetsToReset(groupAssignmentsToReset: Map[String, Map[TopicPartition, OffsetAndMetadata]]): Unit = { + private def printOffsetsToReset(groupAssignmentsToReset: Map[String, Map[TopicPartition, OffsetAndMetadata]]): Unit = { val format = "%-30s %-30s %-10s %-15s" if (groupAssignmentsToReset.nonEmpty) println("\n" + format.format("GROUP", "TOPIC", "PARTITION", "NEW-OFFSET")) @@ -139,21 +141,21 @@ object ConsumerGroupCommand extends Logging { private[admin] case class CsvRecordWithGroup(group: String, topic: String, partition: Int, offset: Long) extends CsvRecord private[admin] case class CsvRecordNoGroup(topic: String, partition: Int, offset: Long) extends CsvRecord private[admin] object CsvRecordWithGroup { - val fields = Array("group", "topic", "partition", "offset") + val fields: Array[String] = Array("group", "topic", "partition", "offset") } private[admin] object CsvRecordNoGroup { - val fields = Array("topic", "partition", "offset") + val fields: Array[String] = Array("topic", "partition", "offset") } // Example: CsvUtils().readerFor[CsvRecordWithoutGroup] private[admin] case class CsvUtils() { val mapper = new CsvMapper mapper.registerModule(DefaultScalaModule) - def readerFor[T <: CsvRecord : ClassTag] = { + def readerFor[T <: CsvRecord : ClassTag]: ObjectReader = { val schema = getSchema[T] val clazz = implicitly[ClassTag[T]].runtimeClass mapper.readerFor(clazz).`with`(schema) } - def writerFor[T <: CsvRecord : ClassTag] = { + def writerFor[T <: CsvRecord : ClassTag]: ObjectWriter = { val schema = getSchema[T] val clazz = implicitly[ClassTag[T]].runtimeClass mapper.writerFor(clazz).`with`(schema) @@ -562,7 +564,7 @@ object ConsumerGroupCommand extends Logging { /** * Returns states of the specified consumer groups and partition assignment states */ - def collectGroupsOffsets(groupIds: Seq[String]): TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])] = { + private def collectGroupsOffsets(groupIds: Seq[String]): TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])] = { val consumerGroups = 
describeConsumerGroups(groupIds) val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield { @@ -970,7 +972,7 @@ object ConsumerGroupCommand extends Logging { sealed trait LogOffsetResult - object LogOffsetResult { + private object LogOffsetResult { case class LogOffset(value: Long) extends LogOffsetResult case object Unknown extends LogOffsetResult case object Ignore extends LogOffsetResult @@ -978,124 +980,124 @@ object ConsumerGroupCommand extends Logging { class ConsumerGroupCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { val BootstrapServerDoc = "REQUIRED: The server(s) to connect to." - val GroupDoc = "The consumer group we wish to act on." - val TopicDoc = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " + + private val GroupDoc = "The consumer group we wish to act on." + private val TopicDoc = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " + "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + "Reset-offsets also supports multiple topic inputs." - val AllTopicsDoc = "Consider all topics assigned to a group in the `reset-offsets` process." - val ListDoc = "List all consumer groups." - val DescribeDoc = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group." - val AllGroupsDoc = "Apply to all consumer groups." - val nl = System.getProperty("line.separator") - val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " + + private val AllTopicsDoc = "Consider all topics assigned to a group in the `reset-offsets` process." + private val ListDoc = "List all consumer groups." + private val DescribeDoc = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group." + private val AllGroupsDoc = "Apply to all consumer groups." + val nl: String = System.getProperty("line.separator") + private val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " + "over the entire consumer group. For instance --group g1 --group g2" - val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + + private val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + "or is going through some changes)." - val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer." - val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl + + val CommandConfigDoc: String = "Property file containing configs to be passed to Admin Client and Consumer." + private val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl + "Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " + "Additionally, the --export option is used to export the results to a CSV format." 
+ nl + "You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + "--to-latest, --shift-by, --from-file, --to-current, --to-offset." + nl + "To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'." - val DryRunDoc = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets." - val ExecuteDoc = "Execute operation. Supported operations: reset-offsets." - val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets." - val ResetToOffsetDoc = "Reset offsets to a specific offset." - val ResetFromFileDoc = "Reset offsets to values defined in CSV file." - val ResetToDatetimeDoc = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'" - val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'" - val ResetToEarliestDoc = "Reset offsets to earliest offset." - val ResetToLatestDoc = "Reset offsets to latest offset." - val ResetToCurrentDoc = "Reset offsets to current offset." - val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative." - val MembersDoc = "Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only." + nl + + private val DryRunDoc = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets." + private val ExecuteDoc = "Execute operation. Supported operations: reset-offsets." + private val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets." + private val ResetToOffsetDoc = "Reset offsets to a specific offset." + private val ResetFromFileDoc = "Reset offsets to values defined in CSV file." + private val ResetToDatetimeDoc = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'" + private val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'" + private val ResetToEarliestDoc = "Reset offsets to earliest offset." + private val ResetToLatestDoc = "Reset offsets to latest offset." + private val ResetToCurrentDoc = "Reset offsets to current offset." + private val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative." + private val MembersDoc = "Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only." + nl + "Example: --bootstrap-server localhost:9092 --describe --group group1 --members" - val VerboseDoc = "Provide additional information, if any, when describing the group. This option may be used " + + private val VerboseDoc = "Provide additional information, if any, when describing the group. This option may be used " + "with '--offsets'/'--members'/'--state' and '--bootstrap-server' options only." + nl + "Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose" - val OffsetsDoc = "Describe the group and list all topic partitions in the group along with their offset lag. " + + private val OffsetsDoc = "Describe the group and list all topic partitions in the group along with their offset lag. " + "This is the default sub-action of and may be used with '--describe' and '--bootstrap-server' options only." 
+ nl + "Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets" - val StateDoc = "When specified with '--describe', includes the state of the group." + nl + + private val StateDoc = "When specified with '--describe', includes the state of the group." + nl + "Example: --bootstrap-server localhost:9092 --describe --group group1 --state" + nl + "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states." + nl + "Example: --bootstrap-server localhost:9092 --list --state stable,empty" + nl + "This option may be used with '--describe', '--list' and '--bootstrap-server' options only." - val DeleteOffsetsDoc = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics." + private val DeleteOffsetsDoc = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics." - val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc) + val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", BootstrapServerDoc) .withRequiredArg .describedAs("server to connect to") .ofType(classOf[String]) - val groupOpt = parser.accepts("group", GroupDoc) + val groupOpt: OptionSpec[String] = parser.accepts("group", GroupDoc) .withRequiredArg .describedAs("consumer group") .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", TopicDoc) + val topicOpt: OptionSpec[String] = parser.accepts("topic", TopicDoc) .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val allTopicsOpt = parser.accepts("all-topics", AllTopicsDoc) - val listOpt = parser.accepts("list", ListDoc) - val describeOpt = parser.accepts("describe", DescribeDoc) - val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc) - val deleteOpt = parser.accepts("delete", DeleteDoc) - val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc) + val allTopicsOpt: OptionSpecBuilder = parser.accepts("all-topics", AllTopicsDoc) + val listOpt: OptionSpecBuilder = parser.accepts("list", ListDoc) + val describeOpt: OptionSpecBuilder = parser.accepts("describe", DescribeDoc) + val allGroupsOpt: OptionSpecBuilder = parser.accepts("all-groups", AllGroupsDoc) + val deleteOpt: OptionSpecBuilder = parser.accepts("delete", DeleteDoc) + val timeoutMsOpt: OptionSpec[Long] = parser.accepts("timeout", TimeoutMsDoc) .withRequiredArg .describedAs("timeout (ms)") .ofType(classOf[Long]) .defaultsTo(5000) - val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) + val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc) .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) - val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc) - val deleteOffsetsOpt = parser.accepts("delete-offsets", DeleteOffsetsDoc) - val dryRunOpt = parser.accepts("dry-run", DryRunDoc) - val executeOpt = parser.accepts("execute", ExecuteDoc) - val exportOpt = parser.accepts("export", ExportDoc) - val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc) + val resetOffsetsOpt: OptionSpecBuilder = parser.accepts("reset-offsets", ResetOffsetsDoc) + val deleteOffsetsOpt: OptionSpecBuilder = parser.accepts("delete-offsets", DeleteOffsetsDoc) + val dryRunOpt: OptionSpecBuilder = parser.accepts("dry-run", DryRunDoc) + val executeOpt: OptionSpecBuilder = parser.accepts("execute", ExecuteDoc) + val exportOpt: OptionSpecBuilder = parser.accepts("export", ExportDoc) + val resetToOffsetOpt: 
OptionSpec[Long] = parser.accepts("to-offset", ResetToOffsetDoc) .withRequiredArg() .describedAs("offset") .ofType(classOf[Long]) - val resetFromFileOpt = parser.accepts("from-file", ResetFromFileDoc) + val resetFromFileOpt: OptionSpec[String] = parser.accepts("from-file", ResetFromFileDoc) .withRequiredArg() .describedAs("path to CSV file") .ofType(classOf[String]) - val resetToDatetimeOpt = parser.accepts("to-datetime", ResetToDatetimeDoc) + val resetToDatetimeOpt: OptionSpec[String] = parser.accepts("to-datetime", ResetToDatetimeDoc) .withRequiredArg() .describedAs("datetime") .ofType(classOf[String]) - val resetByDurationOpt = parser.accepts("by-duration", ResetByDurationDoc) + val resetByDurationOpt: OptionSpec[String] = parser.accepts("by-duration", ResetByDurationDoc) .withRequiredArg() .describedAs("duration") .ofType(classOf[String]) - val resetToEarliestOpt = parser.accepts("to-earliest", ResetToEarliestDoc) - val resetToLatestOpt = parser.accepts("to-latest", ResetToLatestDoc) - val resetToCurrentOpt = parser.accepts("to-current", ResetToCurrentDoc) - val resetShiftByOpt = parser.accepts("shift-by", ResetShiftByDoc) + val resetToEarliestOpt: OptionSpecBuilder = parser.accepts("to-earliest", ResetToEarliestDoc) + val resetToLatestOpt: OptionSpecBuilder = parser.accepts("to-latest", ResetToLatestDoc) + val resetToCurrentOpt: OptionSpecBuilder = parser.accepts("to-current", ResetToCurrentDoc) + val resetShiftByOpt: OptionSpec[Long] = parser.accepts("shift-by", ResetShiftByDoc) .withRequiredArg() .describedAs("number-of-offsets") .ofType(classOf[Long]) - val membersOpt = parser.accepts("members", MembersDoc) + val membersOpt: OptionSpecBuilder = parser.accepts("members", MembersDoc) .availableIf(describeOpt) - val verboseOpt = parser.accepts("verbose", VerboseDoc) + val verboseOpt: OptionSpecBuilder = parser.accepts("verbose", VerboseDoc) .availableIf(describeOpt) - val offsetsOpt = parser.accepts("offsets", OffsetsDoc) + val offsetsOpt: OptionSpecBuilder = parser.accepts("offsets", OffsetsDoc) .availableIf(describeOpt) - val stateOpt = parser.accepts("state", StateDoc) + val stateOpt: OptionSpec[String] = parser.accepts("state", StateDoc) .availableIf(describeOpt, listOpt) .withOptionalArg() .ofType(classOf[String]) options = parser.parse(args : _*) - val allGroupSelectionScopeOpts = immutable.Set[OptionSpec[_]](groupOpt, allGroupsOpt) - val allConsumerGroupLevelOpts = immutable.Set[OptionSpec[_]](listOpt, describeOpt, deleteOpt, resetOffsetsOpt) - val allResetOffsetScenarioOpts = immutable.Set[OptionSpec[_]](resetToOffsetOpt, resetShiftByOpt, + private val allGroupSelectionScopeOpts = immutable.Set[OptionSpec[_]](groupOpt, allGroupsOpt) + private val allConsumerGroupLevelOpts = immutable.Set[OptionSpec[_]](listOpt, describeOpt, deleteOpt, resetOffsetsOpt) + val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = immutable.Set[OptionSpec[_]](resetToOffsetOpt, resetShiftByOpt, resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt) - val allDeleteOffsetsOpts = immutable.Set[OptionSpec[_]](groupOpt, topicOpt) + private val allDeleteOffsetsOpts = immutable.Set[OptionSpec[_]](groupOpt, topicOpt) def checkArgs(): Unit = { diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 7bc7339d548d5..9c81bb23e2e2b 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -17,7 +17,7 @@ 
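The ZkSecurityMigrator hunks below also swap the concrete `ArgumentAcceptingOptionSpec` for the `OptionSpec` interface in a helper signature. A sketch of why the interface is sufficient, assuming only joptsimple's `OptionSet.has`/`valueOf`; the helper and option names are illustrative:

```scala
import joptsimple.{OptionParser, OptionSet, OptionSpec}

object SpecAsInterface {
  // OptionSet.has and OptionSet.valueOf are declared against OptionSpec, so a
  // helper needs nothing from the concrete ArgumentAcceptingOptionSpec class.
  def valueIfPresent(options: OptionSet, option: OptionSpec[String]): Option[String] =
    if (options.has(option)) Some(options.valueOf(option)) else None

  def main(args: Array[String]): Unit = {
    val parser = new OptionParser
    val fileOpt: OptionSpec[String] =
      parser.accepts("zk-tls-config-file").withRequiredArg.ofType(classOf[String])
    // Prints Some(client.properties)
    println(valueIfPresent(parser.parse("--zk-tls-config-file", "client.properties"), fileOpt))
  }
}
```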
package kafka.admin -import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet} +import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder} import kafka.server.KafkaConfig import kafka.utils.{Exit, Logging, ToolsUtils} import kafka.utils.Implicits._ @@ -32,8 +32,8 @@ import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.data.Stat import scala.annotation.tailrec +import scala.collection.mutable import scala.jdk.CollectionConverters._ -import scala.collection.mutable.Queue import scala.concurrent._ import scala.concurrent.duration._ @@ -62,10 +62,10 @@ import scala.concurrent.duration._ */ object ZkSecurityMigrator extends Logging { - val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of " + private val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of " + "znodes as part of the process of setting up ZooKeeper " + "authentication.") - val tlsConfigFileOption = "zk-tls-config-file" + private val tlsConfigFileOption = "zk-tls-config-file" def run(args: Array[String]): Unit = { val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) @@ -86,7 +86,7 @@ object ZkSecurityMigrator extends Logging { throw new IllegalArgumentException("Incorrect configuration") } - if (!tlsClientAuthEnabled && !JaasUtils.isZkSaslEnabled()) { + if (!tlsClientAuthEnabled && !JaasUtils.isZkSaslEnabled) { val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile) System.out.println("ERROR: %s".format(errorMsg)) throw new IllegalArgumentException("Incorrect configuration") @@ -116,11 +116,10 @@ object ZkSecurityMigrator extends Logging { try { run(args) } catch { - case e: Exception => { + case e: Exception => e.printStackTrace() // must exit with non-zero status so system tests will know we failed Exit.exit(1) - } } } @@ -137,25 +136,25 @@ object ZkSecurityMigrator extends Logging { zkClientConfig } - private[admin] def createZkClientConfigFromOption(options: OptionSet, option: ArgumentAcceptingOptionSpec[String]) : Option[ZKClientConfig] = + private[admin] def createZkClientConfigFromOption(options: OptionSet, option: OptionSpec[String]) : Option[ZKClientConfig] = if (!options.has(option)) None else Some(createZkClientConfigFromFile(options.valueOf(option))) - class ZkSecurityMigratorOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure." + private class ZkSecurityMigratorOptions(args: Array[String]) extends CommandDefaultOptions(args) { + val zkAclOpt: OptionSpec[String] = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure." + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String]) - val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " + + val zkUrlOpt: OptionSpec[String] = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " + "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181"). ofType(classOf[String]) - val zkSessionTimeoutOpt = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout."). + val zkSessionTimeoutOpt: OptionSpec[Integer] = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout."). 
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) - val zkConnectionTimeoutOpt = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout."). + val zkConnectionTimeoutOpt: OptionSpec[Integer] = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout."). withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) - val enablePathCheckOpt = parser.accepts("enable.path.check", "Checks if all the root paths exist in ZooKeeper " + + val enablePathCheckOpt: OptionSpecBuilder = parser.accepts("enable.path.check", "Checks if all the root paths exist in ZooKeeper " + "before migration. If not, exit the command.") - val zkTlsConfigFile = parser.accepts(tlsConfigFileOption, + val zkTlsConfigFile: OptionSpec[String] = parser.accepts(tlsConfigFileOption, "Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " + KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.mkString(", ") + " are ignored.") .withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String]) @@ -165,14 +164,14 @@ object ZkSecurityMigrator extends Logging { class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { private val zkSecurityMigratorUtils = new ZkSecurityMigratorUtils(zkClient) - private val futures = new Queue[Future[String]] + private val futures = new mutable.Queue[Future[String]] private def setAcl(path: String, setPromise: Promise[String]): Unit = { info("Setting ACL for path %s".format(path)) zkSecurityMigratorUtils.currentZooKeeper.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, setPromise) } - private def getChildren(path: String, childrenPromise: Promise[String]): Unit = { + private def retrieveChildren(path: String, childrenPromise: Promise[String]): Unit = { info("Getting children to set ACLs for path %s".format(path)) zkSecurityMigratorUtils.currentZooKeeper.getChildren(path, false, GetChildrenCallback, childrenPromise) } @@ -193,7 +192,7 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { futures += childrenPromise.future } setAcl(path, setPromise) - getChildren(path, childrenPromise) + retrieveChildren(path, childrenPromise) } private object GetChildrenCallback extends ChildrenCallback { diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala index 3b41802ef416b..ec0ac2e26eee9 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala @@ -66,7 +66,7 @@ case class LeaderAndIsr( LeaderAndIsr(leader, leaderEpoch + 1, isr, leaderRecoveryState, partitionEpoch) } - def newLeaderAndIsrWithBrokerEpoch(leader: Int, isrWithBrokerEpoch: List[BrokerState]): LeaderAndIsr = { + private def newLeaderAndIsrWithBrokerEpoch(leader: Int, isrWithBrokerEpoch: List[BrokerState]): LeaderAndIsr = { LeaderAndIsr(leader, leaderEpoch + 1, leaderRecoveryState, isrWithBrokerEpoch, partitionEpoch) } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 094e3cda41914..718717f652f74 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -207,8 +207,8 @@ case class PendingExpandIsr( sentLeaderAndIsr: LeaderAndIsr, lastCommittedState: CommittedPartitionState ) extends PendingPartitionChange { - val isr = lastCommittedState.isr - val maximalIsr = isr + newInSyncReplicaId + val isr: Set[Int] 
= lastCommittedState.isr + val maximalIsr: Set[Int] = isr + newInSyncReplicaId val isInflight = true def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = { @@ -229,8 +229,8 @@ case class PendingShrinkIsr( sentLeaderAndIsr: LeaderAndIsr, lastCommittedState: CommittedPartitionState ) extends PendingPartitionChange { - val isr = lastCommittedState.isr - val maximalIsr = isr + val isr: Set[Int] = lastCommittedState.isr + val maximalIsr: Set[Int] = isr val isInflight = true def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = { @@ -250,7 +250,7 @@ case class CommittedPartitionState( isr: Set[Int], leaderRecoveryState: LeaderRecoveryState ) extends PartitionState { - val maximalIsr = isr + val maximalIsr: Set[Int] = isr val isInflight = false override def toString: String = { @@ -1802,7 +1802,7 @@ class Partition(val topicPartition: TopicPartition, } else { val replica = remoteReplicasMap.get(brokerId) val brokerEpoch = if (replica == null) Option.empty else replica.stateSnapshot.brokerEpoch - if (!brokerEpoch.isDefined) { + if (brokerEpoch.isEmpty) { // There are two cases where the broker epoch can be missing: // 1. During ISR expansion, we already held lock for the partition and did the broker epoch check, so the new // ISR replica should have a valid broker epoch. Then, the missing broker epoch can only happen to the diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala index 3b09041d33ac4..c9c5c67e06ea7 100644 --- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala +++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala @@ -26,9 +26,5 @@ trait ClientIdBroker { } case class ClientIdAndBroker(clientId: String, brokerHost: String, brokerPort: Int) extends ClientIdBroker { - override def toString = "%s-%s-%d".format(clientId, brokerHost, brokerPort) -} - -case class ClientIdAllBrokers(clientId: String) extends ClientIdBroker { - override def toString = "%s-%s".format(clientId, "AllBrokers") + override def toString: String = "%s-%s-%d".format(clientId, brokerHost, brokerPort) } diff --git a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala index 5ea66322a92c7..dfded33f009e4 100644 --- a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala +++ b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala @@ -20,5 +20,5 @@ package kafka.common /** * Thrown when a log cleaning task is requested to be aborted. */ -class LogCleaningAbortedException() extends RuntimeException() { +class LogCleaningAbortedException extends RuntimeException() { } diff --git a/core/src/main/scala/kafka/common/ThreadShutdownException.scala b/core/src/main/scala/kafka/common/ThreadShutdownException.scala index 6554a5ee4debb..8cd6601ce5aa9 100644 --- a/core/src/main/scala/kafka/common/ThreadShutdownException.scala +++ b/core/src/main/scala/kafka/common/ThreadShutdownException.scala @@ -20,5 +20,5 @@ package kafka.common /** * An exception that indicates a thread is being shut down normally. 
*/ -class ThreadShutdownException() extends RuntimeException { +class ThreadShutdownException extends RuntimeException { } diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 2dc94ce94f1a3..1f7da18d9ac87 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -68,7 +68,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, thread.start() } - def close() = { + def close(): Unit = { isClosed.set(true) zkClient.unregisterStateChangeHandler(ZkStateChangeHandler.name) zkClient.unregisterZNodeChildChangeHandler(ChangeNotificationHandler.path) @@ -117,7 +117,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, queue.put(new ChangeNotification) } - class ChangeNotification { + private class ChangeNotification { def process(): Unit = processNotifications() } @@ -143,11 +143,11 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, /* get the change number from a change notification znode */ private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong - class ChangeEventProcessThread(name: String) extends ShutdownableThread(name) { + private class ChangeEventProcessThread(name: String) extends ShutdownableThread(name) { override def doWork(): Unit = queue.take().process() } - object ChangeNotificationHandler extends ZNodeChildChangeHandler { + private object ChangeNotificationHandler extends ZNodeChildChangeHandler { override val path: String = seqNodeRoot override def handleChildChange(): Unit = addChangeNotification() } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 18e211f62bd54..67d3f3963b943 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -45,8 +45,8 @@ import scala.collection.{Seq, Set, mutable} import scala.jdk.CollectionConverters._ object ControllerChannelManager { - val QueueSizeMetricName = "QueueSize" - val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs" + private val QueueSizeMetricName = "QueueSize" + private val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs" } class ControllerChannelManager(controllerEpoch: () => Int, @@ -207,7 +207,7 @@ class ControllerChannelManager(controllerEpoch: () => Int, } } - protected def startRequestSendThread(brokerId: Int): Unit = { + private def startRequestSendThread(brokerId: Int): Unit = { val requestThread = brokerStateInfo(brokerId).requestSendThread if (requestThread.getState == Thread.State.NEW) requestThread.start() @@ -332,7 +332,7 @@ class ControllerBrokerRequestBatch( stateChangeLogger ) { - def sendEvent(event: ControllerEvent): Unit = { + private def sendEvent(event: ControllerEvent): Unit = { controllerEventManager.put(event) } def sendRequest(brokerId: Int, @@ -373,10 +373,10 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, stateChangeLogger: StateChangeLogger, kraftController: Boolean = false) extends Logging { val controllerId: Int = config.brokerId - val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, LeaderAndIsrPartitionState]] - val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, 
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 8c99a40c41f65..d290658d6dcb8 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -450,10 +450,6 @@ class ControllerContext extends ControllerChannelContext {
       Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
   }

-  def partitionLeaderAndIsr(partition: TopicPartition): Option[LeaderAndIsr] = {
-    partitionLeadershipInfo.get(partition).map(_.leaderAndIsr)
-  }
-
   def leaderEpoch(partition: TopicPartition): Int = {
     // A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
     // any existing epoch.
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index a1b4835ea970e..9a5a6fe1abcf2 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -33,8 +33,8 @@ import scala.collection._

 object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
-  val EventQueueTimeMetricName = "EventQueueTimeMs"
-  val EventQueueSizeMetricName = "EventQueueSize"
+  private val EventQueueTimeMetricName = "EventQueueTimeMs"
+  private val EventQueueSizeMetricName = "EventQueueSize"
 }

 trait ControllerEventProcessor {
@@ -44,8 +44,8 @@ trait ControllerEventProcessor {

 class QueuedEvent(val event: ControllerEvent,
                   val enqueueTimeMs: Long) {
-  val processingStarted = new CountDownLatch(1)
-  val spent = new AtomicBoolean(false)
+  private val processingStarted = new CountDownLatch(1)
+  private val spent = new AtomicBoolean(false)

   def process(processor: ControllerEventProcessor): Unit = {
     if (spent.getAndSet(true))
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index f84240536dd85..0474f6362db43 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -47,7 +47,7 @@ object ControllerState {
     def value = 2

     // The LeaderElectionRateAndTimeMs metric existed before `ControllerState` was introduced and we keep the name
     // for backwards compatibility. The alternative would be to have the same metric under two different names.
-    override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
+    override def rateAndTimeMetricName: Option[String] = Some("LeaderElectionRateAndTimeMs")
   }

   case object TopicChange extends ControllerState {
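The QueuedEvent hunk above shows the process-at-most-once guard these fields implement: spent.getAndSet(true) lets exactly one caller win, and the latch lets others wait for processing to begin. A minimal sketch of the idiom, assuming a plain () => Unit handler in place of the real ControllerEventProcessor:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

final class OnceEvent(handler: () => Unit) {
  private val processingStarted = new CountDownLatch(1)
  private val spent = new AtomicBoolean(false)

  def process(): Unit = {
    if (spent.getAndSet(true))
      return // a racing caller already claimed this event; drop it
    processingStarted.countDown()
    handler()
  }

  // Block until some thread has begun processing (the latch half of the pattern).
  def awaitProcessing(): Unit = processingStarted.await()
}
```

Because both fields are implementation details of that guarantee, narrowing them to private changes nothing for callers.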
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b989837f8912a..5b893c8e85175 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,7 +16,7 @@
  */
 package kafka.controller

-import com.yammer.metrics.core.Timer
+import com.yammer.metrics.core.{Meter, Timer}

 import java.util.concurrent.TimeUnit
 import kafka.api._
@@ -57,9 +57,9 @@ import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}

 sealed trait ElectionTrigger
-final case object AutoTriggered extends ElectionTrigger
-final case object ZkTriggered extends ElectionTrigger
-final case object AdminClientTriggered extends ElectionTrigger
+case object AutoTriggered extends ElectionTrigger
+case object ZkTriggered extends ElectionTrigger
+case object AdminClientTriggered extends ElectionTrigger

 object KafkaController extends Logging {
   val InitialControllerEpoch = 0
@@ -204,7 +204,7 @@ class KafkaController(val config: KafkaConfig,
    * is the controller. It merely registers the session expiration listener and starts the controller leader
    * elector
    */
-  def startup() = {
+  def startup(): Unit = {
     zkClient.registerStateChangeHandler(new StateChangeHandler {
       override val name: String = StateChangeHandlers.ControllerHandler
       override def afterInitializingSession(): Unit = {
@@ -2540,7 +2540,7 @@ class KafkaController(val config: KafkaConfig,
       allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
   }

-  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+  private def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
     // Handle a few short-circuits
     if (!isActive) {
       callback.apply(Left(Errors.NOT_CONTROLLER))
@@ -2779,7 +2779,7 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo
 private[controller] class ControllerStats {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)

-  val uncleanLeaderElectionRate = metricsGroup.newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
+  val uncleanLeaderElectionRate: Meter = metricsGroup.newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)

   val rateAndTimeMetrics: Map[ControllerState, Timer] = ControllerState.values.flatMap { state =>
     state.rateAndTimeMetricName.map { metricName =>
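The ControllerStats hunk above builds its per-state timer map by flatMapping over an Option-valued metric name, so states that define no metric simply drop out of the map. A simplified sketch of that construction, using a stand-in State type and plain strings instead of Yammer Timers:

```scala
// Illustrative stand-ins for ControllerState and its subtypes; not Kafka's types.
sealed trait State { def rateAndTimeMetricName: Option[String] = None }
case object Idle extends State
case object LeaderElection extends State {
  override def rateAndTimeMetricName: Option[String] = Some("LeaderElectionRateAndTimeMs")
}

object MetricMapSketch {
  private val states: Set[State] = Set(Idle, LeaderElection)

  // States whose metric name is None (here Idle) contribute nothing to the map.
  val rateAndTimeMetrics: Map[State, String] = states.flatMap { state =>
    state.rateAndTimeMetricName.map(name => state -> name)
  }.toMap

  def main(args: Array[String]): Unit =
    assert(rateAndTimeMetrics == Map(LeaderElection -> "LeaderElectionRateAndTimeMs"))
}
```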
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 71b163a2e2243..5dedad426406b 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -557,9 +557,9 @@ object PartitionLeaderElectionAlgorithms {

 sealed trait PartitionLeaderElectionStrategy
 final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean) extends PartitionLeaderElectionStrategy
-final case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
-final case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
-final case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

 sealed trait PartitionState {
   def state: Byte
diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
index a1d1bb240b1ec..1292e3aa5e540 100644
--- a/core/src/main/scala/kafka/controller/StateChangeLogger.scala
+++ b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
@@ -34,7 +34,7 @@ class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerE
   if (controllerEpoch.isDefined && !inControllerContext)
     throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true")

-  override lazy val logger = StateChangeLogger.logger
+  override lazy val logger: Logger = StateChangeLogger.logger

   locally {
     val prefix = if (inControllerContext) "Controller" else "Broker"
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index a56a701c1ca83..2603b2e3ce305 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -82,7 +82,6 @@ class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkCli
  * as well as from zookeeper. This is the only time the /brokers/topics/ path gets deleted. On the other hand,
  * if no replica is in TopicDeletionStarted state and at least one replica is in TopicDeletionFailed state, then
  * it marks the topic for deletion retry.
- * @param controller
  */
 class TopicDeletionManager(config: KafkaConfig,
                            controllerContext: ControllerContext,
@@ -251,7 +250,7 @@ class TopicDeletionManager(config: KafkaConfig,
    * Invoked with the list of topics to be deleted
    * It invokes onPartitionDeletion for all partitions of a topic.
    * The updateMetadataRequest is also going to set the leader for the topics being deleted to
-   * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
+   * [[kafka.api.LeaderAndIsr#LeaderDuringDelete]]. This lets each broker know that this topic is being deleted and can be
    * removed from their caches.
    */
   private def onTopicDeletion(topics: Set[String]): Unit = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 33ea53d6d25f9..217693ee08065 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -420,18 +420,6 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     }
   }

-  /**
-   * Verify the member.id is up to date for static members. Return true if both conditions met:
-   *   1. given member is a known static member to group
-   *   2. group stored member.id doesn't match with given member.id
-   */
-  def isStaticMemberFenced(
-    groupInstanceId: String,
-    memberId: String
-  ): Boolean = {
-    currentStaticMemberId(groupInstanceId).exists(_ != memberId)
-  }
-
   def canRebalance: Boolean = PreparingRebalance.validPreviousStates.contains(state)

   def transitionTo(groupState: GroupState): Unit = {
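The removed isStaticMemberFenced relied on Option.exists, which is worth spelling out: exists(_ != memberId) is false both when the instance id is unknown (None) and when the stored id matches, so only a known-but-different member id counts as fenced. A self-contained sketch of the same check, with an assumed in-memory map standing in for the group's static-member bookkeeping:

```scala
object StaticMemberFencingSketch {
  // Assumed stand-in for GroupMetadata's staticMembers state.
  private val staticMembers = Map("instance-1" -> "member-A")

  private def currentStaticMemberId(groupInstanceId: String): Option[String] =
    staticMembers.get(groupInstanceId)

  // Fenced only if the instance id is known AND maps to a different member id.
  def isStaticMemberFenced(groupInstanceId: String, memberId: String): Boolean =
    currentStaticMemberId(groupInstanceId).exists(_ != memberId)

  def main(args: Array[String]): Unit = {
    assert(!isStaticMemberFenced("instance-1", "member-A")) // ids agree
    assert(isStaticMemberFenced("instance-1", "member-B"))  // stale member id
    assert(!isStaticMemberFenced("unknown", "member-A"))    // not a static member
  }
}
```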
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index c94fcf7be4184..6ad6273977052 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -29,7 +29,7 @@ case class MemberSummary(memberId: String,
                          assignment: Array[Byte])

 private object MemberMetadata {
-  def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]) = supportedProtocols.map(_._1).toSet
+  def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]): Set[String] = supportedProtocols.map(_._1).toSet
 }

 /**
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 06e358d7b6cba..cdcf5389800fb 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -42,10 +42,10 @@ import scala.util.{Failure, Success, Try}

 object ProducerIdManager {
   // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block
-  val PidPrefetchThreshold = 0.90
-  val IterationLimit = 3
-  val RetryBackoffMs = 50
-  val NoRetry = -1L
+  val PidPrefetchThreshold: Double = 0.90
+  val IterationLimit: Int = 3
+  val RetryBackoffMs: Int = 50
+  val NoRetry: Long = -1L

   // Creates a ProducerIdGenerate that directly interfaces with ZooKeeper, IBP < 3.0-IV0
   def zk(brokerId: Int, zkClient: KafkaZkClient): ZkProducerIdManager = {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 7eda8f3b1f2fc..74366317b40ea 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -94,16 +94,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,

   import TransactionCoordinator._

-  type InitProducerIdCallback = InitProducerIdResult => Unit
-  type AddPartitionsCallback = Errors => Unit
-  type VerifyPartitionsCallback = AddPartitionsToTxnResult => Unit
-  type EndTxnCallback = Errors => Unit
-  type ApiResult[T] = Either[Errors, T]
+  private type InitProducerIdCallback = InitProducerIdResult => Unit
+  private type AddPartitionsCallback = Errors => Unit
+  private type VerifyPartitionsCallback = AddPartitionsToTxnResult => Unit
+  private type EndTxnCallback = Errors => Unit
+  private type ApiResult[T] = Either[Errors, T]

   /* Active flag of the coordinator */
   private val isActive = new AtomicBoolean(false)

-  val producerIdManager = createProducerIdManager()
+  val producerIdManager: ProducerIdManager = createProducerIdManager()

   def handleInitProducerId(transactionalId: String,
                            transactionTimeoutMs: Int,
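PidPrefetchThreshold drives producer-id block prefetching: once 90% of the current block has been handed out, a fetch of the next block is triggered so allocation never stalls. A hedged sketch of that arithmetic, with a simplified IdBlock in place of the real ProducerIdsBlock API:

```scala
object PidPrefetchSketch {
  val PidPrefetchThreshold: Double = 0.90

  // Assumed simplified shape; the real ProducerIdsBlock carries more fields.
  final case class IdBlock(firstId: Long, size: Int)

  def shouldPrefetch(block: IdBlock, nextUnusedId: Long): Boolean = {
    val consumed = nextUnusedId - block.firstId
    consumed.toDouble / block.size >= PidPrefetchThreshold
  }

  def main(args: Array[String]): Unit = {
    val block = IdBlock(firstId = 1000L, size = 100)
    assert(!shouldPrefetch(block, nextUnusedId = 1050L)) // 50% consumed: keep going
    assert(shouldPrefetch(block, nextUnusedId = 1090L))  // 90% consumed: fetch next block
  }
}
```

Annotating such constants with explicit types (Double, Int, Long), as the hunk does, also pins down the arithmetic: 0.90 stays a Double and -1L stays a Long regardless of how the initializers later change.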
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index e770d38e712b3..5a72554c74d60 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -159,7 +159,7 @@ class TransactionMarkerChannelManager(

   private val transactionsWithPendingMarkers = new ConcurrentHashMap[String, PendingCompleteTxn]

-  val writeTxnMarkersRequestVersion: Short =
+  private val writeTxnMarkersRequestVersion: Short =
     if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1
     else 0
@@ -176,7 +176,7 @@ class TransactionMarkerChannelManager(
   }

   private def removeMetrics(): Unit = {
-    MetricNames.foreach(metricsGroup.removeMetric(_))
+    MetricNames.foreach(metricsGroup.removeMetric)
   }

   // visible for testing
@@ -200,7 +200,7 @@ class TransactionMarkerChannelManager(
     trace(s"Added marker ${txnIdAndMarker.txnMarkerEntry} for transactional id ${txnIdAndMarker.txnId} to destination broker $brokerId")
   }

-  def retryLogAppends(): Unit = {
+  private def retryLogAppends(): Unit = {
     val txnLogAppendRetries: util.List[PendingCompleteTxn] = new util.ArrayList[PendingCompleteTxn]()
     txnLogAppendRetryQueue.drainTo(txnLogAppendRetries)
     txnLogAppendRetries.forEach { txnLogAppend =>
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index dc208081d109b..9fab77f30b6b7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -27,7 +27,7 @@ import scala.collection.{immutable, mutable}

 object TransactionState {

-  val AllStates = Set(
+  val AllStates: Set[TransactionState] = Set(
     Empty,
     Ongoing,
     PrepareCommit,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 44c7de5df8656..ed8927a25567a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -71,7 +71,7 @@ class TransactionStateManager(brokerId: Int,

   this.logIdent = "[Transaction State Manager " + brokerId + "]: "

-  type SendTxnMarkersCallback = (Int, TransactionResult, TransactionMetadata, TxnTransitMetadata) => Unit
+  private type SendTxnMarkersCallback = (Int, TransactionResult, TransactionMetadata, TxnTransitMetadata) => Unit

   /** shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
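writeTxnMarkersRequestVersion picks a wire-format version from the configured inter-broker protocol level, so markers stay readable by older brokers during a rolling upgrade. A sketch of that gate, with an illustrative ordered Version type standing in for MetadataVersion/IBP_2_8_IV0 rather than Kafka's actual types:

```scala
object RequestVersionSketch {
  // Assumed ordered protocol levels; Kafka's MetadataVersion enum is richer.
  sealed abstract class Version(val level: Int) {
    def isAtLeast(other: Version): Boolean = level >= other.level
  }
  case object IBP_2_7 extends Version(1)
  case object IBP_2_8 extends Version(2)

  // Newer clusters can use request version 1; older ones fall back to 0.
  def writeTxnMarkersRequestVersion(interBrokerProtocolVersion: Version): Short =
    if (interBrokerProtocolVersion.isAtLeast(IBP_2_8)) 1 else 0

  def main(args: Array[String]): Unit = {
    assert(writeTxnMarkersRequestVersion(IBP_2_7) == 0)
    assert(writeTxnMarkersRequestVersion(IBP_2_8) == 1)
  }
}
```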
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index de0edcd8ca5ff..0422eee767bb8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -465,7 +465,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    * @param to The cleaned offset that is the first not cleaned offset to end
    * @param stats The statistics for this round of cleaning
    */
-  def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats): Unit = {
+  private def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats): Unit = {
     this.lastStats = stats
     def mb(bytes: Double) = bytes / (1024*1024)
     val message =
@@ -929,7 +929,7 @@ private[log] class Cleaner(val id: Int,
    *
    * @param maxLogMessageSize The maximum record size in bytes allowed
    */
-  def growBuffers(maxLogMessageSize: Int): Unit = {
+  private def growBuffers(maxLogMessageSize: Int): Unit = {
     val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
     if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
       throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize))
@@ -942,7 +942,7 @@ private[log] class Cleaner(val id: Int,
   /**
    * Restore the I/O buffer capacity to its original size
    */
-  def restoreBuffers(): Unit = {
+  private def restoreBuffers(): Unit = {
     if(this.readBuffer.capacity > this.ioBufferSize)
       this.readBuffer = ByteBuffer.allocate(this.ioBufferSize)
     if(this.writeBuffer.capacity > this.ioBufferSize)
@@ -1139,7 +1139,7 @@ private[log] class Cleaner(val id: Int,
 /**
  * A simple struct for collecting pre-clean stats
  */
-private class PreCleanStats() {
+private class PreCleanStats {
   var maxCompactionDelayMs = 0L
   var delayedPartitions = 0
   var cleanablePartitions = 0
@@ -1160,8 +1160,8 @@
  */
 private class CleanerStats(time: Time = Time.SYSTEM) {
   val startTime = time.milliseconds
-  var mapCompleteTime = -1L
-  var endTime = -1L
+  var mapCompleteTime: Long = -1L
+  var endTime: Long = -1L
   var bytesRead = 0L
   var bytesWritten = 0L
   var mapBytesRead = 0L
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 1ba402e558fb0..fa67f9c321ff6 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -585,7 +585,7 @@ private[log] object LogCleanerManager extends Logging {
     TimeSinceLastRunMsMetricName
   )

-  def isCompactAndDelete(log: UnifiedLog): Boolean = {
+  private def isCompactAndDelete(log: UnifiedLog): Boolean = {
     log.config.compact && log.config.delete
   }

@@ -593,7 +593,7 @@
    * get max delay between the time when log is required to be compacted as determined
    * by maxCompactionLagMs and the current time.
    */
-  def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
+  private def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
     val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
     val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).stream.filter(_ > 0)
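growBuffers and restoreBuffers implement a grow-then-shrink discipline: the cleaner's I/O buffers expand temporarily to fit an oversized message and are restored afterwards, so one large record does not pin large allocations for the life of the thread. A sketch of the pattern; the doubling growth step is an assumption, since the hunk above cuts off before it:

```scala
import java.nio.ByteBuffer

final class BufferPairSketch(ioBufferSize: Int, maxIoBufferSize: Int) {
  private var readBuffer = ByteBuffer.allocate(ioBufferSize)
  private var writeBuffer = ByteBuffer.allocate(ioBufferSize)

  def growBuffers(maxLogMessageSize: Int): Unit = {
    val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
    if (readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
      throw new IllegalStateException(
        s"This log contains a message larger than maximum allowable size of $maxBufferSize.")
    // Assumed growth policy: double the capacity, capped at maxBufferSize.
    val newSize = math.min(readBuffer.capacity * 2, maxBufferSize)
    readBuffer = ByteBuffer.allocate(newSize)
    writeBuffer = ByteBuffer.allocate(newSize)
  }

  // Shrink back after a cleaning pass so oversized buffers are not retained.
  def restoreBuffers(): Unit = {
    if (readBuffer.capacity > ioBufferSize) readBuffer = ByteBuffer.allocate(ioBufferSize)
    if (writeBuffer.capacity > ioBufferSize) writeBuffer = ByteBuffer.allocate(ioBufferSize)
  }
}
```

Making both methods private, as the hunk does, keeps this invariant (grow is always followed by restore) inside the Cleaner itself.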
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 747807afb1d0f..deb77c62d7582 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -292,7 +292,7 @@ class LogManager(logDirs: Seq[File],
         result += (logDir.getAbsolutePath -> directoryId)
       })
     } catch {
-      case e: NoSuchFileException =>
+      case _: NoSuchFileException =>
         info(s"No meta.properties file found in $logDir.")
       case e: IOException =>
         logDirFailureChannel.maybeAddOfflineLogDir(logDir.getAbsolutePath, s"Disk error while loading ID $logDir", e)
@@ -369,7 +369,7 @@ class LogManager(logDirs: Seq[File],
   }

   // factory class for naming the log recovery threads used in metrics
-  class LogRecoveryThreadFactory(val dirPath: String) extends ThreadFactory {
+  private class LogRecoveryThreadFactory(val dirPath: String) extends ThreadFactory {
     val threadNum = new AtomicInteger(0)

     override def newThread(runnable: Runnable): Thread = {
@@ -1325,7 +1325,7 @@ class LogManager(logDirs: Seq[File],
    * Delete any eligible logs. Return the number of segments deleted.
    * Only consider logs that are not compacted.
    */
-  def cleanupLogs(): Unit = {
+  private def cleanupLogs(): Unit = {
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index bba208698e7f7..c0bb9d8cd66c6 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMe
 import java.io.{File, IOException}
 import java.nio.file.{Files, Path}
 import java.util
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, ScheduledFuture}
 import java.util.stream.Collectors
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import scala.annotation.nowarn
@@ -202,7 +202,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * set _topicId and write to the partition metadata file.
    * - Otherwise set _topicId to None
    */
-  def initializeTopicId(): Unit = {
+  private def initializeTopicId(): Unit = {
     val partMetadataFile = partitionMetadataFile.getOrElse(
       throw new KafkaException("The partitionMetadataFile should have been initialized"))
@@ -466,7 +466,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }

-  val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
+  val producerExpireCheck: ScheduledFuture[_] = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
     producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)

   // Visible for testing
@@ -613,9 +613,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Maybe create the VerificationStateEntry for the given producer ID -- always return the VerificationGuard
    */
-  def maybeCreateVerificationGuard(producerId: Long,
-                                   sequence: Int,
-                                   epoch: Short): VerificationGuard = lock synchronized {
+  private def maybeCreateVerificationGuard(producerId: Long,
+                                           sequence: Int,
+                                           epoch: Short): VerificationGuard = lock synchronized {
     producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
   }
@@ -1923,28 +1923,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }

 object UnifiedLog extends Logging {
-  val LogFileSuffix = LogFileUtils.LOG_FILE_SUFFIX
+  val LogFileSuffix: String = LogFileUtils.LOG_FILE_SUFFIX

-  val IndexFileSuffix = LogFileUtils.INDEX_FILE_SUFFIX
+  val IndexFileSuffix: String = LogFileUtils.INDEX_FILE_SUFFIX

-  val TimeIndexFileSuffix = LogFileUtils.TIME_INDEX_FILE_SUFFIX
+  val TimeIndexFileSuffix: String = LogFileUtils.TIME_INDEX_FILE_SUFFIX

-  val TxnIndexFileSuffix = LogFileUtils.TXN_INDEX_FILE_SUFFIX
+  val TxnIndexFileSuffix: String = LogFileUtils.TXN_INDEX_FILE_SUFFIX

-  val CleanedFileSuffix = LocalLog.CleanedFileSuffix
+  val CleanedFileSuffix: String = LocalLog.CleanedFileSuffix

-  val SwapFileSuffix = LocalLog.SwapFileSuffix
+  val SwapFileSuffix: String = LocalLog.SwapFileSuffix

-  val DeleteDirSuffix = LocalLog.DeleteDirSuffix
+  val DeleteDirSuffix: String = LocalLog.DeleteDirSuffix

-  val StrayDirSuffix = LocalLog.StrayDirSuffix
+  val StrayDirSuffix: String = LocalLog.StrayDirSuffix

-  val FutureDirSuffix = LocalLog.FutureDirSuffix
+  val FutureDirSuffix: String = LocalLog.FutureDirSuffix

   private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
   private[log] val FutureDirPattern = LocalLog.FutureDirPattern

-  val UnknownOffset = LocalLog.UnknownOffset
+  val UnknownOffset: Long = LocalLog.UnknownOffset

   def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
                          config: LogConfig,
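producerExpireCheck keeps the ScheduledFuture returned by the scheduler (which is why the ScheduledFuture import is added above), so the periodic producer-expiration task can be cancelled when the log closes. A sketch of that wiring using a plain JDK scheduler in place of Kafka's Scheduler abstraction, with an assumed 10-minute interval:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object ProducerExpireCheckSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  private def removeExpiredProducers(nowMs: Long): Unit =
    println(s"expiring producer state as of $nowMs")

  // Fixed-delay periodic check, mirroring the interval-in/interval-out call above.
  // 600000 ms (10 minutes) is an assumed value, not Kafka's default.
  val producerExpireCheck: ScheduledFuture[_] = scheduler.scheduleWithFixedDelay(
    () => removeExpiredProducers(System.currentTimeMillis()),
    600000L, 600000L, TimeUnit.MILLISECONDS)

  def close(): Unit = {
    producerExpireCheck.cancel(true) // stop the periodic check when the log closes
    scheduler.shutdown()
  }
}
```

Typing the field as ScheduledFuture[_] rather than letting the inferred type leak is the same explicit-public-signature discipline applied throughout this patch.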
diff --git a/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala b/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala
index 93674ad39e240..cd748dc9f163b 100644
--- a/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala
+++ b/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala
@@ -26,7 +26,7 @@ import org.apache.zookeeper.client.ZKClientConfig
 import scala.jdk.CollectionConverters._

 object ZooKeeperMainWithTlsSupportForKafka {
-  val zkTlsConfigFileOption = "-zk-tls-config-file"
+  private val zkTlsConfigFileOption = "-zk-tls-config-file"

   def main(args: Array[String]): Unit = {
     val zkTlsConfigFileIndex = args.indexOf(zkTlsConfigFileOption)
     val zooKeeperMain: ZooKeeperMain =
@@ -42,7 +42,7 @@ object ZooKeeperMainWithTlsSupportForKafka {
       Some(ZkSecurityMigrator.createZkClientConfigFromFile(args(zkTlsConfigFileIndex + 1))))
     // The run method of ZooKeeperMain is package-private,
     // therefore this code unfortunately must reside in the same org.apache.zookeeper package.
-    zooKeeperMain.run
+    zooKeeperMain.run()
   }
 }
@@ -68,13 +68,13 @@ class ZooKeeperMainWithTlsSupportForKafka(args: Array[String], val zkClientConfi
     super.processZKCmd(co)
   }

-  def kafkaTlsUsage(): Unit = {
+  private def kafkaTlsUsage(): Unit = {
     System.err.println("ZooKeeper -server host:port [-zk-tls-config-file ] cmd args")
     ZooKeeperMain.commandMap.keySet.asScala.toList.sorted.foreach(cmd =>
       System.err.println(s"\t$cmd ${ZooKeeperMain.commandMap.get(cmd)}"))
   }

-  override def connectToZK(newHost: String) = {
+  override def connectToZK(newHost: String): Unit = {
     // ZooKeeperAdmin has no constructor that supports passing in both readOnly and ZkClientConfig,
     // and readOnly ends up being set to false when passing in a ZkClientConfig instance;
     // therefore it is currently not possible for us to construct a ZooKeeperAdmin instance with