Skip to content

Commit

Permalink
MINOR: Convert AlterReplicaLogDirsRequestTest to KRaft (#17764)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
cmccabe authored Nov 12, 2024
1 parent 207b359 commit 939831f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 27 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.time.Duration
Expand Down Expand Up @@ -83,6 +84,7 @@ trait KafkaBroker extends Logging {
def dataPlaneRequestProcessor: KafkaApis
def kafkaScheduler: Scheduler
def kafkaYammerMetrics: KafkaYammerMetrics
def logDirFailureChannel: LogDirFailureChannel
def logManager: LogManager
def remoteLogManagerOpt: Option[RemoteLogManager]
def metrics: Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.apache.kafka.common.requests.{AlterReplicaLogDirsRequest, AlterReplic
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.storage.internals.log.LogFileUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import java.util.Properties
import scala.jdk.CollectionConverters._
Expand All @@ -51,47 +52,49 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
.find(p => p.partitionIndex == tp.partition).get.errorCode)
}

@Test
def testAlterReplicaLogDirsRequest(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequest(quorum: String): Unit = {
val partitionNum = 5

// Alter replica dir before topic creation
val logDir1 = new File(servers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val logDir1 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)

// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
(0 until partitionNum).foreach { partition =>
val tp = new TopicPartition(topic, partition)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
assertTrue(servers.head.logManager.getLog(tp).isEmpty)
assertTrue(brokers.head.logManager.getLog(tp).isEmpty)
}

createTopic(topic, partitionNum)
(0 until partitionNum).foreach { partition =>
assertEquals(logDir1, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
assertEquals(logDir1, brokers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
}

// Alter replica dir again after topic creation
val logDir2 = new File(servers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val logDir2 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val partitionDirs2 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir2).toMap
val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(partitionDirs2)
// The response should succeed for all partitions
(0 until partitionNum).foreach { partition =>
val tp = new TopicPartition(topic, partition)
assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp))
TestUtils.waitUntilTrue(() => {
logDir2 == servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent
logDir2 == brokers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent
}, "timed out waiting for replica movement")
}
}

@Test
def testAlterReplicaLogDirsRequestErrorCode(): Unit = {
val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
val validDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
val validDir2 = new File(servers.head.config.logDirs(2)).getAbsolutePath
val validDir3 = new File(servers.head.config.logDirs(3)).getAbsolutePath
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequestErrorCode(quorum: String): Unit = {
val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath

// Test AlterReplicaDirRequest before topic creation
val partitionDirs1 = mutable.Map.empty[TopicPartition, String]
Expand All @@ -112,8 +115,8 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.NONE, findErrorForPartition(alterReplicaDirResponse2, new TopicPartition(topic, 1)))

// Test AlterReplicaDirRequest after topic creation and log directory failure
servers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
TestUtils.waitUntilTrue(() => !servers.head.logManager.isLogDirOnline(offlineDir), s"timed out waiting for $offlineDir to be offline", 3000)
brokers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
TestUtils.waitUntilTrue(() => !brokers.head.logManager.isLogDirOnline(offlineDir), s"timed out waiting for $offlineDir to be offline", 3000)
val partitionDirs3 = mutable.Map.empty[TopicPartition, String]
partitionDirs3.put(new TopicPartition(topic, 0), "invalidDir")
partitionDirs3.put(new TopicPartition(topic, 1), validDir3)
Expand All @@ -124,19 +127,20 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
}

@Test
def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequestWithRetention(quorum: String): Unit = {
val partitionNum = 1

// Alter replica dir before topic creation
val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
val logDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)

// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
val tp = new TopicPartition(topic, 0)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
assertTrue(servers.head.logManager.getLog(tp).isEmpty)
assertTrue(brokers.head.logManager.getLog(tp).isEmpty)

val topicProperties = new Properties()
topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
Expand All @@ -147,13 +151,13 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")

createTopic(topic, partitionNum, 1, topicProperties)
assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
assertEquals(logDir1, brokers.head.logManager.getLog(tp).get.dir.getParent)

// send enough records to trigger log rolling
(0 until 20).foreach { _ =>
TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
TestUtils.generateAndProduceMessages(brokers, topic, 10, 1)
}
TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
TestUtils.waitUntilTrue(() => brokers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
"timed out waiting for log segment to roll")

// Wait for log segment retention in original dir.
Expand All @@ -162,12 +166,12 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
}, "timed out waiting for log segment to retention")

// Alter replica dir again after topic creation
val logDir2 = new File(servers.head.config.logDirs(2)).getAbsolutePath
val logDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(Map(tp -> logDir2))
// The response should succeed for all partitions
assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp))
TestUtils.waitUntilTrue(() => {
logDir2 == servers.head.logManager.getLog(tp).get.dir.getParent
logDir2 == brokers.head.logManager.getLog(tp).get.dir.getParent
}, "timed out waiting for replica movement")

// Make sure the deleted log segment is removed
Expand All @@ -191,7 +195,6 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
val data = new AlterReplicaLogDirsRequestData()
.setDirs(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirCollection(logDirs.asJava.iterator))
val request = new AlterReplicaLogDirsRequest.Builder(data).build()
connectAndReceive[AlterReplicaLogDirsResponse](request, destination = controllerSocketServer)
connectAndReceive[AlterReplicaLogDirsResponse](request, anySocketServer)
}

}

0 comments on commit 939831f

Please sign in to comment.