Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: cleanup core modules part 1 #15252

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ import scala.io.StdIn

object AclCommand extends Logging {

val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " +
private val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " +
"through the authorizer is deprecated and will be removed in a future release. Please use " +
"--bootstrap-server instead to set ACLs through the admin client."
val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)
private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)

private val Newline = scala.util.Properties.lineSeparator

Expand Down Expand Up @@ -89,7 +89,7 @@ object AclCommand extends Logging {
def listAcls(): Unit
}

class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {
private class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {

private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
val props = if (opts.options.has(opts.commandConfigOpt))
Expand Down Expand Up @@ -487,69 +487,69 @@ object AclCommand extends Logging {
for ((resource, acls) <- resourceToAcls) {
val validOps = AclEntry.supportedOperations(resource.resourceType) + AclOperation.ALL
if ((acls.map(_.operation) -- validOps).nonEmpty)
CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName(_)).mkString(", ")}")
CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName).mkString(", ")}")
}
}

class AclCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."

val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])

val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
.withOptionalArg()
.describedAs("command-config")
.ofType(classOf[String])

val authorizerOpt = parser.accepts("authorizer", "DEPRECATED: Fully qualified class name of " +
val authorizerOpt: OptionSpec[String] = parser.accepts("authorizer", "DEPRECATED: Fully qualified class name of " +
"the authorizer, which defaults to kafka.security.authorizer.AclAuthorizer if --bootstrap-server is not provided. " +
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg
.describedAs("authorizer")
.ofType(classOf[String])

val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "DEPRECATED: The " +
val authorizerPropertiesOpt: OptionSpec[String] = parser.accepts("authorizer-properties", "DEPRECATED: The " +
"properties required to configure an instance of the Authorizer specified by --authorizer. " +
"These are key=val pairs. For the default authorizer, example values are: zookeeper.connect=localhost:2181. " +
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg
.describedAs("authorizer-properties")
.ofType(classOf[String])

val topicOpt = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all topics.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])

val clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.")
val groupOpt = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
val clusterOpt: OptionSpecBuilder = parser.accepts("cluster", "Add/Remove cluster ACLs.")
val groupOpt: OptionSpec[String] = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
"A value of '*' indicates the ACLs should apply to all groups.")
.withRequiredArg
.describedAs("group")
.ofType(classOf[String])

val transactionalIdOpt = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
val transactionalIdOpt: OptionSpec[String] = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
"be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds.")
.withRequiredArg
.describedAs("transactional-id")
.ofType(classOf[String])

val idempotentOpt = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
val idempotentOpt: OptionSpecBuilder = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
"used in combination with the --producer option. Note that idempotence is enabled automatically if " +
"the producer is authorized to a particular transactional-id.")

val delegationTokenOpt = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
val delegationTokenOpt: OptionSpec[String] = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all tokens.")
.withRequiredArg
.describedAs("delegation-token")
.ofType(classOf[String])

val resourcePatternType = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
val resourcePatternType: OptionSpec[PatternType] = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
"When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. " +
"When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " +
"or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " +
Expand All @@ -560,24 +560,24 @@ object AclCommand extends Logging {
.withValuesConvertedBy(new PatternTypeConverter())
.defaultsTo(PatternType.LITERAL)

val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
val addOpt: OptionSpecBuilder = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt: OptionSpecBuilder = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt: OptionSpecBuilder = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")

val operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline +
val operationsOpt: OptionSpec[String] = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline +
AclEntry.AclOperations.map("\t" + JSecurityUtils.operationName(_)).mkString(Newline) + Newline)
.withRequiredArg
.ofType(classOf[String])
.defaultsTo(JSecurityUtils.operationName(AclOperation.ALL))

