From b6c40745898a447ca52edeffa07ac9e3023b9ae9 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 23 Jun 2019 05:25:47 +0900 Subject: [PATCH] Make broker to use CompressionConfig 1. Make LogValidator#validateMessagesAndAssignOffsets to use CompressionType and CompressionConfig; not CompressionCodec. --- core/src/main/scala/kafka/log/Log.scala | 2 +- .../main/scala/kafka/log/LogValidator.scala | 16 ++-- .../unit/kafka/log/LogValidatorTest.scala | 76 +++++++++---------- 3 files changed, 47 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 154e2b9048881..d7cf31807033e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -933,7 +933,7 @@ class Log(@volatile var dir: File, time, now, appendInfo.sourceType, - appendInfo.targetType, + CompressionConfig.of(appendInfo.targetType), config.compact, config.messageFormatVersion.recordVersion.value, config.messageTimestampType, diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 9ecdb61dd3b8b..d01065fabadb0 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -22,7 +22,7 @@ import kafka.api.{ApiVersion, KAFKA_2_1_IV0} import kafka.common.LongRef import kafka.utils.Logging import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier} +import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionConfig, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.utils.Time import scala.collection.mutable @@ -50,7 +50,7 @@ private[kafka] object LogValidator extends Logging { time: Time, now: Long, sourceType: CompressionType, - targetType: CompressionType, + targetConfig: CompressionConfig, compactedTopic: Boolean, magic: Byte, timestampType: TimestampType, @@ -58,7 +58,7 @@ private[kafka] object LogValidator extends Logging { partitionLeaderEpoch: Int, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = { - if (sourceType == CompressionType.NONE && targetType == CompressionType.NONE) { + if (sourceType == CompressionType.NONE && targetConfig.getType == CompressionType.NONE) { // check the magic value if (!records.hasMatchingMagic(magic)) convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType, @@ -68,7 +68,7 @@ private[kafka] object LogValidator extends Logging { assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, magic) } else { - validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceType, targetType, compactedTopic, + validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceType, targetConfig, compactedTopic, magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion) } } @@ -273,7 +273,7 @@ private[kafka] object LogValidator extends Logging { time: Time, now: Long, sourceType: CompressionType, - targetType: CompressionType, + targetConfig: CompressionConfig, compactedTopic: Boolean, toMagic: Byte, timestampType: TimestampType, @@ -282,12 +282,12 @@ private[kafka] object LogValidator extends Logging { isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = { - if (targetType == CompressionType.ZSTD && interBrokerProtocolVersion < KAFKA_2_1_IV0) + if (targetConfig.getType == CompressionType.ZSTD && interBrokerProtocolVersion < KAFKA_2_1_IV0) throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression") // No in place assignment situation 1 - var inPlaceAssignment = sourceType == targetType + var inPlaceAssignment = sourceType == targetConfig.getType var maxTimestamp = RecordBatch.NO_TIMESTAMP val expectedInnerOffset = new LongRef(0) @@ -354,7 +354,7 @@ private[kafka] object LogValidator extends Logging { val first = records.batches.asScala.head (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional) } - buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, targetType, now, + buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, targetConfig.getType, now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient, uncompressedSizeInBytes) } else { diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 638e57cf8e9aa..ecd385f615ca1 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -83,7 +83,7 @@ class LogValidatorTest { time, now = 0L, sourceCompressionType, - targetCompressionType, + CompressionConfig.of(targetCompressionType), compactedTopic = false, magic, TimestampType.CREATE_TIME, @@ -108,7 +108,7 @@ class LogValidatorTest { time= time, now = now, sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = magic, timestampType = TimestampType.LOG_APPEND_TIME, @@ -146,7 +146,7 @@ class LogValidatorTest { time = time, now = now, sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = targetMagic, timestampType = TimestampType.LOG_APPEND_TIME, @@ -188,7 +188,7 @@ class LogValidatorTest { time = time, now = now, sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = magic, timestampType = TimestampType.LOG_APPEND_TIME, @@ -246,7 +246,7 @@ class LogValidatorTest { time = time, now = time.milliseconds(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -288,7 +288,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = magic, timestampType = TimestampType.CREATE_TIME, @@ -355,7 +355,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = magic, timestampType = TimestampType.CREATE_TIME, @@ -406,7 +406,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = toMagic, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -448,7 +448,7 @@ class LogValidatorTest { time = time, now = timestamp, sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = RecordBatch.MAGIC_VALUE_V2, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -503,7 +503,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = magic, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -554,7 +554,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -575,7 +575,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.CREATE_TIME, @@ -596,7 +596,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = RecordBatch.MAGIC_VALUE_V1, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -617,7 +617,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = RecordBatch.MAGIC_VALUE_V1, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -637,7 +637,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), magic = RecordBatch.MAGIC_VALUE_V0, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -657,7 +657,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -678,7 +678,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -700,7 +700,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.CREATE_TIME, @@ -723,7 +723,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -746,7 +746,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.CREATE_TIME, @@ -767,7 +767,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.LOG_APPEND_TIME, @@ -790,7 +790,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -813,7 +813,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.LOG_APPEND_TIME, @@ -836,7 +836,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -859,7 +859,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.CURRENT_MAGIC_VALUE, timestampType = TimestampType.CREATE_TIME, @@ -879,7 +879,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.SNAPPY, + targetConfig = CompressionConfig.of(CompressionType.SNAPPY), compactedTopic = false, magic = RecordBatch.CURRENT_MAGIC_VALUE, timestampType = TimestampType.CREATE_TIME, @@ -904,7 +904,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -925,7 +925,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -945,7 +945,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -965,7 +965,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -986,7 +986,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1007,7 +1007,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1030,7 +1030,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1053,7 +1053,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1074,7 +1074,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.NONE, - targetType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -1095,7 +1095,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = CompressionType.GZIP, - targetType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -1120,7 +1120,7 @@ class LogValidatorTest { time= time, now = now, sourceType = CompressionType.NONE, - targetType = CompressionType.ZSTD, + targetConfig = CompressionConfig.of(CompressionType.ZSTD), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -1155,7 +1155,7 @@ class LogValidatorTest { time = time, now = System.currentTimeMillis(), sourceType = sourceType, - targetType = targetType, + targetConfig = CompressionConfig.of(targetType), compactedTopic = false, magic = RecordBatch.CURRENT_MAGIC_VALUE, timestampType = TimestampType.CREATE_TIME,