Commit e79f725

Merge branch 'apache:trunk' into upgrade/maven_artifact_version

bmscomp authored Feb 5, 2024
2 parents 0bb6806 + a63131a

Showing 38 changed files with 1,811 additions and 543 deletions.
6 changes: 4 additions & 2 deletions build.gradle

@@ -44,7 +44,9 @@ plugins {
   // be dropped from gradle/resources/dependencycheck-suppressions.xml
   id "com.github.spotbugs" version '5.1.3' apply false
   id 'org.scoverage' version '7.0.1' apply false
-  id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
+  // Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed
+  // artifacts - see https://github.com/johnrengelman/shadow/issues/901
+  id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
   id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0
 }

@@ -302,7 +304,7 @@ subprojects {
         from components.java
       } else {
         apply plugin: 'com.github.johnrengelman.shadow'
-        artifact shadowJar
+        project.shadow.component(mavenJava)
       }

       afterEvaluate {
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java

@@ -40,7 +40,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;

-public class LeaderAndIsrRequest extends AbstractControlRequest {
+public final class LeaderAndIsrRequest extends AbstractControlRequest {

     public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {

@@ -129,7 +129,7 @@ public String toString() {

     private final LeaderAndIsrRequestData data;

-    LeaderAndIsrRequest(LeaderAndIsrRequestData data, short version) {
+    public LeaderAndIsrRequest(LeaderAndIsrRequestData data, short version) {
         super(ApiKeys.LEADER_AND_ISR, version);
         this.data = data;
         // Do this from the constructor to make it thread-safe (even though it's only needed when some methods are called)
clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java

@@ -50,6 +50,10 @@ public ListGroupsRequest build(short version) {
                 throw new UnsupportedVersionException("The broker only supports ListGroups " +
                     "v" + version + ", but we need v4 or newer to request groups by states.");
             }
+            if (!data.typesFilter().isEmpty() && version < 5) {
+                throw new UnsupportedVersionException("The broker only supports ListGroups " +
+                    "v" + version + ", but we need v5 or newer to request groups by type.");
+            }
             return new ListGroupsRequest(data, version);
         }
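As an illustration of the new guard, here is a minimal Scala sketch against the public request classes. The `setTypesFilter` setter name is assumed from Kafka's generated message code (the diff only shows the `typesFilter()` getter), so treat this as a sketch rather than verified usage:

```scala
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.message.ListGroupsRequestData
import org.apache.kafka.common.requests.ListGroupsRequest

object ListGroupsVersionGuardSketch extends App {
  // Ask for consumer groups only; TypesFilter is the new v5 field.
  val data = new ListGroupsRequestData()
    .setTypesFilter(java.util.Collections.singletonList("consumer"))

  // v5 supports the types filter, so this builds fine.
  val ok = new ListGroupsRequest.Builder(data).build(5.toShort)
  println(s"built v${ok.version} request")

  // v4 predates TypesFilter, so the guard above should reject it.
  try new ListGroupsRequest.Builder(data).build(4.toShort)
  catch {
    case e: UnsupportedVersionException => println(s"rejected as expected: ${e.getMessage}")
  }
}
```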
clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java

@@ -41,7 +41,7 @@

 import static java.util.Collections.singletonList;

-public class UpdateMetadataRequest extends AbstractControlRequest {
+public final class UpdateMetadataRequest extends AbstractControlRequest {

     public static class Builder extends AbstractControlRequest.Builder<UpdateMetadataRequest> {
         private final List<UpdateMetadataPartitionState> partitionStates;

@@ -149,7 +149,7 @@ public String toString() {

     private final UpdateMetadataRequestData data;

-    UpdateMetadataRequest(UpdateMetadataRequestData data, short version) {
+    public UpdateMetadataRequest(UpdateMetadataRequestData data, short version) {
         super(ApiKeys.UPDATE_METADATA, version);
         this.data = data;
         // Do this from the constructor to make it thread-safe (even though it's only needed when some methods are called)
clients/src/main/resources/common/message/ListGroupsRequest.json

@@ -23,11 +23,14 @@
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "StatesFilter", "type": "[]string", "versions": "4+",
-      "about": "The states of the groups we want to list. If empty all groups are returned with their state."
-    }
+      "about": "The states of the groups we want to list. If empty, all groups are returned with their state." },
+    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
+      "about": "The types of the groups we want to list. If empty, all groups are returned with their type." }
   ]
 }
clients/src/main/resources/common/message/ListGroupsResponse.json

@@ -19,12 +19,15 @@
   "name": "ListGroupsResponse",
   // Version 1 adds the throttle time.
   //
-  // Starting in version 2, on quota violation, brokers send out responses before throttling.
+  // Starting in version 2, on quota violation, brokers send out
+  // responses before throttling.
   //
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the GroupState field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the GroupType field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

@@ -38,7 +41,9 @@
     { "name": "ProtocolType", "type": "string", "versions": "0+",
       "about": "The group protocol type." },
     { "name": "GroupState", "type": "string", "versions": "4+", "ignorable": true,
-      "about": "The group state name." }
+      "about": "The group state name." },
+    { "name": "GroupType", "type": "string", "versions": "5+", "ignorable": true,
+      "about": "The group type name." }
   ]}
 ]
}
8 changes: 7 additions & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala

@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +
+            s"directories and log may already exist there. directoryId=$directoryId, " +
+            s"topicId=$topicId, targetLogDirectoryId=$targetLogDirectoryId")
+        }

       case None =>
         createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
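The replaced one-liner had passed `directoryId == DirectoryId.UNASSIGNED` as the `isNew` flag; the new branch restores `partitionState.isNew` and adds an online-directory check before creating anything. A hedged restatement of that check as a standalone predicate (a hypothetical helper, not part of `Partition`):

```scala
import org.apache.kafka.common.{DirectoryId, Uuid}

// Create the log only when it is safe: the assigned directory is known to be
// online, or no log directory is offline at all (so the log cannot already
// exist in an unreachable dir), or no specific directory was assigned.
def safeToCreateLog(
  onlineLogDirId: Uuid => Boolean, // stand-in for logManager.onlineLogDirId
  hasOfflineLogDirs: Boolean,      // stand-in for logManager.hasOfflineLogDirs()
  directoryId: Uuid
): Boolean =
  onlineLogDirId(directoryId) || !hasOfflineLogDirs || directoryId == DirectoryId.UNASSIGNED
```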
22 changes: 14 additions & 8 deletions core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@@ -32,7 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.coordinator.group.OffsetConfig
+import org.apache.kafka.coordinator.group.{Group, OffsetConfig}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.storage.internals.log.VerificationGuard

@@ -1111,17 +1111,23 @@ private[group] class GroupCoordinator(
     }
   }

-  def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: Set[String], groupTypes: Set[String]): (Errors, List[GroupOverview]) = {
     if (!isActive.get) {
       (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
       val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-      // if states is empty, return all groups
-      val groups = if (states.isEmpty)
-        groupManager.currentGroups
-      else {
-        val caseInsensitiveStates = states.map(_.toLowerCase)
-        groupManager.currentGroups.filter(g => g.isInStates(caseInsensitiveStates))
+
+      // Convert state filter strings to lower case and group type strings to the corresponding enum type.
+      // This is done to ensure a case-insensitive comparison.
+      val caseInsensitiveStates = states.map(_.toLowerCase)
+      val enumTypesFilter: Set[Group.GroupType] = groupTypes.map(Group.GroupType.parse)
+
+      // Filter groups based on states and groupTypes. If either is empty, it won't filter on that criterion.
+      // While using the old group coordinator, all groups are considered classic groups by default.
+      // An empty list is returned for any other type filter.
+      val groups = groupManager.currentGroups.filter { g =>
+        (states.isEmpty || g.isInStates(caseInsensitiveStates)) &&
+          (enumTypesFilter.isEmpty || enumTypesFilter.contains(Group.GroupType.CLASSIC))
       }
       (errorCode, groups.map(_.overview).toList)
     }
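Because the old coordinator only hosts classic groups, a non-empty type filter either keeps everything (when it contains "classic") or matches nothing. A standalone approximation of the combined predicate, using a stand-in record rather than the coordinator's real types:

```scala
// Stand-in for GroupOverview; "classic" mirrors Group.GroupType.CLASSIC.
final case class GroupStub(state: String, groupType: String = "classic")

def matches(g: GroupStub, states: Set[String], types: Set[String]): Boolean =
  (states.isEmpty || states.map(_.toLowerCase).contains(g.state.toLowerCase)) &&
    (types.isEmpty || types.map(_.toLowerCase).contains(g.groupType))

val group = GroupStub("Stable")
assert(matches(group, Set.empty, Set.empty))          // no filters: everything passes
assert(matches(group, Set("STABLE"), Set("classic"))) // both filters satisfied, case-insensitively
assert(!matches(group, Set.empty, Set("consumer")))   // non-classic type filter: empty result
```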
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala

@@ -211,9 +211,10 @@ private[group] class GroupCoordinatorAdapter(
     context: RequestContext,
     request: ListGroupsRequestData
   ): CompletableFuture[ListGroupsResponseData] = {
-    // Handle a null array the same as empty
+    // Handle a null array the same as empty.
     val (error, groups) = coordinator.handleListGroups(
-      Option(request.statesFilter).map(_.asScala.toSet).getOrElse(Set.empty)
+      Option(request.statesFilter).map(_.asScala.toSet).getOrElse(Set.empty),
+      Option(request.typesFilter).map(_.asScala.toSet).getOrElse(Set.empty)
     )

     val response = new ListGroupsResponseData()

@@ -223,7 +224,8 @@
       response.groups.add(new ListGroupsResponseData.ListedGroup()
         .setGroupId(group.groupId)
         .setProtocolType(group.protocolType)
-        .setGroupState(group.state))
+        .setGroupState(group.state)
+        .setGroupType(group.groupType))
     }

     CompletableFuture.completedFuture(response)
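The `Option(...)` wrapping is what makes a null wire-format array behave like an empty filter; a minimal sketch of the idiom in isolation:

```scala
import scala.jdk.CollectionConverters._

// Null-safe Java-list-to-Scala-set conversion, as used for both filters above.
def toFilterSet(javaList: java.util.List[String]): Set[String] =
  Option(javaList).map(_.asScala.toSet).getOrElse(Set.empty)

assert(toFilterSet(null) == Set.empty) // a null array is treated the same as empty
assert(toFilterSet(java.util.Arrays.asList("consumer", "classic")) == Set("consumer", "classic"))
```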
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala

@@ -29,6 +29,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.Group

 import scala.collection.{Seq, immutable, mutable}
 import scala.jdk.CollectionConverters._

@@ -158,7 +159,8 @@ private object GroupMetadata extends Logging {
  */
 case class GroupOverview(groupId: String,
                          protocolType: String,
-                         state: String)
+                         state: String,
+                         groupType: String)

 /**
  * Case class used to represent group metadata for the DescribeGroup API

@@ -611,7 +613,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   }

   def overview: GroupOverview = {
-    GroupOverview(groupId, protocolType.getOrElse(""), state.toString)
+    GroupOverview(groupId, protocolType.getOrElse(""), state.toString, Group.GroupType.CLASSIC.toString)
   }

   def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
51 changes: 50 additions & 1 deletion core/src/main/scala/kafka/log/LogManager.scala

@@ -36,6 +36,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest}
 import org.apache.kafka.image.TopicsImage
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils}

@@ -47,6 +48,7 @@ import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler

+import java.util
 import scala.annotation.nowarn

 /**

@@ -135,6 +137,9 @@ class LogManager(logDirs: Seq[File],

   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()

+  def hasOfflineLogDirs(): Boolean = offlineLogDirs.nonEmpty
+  def onlineLogDirId(uuid: Uuid): Boolean = directoryIds.exists(_._2 == uuid)
+
   private def offlineLogDirs: Iterable[File] = {
     val logDirsSet = mutable.Set[File]() ++= logDirs
     _liveLogDirs.forEach(dir => logDirsSet -= dir)

@@ -1283,7 +1288,7 @@
    * @param errorHandler The error handler that will be called when a exception for a particular
    *                     topic-partition is raised
    */
-  def asyncDelete(topicPartitions: Set[TopicPartition],
+  def asyncDelete(topicPartitions: Iterable[TopicPartition],
                   isStray: Boolean,
                   errorHandler: (TopicPartition, Throwable) => Unit): Unit = {
     val logDirs = mutable.Set.empty[File]

@@ -1571,4 +1576,48 @@ object LogManager {
     }
   }
 }
+
+  /**
+   * Find logs which should not be on the current broker, according to the full LeaderAndIsrRequest.
+   *
+   * @param brokerId The ID of the current broker.
+   * @param request  The full LeaderAndIsrRequest, containing all partitions owned by the broker.
+   * @param logs     A collection of Log objects.
+   *
+   * @return The topic partitions which are no longer needed on this broker.
+   */
+  def findStrayReplicas(
+    brokerId: Int,
+    request: LeaderAndIsrRequest,
+    logs: Iterable[UnifiedLog]
+  ): Iterable[TopicPartition] = {
+    if (request.requestType() != AbstractControlRequest.Type.FULL) {
+      throw new RuntimeException("Cannot use incremental LeaderAndIsrRequest to find strays.")
+    }
+    val partitions = new util.HashMap[TopicPartition, Uuid]()
+    request.data().topicStates().forEach(topicState => {
+      topicState.partitionStates().forEach(partition => {
+        partitions.put(new TopicPartition(topicState.topicName(), partition.partitionIndex()),
+          topicState.topicId());
+      })
+    })
+    logs.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      Option(partitions.get(log.topicPartition)) match {
+        case Some(id) =>
+          if (id.equals(topicId)) {
+            None
+          } else {
+            info(s"Found stray log dir $log: this partition now exists with topic ID $id not $topicId.")
+            Some(log.topicPartition)
+          }
+        case None =>
+          info(s"Found stray log dir $log: this partition does not exist in the new full LeaderAndIsrRequest.")
+          Some(log.topicPartition)
+      }
+    }
+  }
 }
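The heart of `findStrayReplicas` is a comparison of each local log's (partition, topic ID) pair against the full request; a simplified restatement with plain pairs standing in for `UnifiedLog`:

```scala
import org.apache.kafka.common.{TopicPartition, Uuid}

// A local log is a stray if its partition is absent from the full request, or
// present under a different topic ID (e.g. the topic was deleted and re-created
// while this replica was offline).
def straysAmong(
  expected: Map[TopicPartition, Uuid],        // partitions named by the full LeaderAndIsrRequest
  localLogs: Iterable[(TopicPartition, Uuid)] // (partition, topicId) of logs found on disk
): Iterable[TopicPartition] =
  localLogs.collect {
    case (tp, topicId) if !expected.get(tp).contains(topicId) => tp
  }
```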
core/src/main/scala/kafka/server/DelayedDeleteRecords.scala

@@ -84,7 +84,7 @@ class DelayedDeleteRecords(delayMs: Long,
           (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
         }

-      case HostedPartition.Offline =>
+      case HostedPartition.Offline(_) =>
        (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)

       case HostedPartition.None =>
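The pattern now binds a payload, which suggests that elsewhere in this commit `HostedPartition.Offline` changed from a case object to a case class; a hedged sketch of that shape using stand-in types, not Kafka's actual definitions:

```scala
sealed trait HostedPartitionLike
object HostedPartitionLike {
  case object None extends HostedPartitionLike
  final case class Online(name: String) extends HostedPartitionLike
  // Hypothetical payload: the last-known partition, if any, kept for diagnostics.
  final case class Offline(lastKnown: Option[String]) extends HostedPartitionLike
}

def errorFor(hp: HostedPartitionLike): String = hp match {
  case HostedPartitionLike.Offline(_) => "KAFKA_STORAGE_ERROR" // payload ignored, as in the diff
  case HostedPartitionLike.Online(n)  => s"online: $n"
  case HostedPartitionLike.None       => "NOT_LEADER_OR_FOLLOWER"
}
```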