val allowPrincipalsOpt = parser.accepts("allow-principal", "principal is in principalType:name format." +
val allowPrincipalsOpt: OptionSpec[String] = parser.accepts("allow-principal", "principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used." +
" For example, User:'*' is the wild card indicating all users.")
.withRequiredArg
.describedAs("allow-principal")
.ofType(classOf[String])

val denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType:name format. " +
val denyPrincipalsOpt: OptionSpec[String] = parser.accepts("deny-principal", "principal is in principalType:name format. " +
"By default anyone not added through --allow-principal is denied access. " +
"You only need to use this option as negation to already allowed set. " +
"Note that principalType must be supported by the Authorizer being used. " +
Expand All @@ -588,33 +588,33 @@ object AclCommand extends Logging {
.describedAs("deny-principal")
.ofType(classOf[String])

val listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
val listPrincipalsOpt: OptionSpec[String] = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
.withOptionalArg()
.describedAs("principal")
.ofType(classOf[String])

val allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
val allowHostsOpt: OptionSpec[String] = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
"If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts.")
.withRequiredArg
.describedAs("allow-host")
.ofType(classOf[String])

val denyHostsOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
val denyHostsOpt: OptionSpec[String] = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
"If you have specified --deny-principal then the default for this option will be set to '*' which denies access from all hosts.")
.withRequiredArg
.describedAs("deny-host")
.ofType(classOf[String])

val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
val producerOpt: OptionSpecBuilder = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
"This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.")

val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
val consumerOpt: OptionSpecBuilder = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
"This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")

val forceOpt = parser.accepts("force", "Assume Yes to all queries and do not prompt.")
val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to all queries and do not prompt.")

val zkTlsConfigFile = parser.accepts("zk-tls-config-file",
val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file",
"DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity properties are defined for" +
" the default authorizer kafka.security.authorizer.AclAuthorizer." +
" Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " +
Expand All @@ -624,7 +624,7 @@ object AclCommand extends Logging {
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg().describedAs("Authorizer ZooKeeper TLS configuration").ofType(classOf[String])

val userPrincipalOpt = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
val userPrincipalOpt: OptionSpec[String] = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
"one could grant CreateTokens or DescribeTokens permission on a given user principal.")
.withRequiredArg()
.describedAs("user-principal")
Expand Down
45 changes: 23 additions & 22 deletions core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import joptsimple.OptionSpec
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
Expand Down Expand Up @@ -61,8 +62,8 @@ object BrokerApiVersionsCommand {
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.forKeyValue { (broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
case Success(v) => out.print(s"$broker -> ${v.toString(true)}\n")
case Failure(v) => out.print(s"$broker -> ERROR: $v\n")
}
}
adminClient.close()
Expand All @@ -77,22 +78,22 @@ object BrokerApiVersionsCommand {
AdminClient.create(props)
}

class BrokerVersionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val BootstrapServerDoc = "REQUIRED: The server to connect to."
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
private class BrokerVersionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
private val BootstrapServerDoc = "REQUIRED: The server to connect to."
private val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."

val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
.withRequiredArg
.describedAs("command config property file")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", BootstrapServerDoc)
.withRequiredArg
.describedAs("server(s) to use for bootstrapping")
.ofType(classOf[String])
options = parser.parse(args : _*)
checkArgs()

def checkArgs(): Unit = {
private def checkArgs(): Unit = {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to retrieve broker version information.")
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
Expand All @@ -105,10 +106,10 @@ object BrokerApiVersionsCommand {
val client: ConsumerNetworkClient,
val bootstrapBrokers: List[Node]) extends Logging {

@volatile var running = true
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
@volatile private var running = true
private val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()

val networkThread = new KafkaThread("admin-client-network-thread", () => {
private val networkThread = new KafkaThread("admin-client-network-thread", () => {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
Expand Down Expand Up @@ -200,17 +201,17 @@ object BrokerApiVersionsCommand {
}

private object AdminClient {
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
val DefaultRequestTimeoutMs = 5000
val DefaultMaxInFlightRequestsPerConnection = 100
val DefaultReconnectBackoffMs = 50
val DefaultReconnectBackoffMax = 50
val DefaultSendBufferBytes = 128 * 1024
val DefaultReceiveBufferBytes = 32 * 1024
val DefaultRetryBackoffMs = 100

val AdminClientIdSequence = new AtomicInteger(1)
val AdminConfigDef = {
private val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
private val DefaultRequestTimeoutMs = 5000
private val DefaultMaxInFlightRequestsPerConnection = 100
private val DefaultReconnectBackoffMs = 50
private val DefaultReconnectBackoffMax = 50
private val DefaultSendBufferBytes = 128 * 1024
private val DefaultReceiveBufferBytes = 32 * 1024
private val DefaultRetryBackoffMs = 100

private val AdminClientIdSequence = new AtomicInteger(1)
private val AdminConfigDef = {
val config = new ConfigDef()
.define(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Expand Down
Loading