Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: convert AlterReplicaLogDirsRequestTest to KRaft #17764

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.time.Duration
Expand Down Expand Up @@ -83,6 +84,7 @@ trait KafkaBroker extends Logging {
def dataPlaneRequestProcessor: KafkaApis
def kafkaScheduler: Scheduler
def kafkaYammerMetrics: KafkaYammerMetrics
def logDirFailureChannel: LogDirFailureChannel
def logManager: LogManager
def remoteLogManagerOpt: Option[RemoteLogManager]
def metrics: Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.apache.kafka.common.requests.{AlterReplicaLogDirsRequest, AlterReplic
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.storage.internals.log.LogFileUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import java.util.Properties
import scala.jdk.CollectionConverters._
Expand All @@ -51,47 +52,49 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
.find(p => p.partitionIndex == tp.partition).get.errorCode)
}

@Test
def testAlterReplicaLogDirsRequest(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequest(quorum: String): Unit = {
val partitionNum = 5

// Alter replica dir before topic creation
val logDir1 = new File(servers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val logDir1 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)

// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
(0 until partitionNum).foreach { partition =>
val tp = new TopicPartition(topic, partition)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
assertTrue(servers.head.logManager.getLog(tp).isEmpty)
assertTrue(brokers.head.logManager.getLog(tp).isEmpty)
}

createTopic(topic, partitionNum)
(0 until partitionNum).foreach { partition =>
assertEquals(logDir1, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
assertEquals(logDir1, brokers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
}

// Alter replica dir again after topic creation
val logDir2 = new File(servers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val logDir2 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath
val partitionDirs2 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir2).toMap
val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(partitionDirs2)
// The response should succeed for all partitions
(0 until partitionNum).foreach { partition =>
val tp = new TopicPartition(topic, partition)
assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp))
TestUtils.waitUntilTrue(() => {
logDir2 == servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent
logDir2 == brokers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent
}, "timed out waiting for replica movement")
}
}

@Test
def testAlterReplicaLogDirsRequestErrorCode(): Unit = {
val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
val validDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
val validDir2 = new File(servers.head.config.logDirs(2)).getAbsolutePath
val validDir3 = new File(servers.head.config.logDirs(3)).getAbsolutePath
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequestErrorCode(quorum: String): Unit = {
val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath

// Test AlterReplicaDirRequest before topic creation
val partitionDirs1 = mutable.Map.empty[TopicPartition, String]
Expand All @@ -112,8 +115,8 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.NONE, findErrorForPartition(alterReplicaDirResponse2, new TopicPartition(topic, 1)))

// Test AlterReplicaDirRequest after topic creation and log directory failure
servers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
TestUtils.waitUntilTrue(() => !servers.head.logManager.isLogDirOnline(offlineDir), s"timed out waiting for $offlineDir to be offline", 3000)
brokers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
TestUtils.waitUntilTrue(() => !brokers.head.logManager.isLogDirOnline(offlineDir), s"timed out waiting for $offlineDir to be offline", 3000)
val partitionDirs3 = mutable.Map.empty[TopicPartition, String]
partitionDirs3.put(new TopicPartition(topic, 0), "invalidDir")
partitionDirs3.put(new TopicPartition(topic, 1), validDir3)
Expand All @@ -124,19 +127,20 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
}

@Test
def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequestWithRetention(quorum: String): Unit = {
val partitionNum = 1

// Alter replica dir before topic creation
val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
val logDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)

// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
val tp = new TopicPartition(topic, 0)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
assertTrue(servers.head.logManager.getLog(tp).isEmpty)
assertTrue(brokers.head.logManager.getLog(tp).isEmpty)

val topicProperties = new Properties()
topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
Expand All @@ -147,13 +151,13 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")

createTopic(topic, partitionNum, 1, topicProperties)
assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
assertEquals(logDir1, brokers.head.logManager.getLog(tp).get.dir.getParent)

// send enough records to trigger log rolling
(0 until 20).foreach { _ =>
TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
TestUtils.generateAndProduceMessages(brokers, topic, 10, 1)
}
TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
TestUtils.waitUntilTrue(() => brokers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
"timed out waiting for log segment to roll")

// Wait for log segment retention in original dir.
Expand All @@ -162,12 +166,12 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
}, "timed out waiting for log segment to retention")

// Alter replica dir again after topic creation
val logDir2 = new File(servers.head.config.logDirs(2)).getAbsolutePath
val logDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(Map(tp -> logDir2))
// The response should succeed for all partitions
assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp))
TestUtils.waitUntilTrue(() => {
logDir2 == servers.head.logManager.getLog(tp).get.dir.getParent
logDir2 == brokers.head.logManager.getLog(tp).get.dir.getParent
}, "timed out waiting for replica movement")

// Make sure the deleted log segment is removed
Expand All @@ -191,7 +195,6 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
val data = new AlterReplicaLogDirsRequestData()
.setDirs(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirCollection(logDirs.asJava.iterator))
val request = new AlterReplicaLogDirsRequest.Builder(data).build()
connectAndReceive[AlterReplicaLogDirsResponse](request, destination = controllerSocketServer)
connectAndReceive[AlterReplicaLogDirsResponse](request, anySocketServer)
}

}