Skip to content

Commit

Permalink
MINOR: cleanup core modules part 1 (#15252)
Browse files Browse the repository at this point in the history
* MINOR: Clean up core api, cluster, common, log, admin, controller and coordinator classes

Mark methods and fields private where possible
Annotate public methods and fields
Remove unused classes and methods

Signed-off-by: Josep Prat <[email protected]>

Reviewers: Luke Chen <[email protected]>
  • Loading branch information
jlprat authored Jan 26, 2024
1 parent 44272ea commit 2a6e420
Show file tree
Hide file tree
Showing 31 changed files with 256 additions and 275 deletions.
58 changes: 29 additions & 29 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ import scala.io.StdIn

object AclCommand extends Logging {

val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " +
private val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " +
"through the authorizer is deprecated and will be removed in a future release. Please use " +
"--bootstrap-server instead to set ACLs through the admin client."
val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)
private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)

private val Newline = scala.util.Properties.lineSeparator

Expand Down Expand Up @@ -89,7 +89,7 @@ object AclCommand extends Logging {
def listAcls(): Unit
}

class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {
private class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {

private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
val props = if (opts.options.has(opts.commandConfigOpt))
Expand Down Expand Up @@ -487,69 +487,69 @@ object AclCommand extends Logging {
for ((resource, acls) <- resourceToAcls) {
val validOps = AclEntry.supportedOperations(resource.resourceType) + AclOperation.ALL
if ((acls.map(_.operation) -- validOps).nonEmpty)
CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName(_)).mkString(", ")}")
CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName).mkString(", ")}")
}
}

class AclCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."

val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])

val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
.withOptionalArg()
.describedAs("command-config")
.ofType(classOf[String])

val authorizerOpt = parser.accepts("authorizer", "DEPRECATED: Fully qualified class name of " +
val authorizerOpt: OptionSpec[String] = parser.accepts("authorizer", "DEPRECATED: Fully qualified class name of " +
"the authorizer, which defaults to kafka.security.authorizer.AclAuthorizer if --bootstrap-server is not provided. " +
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg
.describedAs("authorizer")
.ofType(classOf[String])

val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "DEPRECATED: The " +
val authorizerPropertiesOpt: OptionSpec[String] = parser.accepts("authorizer-properties", "DEPRECATED: The " +
"properties required to configure an instance of the Authorizer specified by --authorizer. " +
"These are key=val pairs. For the default authorizer, example values are: zookeeper.connect=localhost:2181. " +
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg
.describedAs("authorizer-properties")
.ofType(classOf[String])

val topicOpt = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all topics.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])

val clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.")
val groupOpt = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
val clusterOpt: OptionSpecBuilder = parser.accepts("cluster", "Add/Remove cluster ACLs.")
val groupOpt: OptionSpec[String] = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
"A value of '*' indicates the ACLs should apply to all groups.")
.withRequiredArg
.describedAs("group")
.ofType(classOf[String])

val transactionalIdOpt = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
val transactionalIdOpt: OptionSpec[String] = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
"be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds.")
.withRequiredArg
.describedAs("transactional-id")
.ofType(classOf[String])

val idempotentOpt = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
val idempotentOpt: OptionSpecBuilder = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
"used in combination with the --producer option. Note that idempotence is enabled automatically if " +
"the producer is authorized to a particular transactional-id.")

val delegationTokenOpt = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
val delegationTokenOpt: OptionSpec[String] = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all tokens.")
.withRequiredArg
.describedAs("delegation-token")
.ofType(classOf[String])

val resourcePatternType = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
val resourcePatternType: OptionSpec[PatternType] = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
"When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. " +
"When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " +
"or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " +
Expand All @@ -560,24 +560,24 @@ object AclCommand extends Logging {
.withValuesConvertedBy(new PatternTypeConverter())
.defaultsTo(PatternType.LITERAL)

val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
val addOpt: OptionSpecBuilder = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt: OptionSpecBuilder = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt: OptionSpecBuilder = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")

val operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline +
val operationsOpt: OptionSpec[String] = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline +
AclEntry.AclOperations.map("\t" + JSecurityUtils.operationName(_)).mkString(Newline) + Newline)
.withRequiredArg
.ofType(classOf[String])
.defaultsTo(JSecurityUtils.operationName(AclOperation.ALL))

