From 939831fdab16d5f4c58ed56134122b2292203ddb Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Tue, 12 Nov 2024 03:38:52 -0800 Subject: [PATCH] MINOR: Convert AlterReplicaLogDirsRequestTest to KRaft (#17764) Reviewers: Mickael Maison --- .../main/scala/kafka/server/KafkaBroker.scala | 2 + .../AlterReplicaLogDirsRequestTest.scala | 57 ++++++++++--------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index 22d111b896ff8..8660ced303a73 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -34,6 +34,7 @@ import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.NodeToControllerChannelManager import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.util.Scheduler +import org.apache.kafka.storage.internals.log.LogDirFailureChannel import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.time.Duration @@ -83,6 +84,7 @@ trait KafkaBroker extends Logging { def dataPlaneRequestProcessor: KafkaApis def kafkaScheduler: Scheduler def kafkaYammerMetrics: KafkaYammerMetrics + def logDirFailureChannel: LogDirFailureChannel def logManager: LogManager def remoteLogManagerOpt: Option[RemoteLogManager] def metrics: Metrics diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala index 4b3e170191506..8e2698b0842cf 100644 --- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala @@ -27,7 +27,8 @@ import org.apache.kafka.common.requests.{AlterReplicaLogDirsRequest, AlterReplic import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.storage.internals.log.LogFileUtils import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import java.util.Properties import scala.jdk.CollectionConverters._ @@ -51,12 +52,13 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { .find(p => p.partitionIndex == tp.partition).get.errorCode) } - @Test - def testAlterReplicaLogDirsRequest(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testAlterReplicaLogDirsRequest(quorum: String): Unit = { val partitionNum = 5 // Alter replica dir before topic creation - val logDir1 = new File(servers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath + val logDir1 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) @@ -64,16 +66,16 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { (0 until partitionNum).foreach { partition => val tp = new TopicPartition(topic, partition) assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) - assertTrue(servers.head.logManager.getLog(tp).isEmpty) + assertTrue(brokers.head.logManager.getLog(tp).isEmpty) } createTopic(topic, partitionNum) (0 until partitionNum).foreach { partition => - assertEquals(logDir1, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent) + assertEquals(logDir1, brokers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent) } // Alter replica dir again after topic creation - val logDir2 = new File(servers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath + val logDir2 = new File(brokers.head.config.logDirs(Random.nextInt(logDirCount))).getAbsolutePath val partitionDirs2 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir2).toMap val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(partitionDirs2) // The response should succeed for all partitions @@ -81,17 +83,18 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { val tp = new TopicPartition(topic, partition) assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp)) TestUtils.waitUntilTrue(() => { - logDir2 == servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent + logDir2 == brokers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent }, "timed out waiting for replica movement") } } - @Test - def testAlterReplicaLogDirsRequestErrorCode(): Unit = { - val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath - val validDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath - val validDir2 = new File(servers.head.config.logDirs(2)).getAbsolutePath - val validDir3 = new File(servers.head.config.logDirs(3)).getAbsolutePath + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testAlterReplicaLogDirsRequestErrorCode(quorum: String): Unit = { + val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath + val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath + val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath + val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath // Test AlterReplicaDirRequest before topic creation val partitionDirs1 = mutable.Map.empty[TopicPartition, String] @@ -112,8 +115,8 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.NONE, findErrorForPartition(alterReplicaDirResponse2, new TopicPartition(topic, 1))) // Test AlterReplicaDirRequest after topic creation and log directory failure - servers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException()) - TestUtils.waitUntilTrue(() => !servers.head.logManager.isLogDirOnline(offlineDir), s"timed out waiting for $offlineDir to be offline", 3000) + brokers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException()) + TestUtils.waitUntilTrue(() => !brokers.head.logManager.isLogDirOnline(offlineDir), s"timed out waiting for $offlineDir to be offline", 3000) val partitionDirs3 = mutable.Map.empty[TopicPartition, String] partitionDirs3.put(new TopicPartition(topic, 0), "invalidDir") partitionDirs3.put(new TopicPartition(topic, 1), validDir3) @@ -124,19 +127,20 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } - @Test - def testAlterReplicaLogDirsRequestWithRetention(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testAlterReplicaLogDirsRequestWithRetention(quorum: String): Unit = { val partitionNum = 1 // Alter replica dir before topic creation - val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath + val logDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions val tp = new TopicPartition(topic, 0) assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) - assertTrue(servers.head.logManager.getLog(tp).isEmpty) + assertTrue(brokers.head.logManager.getLog(tp).isEmpty) val topicProperties = new Properties() topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") @@ -147,13 +151,13 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024") createTopic(topic, partitionNum, 1, topicProperties) - assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent) + assertEquals(logDir1, brokers.head.logManager.getLog(tp).get.dir.getParent) // send enough records to trigger log rolling (0 until 20).foreach { _ => - TestUtils.generateAndProduceMessages(servers, topic, 10, 1) + TestUtils.generateAndProduceMessages(brokers, topic, 10, 1) } - TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1, + TestUtils.waitUntilTrue(() => brokers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1, "timed out waiting for log segment to roll") // Wait for log segment retention in original dir. @@ -162,12 +166,12 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { }, "timed out waiting for log segment to retention") // Alter replica dir again after topic creation - val logDir2 = new File(servers.head.config.logDirs(2)).getAbsolutePath + val logDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath val alterReplicaLogDirsResponse2 = sendAlterReplicaLogDirsRequest(Map(tp -> logDir2)) // The response should succeed for all partitions assertEquals(Errors.NONE, findErrorForPartition(alterReplicaLogDirsResponse2, tp)) TestUtils.waitUntilTrue(() => { - logDir2 == servers.head.logManager.getLog(tp).get.dir.getParent + logDir2 == brokers.head.logManager.getLog(tp).get.dir.getParent }, "timed out waiting for replica movement") // Make sure the deleted log segment is removed @@ -191,7 +195,6 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { val data = new AlterReplicaLogDirsRequestData() .setDirs(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirCollection(logDirs.asJava.iterator)) val request = new AlterReplicaLogDirsRequest.Builder(data).build() - connectAndReceive[AlterReplicaLogDirsResponse](request, destination = controllerSocketServer) + connectAndReceive[AlterReplicaLogDirsResponse](request, anySocketServer) } - }