val allowPrincipalsOpt = parser.accepts("allow-principal", "principal is in principalType:name format." +
val allowPrincipalsOpt: OptionSpec[String] = parser.accepts("allow-principal", "principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used." +
" For example, User:'*' is the wild card indicating all users.")
.withRequiredArg
.describedAs("allow-principal")
.ofType(classOf[String])

val denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType:name format. " +
val denyPrincipalsOpt: OptionSpec[String] = parser.accepts("deny-principal", "principal is in principalType:name format. " +
"By default anyone not added through --allow-principal is denied access. " +
"You only need to use this option as negation to already allowed set. " +
"Note that principalType must be supported by the Authorizer being used. " +
Expand All @@ -588,33 +588,33 @@ object AclCommand extends Logging {
.describedAs("deny-principal")
.ofType(classOf[String])

val listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
val listPrincipalsOpt: OptionSpec[String] = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
.withOptionalArg()
.describedAs("principal")
.ofType(classOf[String])

val allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
val allowHostsOpt: OptionSpec[String] = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
"If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts.")
.withRequiredArg
.describedAs("allow-host")
.ofType(classOf[String])

val denyHostsOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
val denyHostsOpt: OptionSpec[String] = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
"If you have specified --deny-principal then the default for this option will be set to '*' which denies access from all hosts.")
.withRequiredArg
.describedAs("deny-host")
.ofType(classOf[String])

val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
val producerOpt: OptionSpecBuilder = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
"This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.")

val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
val consumerOpt: OptionSpecBuilder = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
"This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")

val forceOpt = parser.accepts("force", "Assume Yes to all queries and do not prompt.")
val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to all queries and do not prompt.")

val zkTlsConfigFile = parser.accepts("zk-tls-config-file",
val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file",
"DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity properties are defined for" +
" the default authorizer kafka.security.authorizer.AclAuthorizer." +
" Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " +
Expand All @@ -624,7 +624,7 @@ object AclCommand extends Logging {
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg().describedAs("Authorizer ZooKeeper TLS configuration").ofType(classOf[String])

val userPrincipalOpt = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
val userPrincipalOpt: OptionSpec[String] = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
"one could grant CreateTokens or DescribeTokens permission on a given user principal.")
.withRequiredArg()
.describedAs("user-principal")
Expand Down
45 changes: 23 additions & 22 deletions core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import joptsimple.OptionSpec
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
Expand Down Expand Up @@ -61,8 +62,8 @@ object BrokerApiVersionsCommand {
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.forKeyValue { (broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
case Success(v) => out.print(s"$broker -> ${v.toString(true)}\n")
case Failure(v) => out.print(s"$broker -> ERROR: $v\n")
}
}
adminClient.close()
Expand All @@ -77,22 +78,22 @@ object BrokerApiVersionsCommand {
AdminClient.create(props)
}

class BrokerVersionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val BootstrapServerDoc = "REQUIRED: The server to connect to."
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
private class BrokerVersionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
private val BootstrapServerDoc = "REQUIRED: The server to connect to."
private val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."

val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
.withRequiredArg
.describedAs("command config property file")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", BootstrapServerDoc)
.withRequiredArg
.describedAs("server(s) to use for bootstrapping")
.ofType(classOf[String])
options = parser.parse(args : _*)
checkArgs()

def checkArgs(): Unit = {
private def checkArgs(): Unit = {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to retrieve broker version information.")
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
Expand All @@ -105,10 +106,10 @@ object BrokerApiVersionsCommand {
val client: ConsumerNetworkClient,
val bootstrapBrokers: List[Node]) extends Logging {

@volatile var running = true
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
@volatile private var running = true
private val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()

val networkThread = new KafkaThread("admin-client-network-thread", () => {
private val networkThread = new KafkaThread("admin-client-network-thread", () => {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
Expand Down Expand Up @@ -200,17 +201,17 @@ object BrokerApiVersionsCommand {
}

private object AdminClient {
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
val DefaultRequestTimeoutMs = 5000
val DefaultMaxInFlightRequestsPerConnection = 100
val DefaultReconnectBackoffMs = 50
val DefaultReconnectBackoffMax = 50
val DefaultSendBufferBytes = 128 * 1024
val DefaultReceiveBufferBytes = 32 * 1024
val DefaultRetryBackoffMs = 100

val AdminClientIdSequence = new AtomicInteger(1)
val AdminConfigDef = {
private val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
private val DefaultRequestTimeoutMs = 5000
private val DefaultMaxInFlightRequestsPerConnection = 100
private val DefaultReconnectBackoffMs = 50
private val DefaultReconnectBackoffMax = 50
private val DefaultSendBufferBytes = 128 * 1024
private val DefaultReceiveBufferBytes = 32 * 1024
private val DefaultRetryBackoffMs = 100

private val AdminClientIdSequence = new AtomicInteger(1)
private val AdminConfigDef = {
val config = new ConfigDef()
.define(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Expand Down
Loading

0 comments on commit 2a6e420

Please sign in to comment.