Make the broker use CompressionConfig
1. Make LogValidator#validateMessagesAndAssignOffsets use CompressionType and CompressionConfig, not CompressionCodec.
dongjinleekr committed Jun 24, 2019
commit b6c4074 (1 parent: 3c91af6)
Showing 3 changed files with 47 additions and 47 deletions.
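
The CompressionConfig class itself is not part of the hunks below; only its of factory and getType accessor are visible in the changed lines. As rough orientation, here is a minimal, hypothetical Scala sketch of that assumed surface. The real class is a Java class under org.apache.kafka.common.record (per the import change below) and, given the direction of this change, presumably also carries codec options beyond the bare type, but none of that is shown in this commit.

    import org.apache.kafka.common.record.CompressionType

    // Hypothetical sketch of CompressionConfig, inferred only from its
    // call sites in this diff; not the actual class.
    final class CompressionConfig private (tpe: CompressionType) {
      // The codec the broker should target when (re)compressing a batch.
      def getType: CompressionType = tpe
    }

    object CompressionConfig {
      // Wraps a bare CompressionType in a config with default options.
      def of(tpe: CompressionType): CompressionConfig = new CompressionConfig(tpe)
    }
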
core/src/main/scala/kafka/log/Log.scala (1 addition & 1 deletion)

@@ -933,7 +933,7 @@ class Log(@volatile var dir: File,
           time,
           now,
           appendInfo.sourceType,
-          appendInfo.targetType,
+          CompressionConfig.of(appendInfo.targetType),
           config.compact,
           config.messageFormatVersion.recordVersion.value,
           config.messageTimestampType,
core/src/main/scala/kafka/log/LogValidator.scala (8 additions & 8 deletions)

@@ -22,7 +22,7 @@ import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
 import kafka.common.LongRef
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier}
+import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionConfig, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.utils.Time

 import scala.collection.mutable
@@ -50,15 +50,15 @@ private[kafka] object LogValidator extends Logging {
                                                 time: Time,
                                                 now: Long,
                                                 sourceType: CompressionType,
-                                                targetType: CompressionType,
+                                                targetConfig: CompressionConfig,
                                                 compactedTopic: Boolean,
                                                 magic: Byte,
                                                 timestampType: TimestampType,
                                                 timestampDiffMaxMs: Long,
                                                 partitionLeaderEpoch: Int,
                                                 isFromClient: Boolean,
                                                 interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
-    if (sourceType == CompressionType.NONE && targetType == CompressionType.NONE) {
+    if (sourceType == CompressionType.NONE && targetConfig.getType == CompressionType.NONE) {
       // check the magic value
       if (!records.hasMatchingMagic(magic))
         convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType,
@@ -68,7 +68,7 @@ private[kafka] object LogValidator extends Logging {
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
           partitionLeaderEpoch, isFromClient, magic)
     } else {
-      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceType, targetType, compactedTopic,
+      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceType, targetConfig, compactedTopic,
         magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion)
     }
   }
@@ -273,7 +273,7 @@ private[kafka] object LogValidator extends Logging {
                                                          time: Time,
                                                          now: Long,
                                                          sourceType: CompressionType,
-                                                         targetType: CompressionType,
+                                                         targetConfig: CompressionConfig,
                                                          compactedTopic: Boolean,
                                                          toMagic: Byte,
                                                          timestampType: TimestampType,
@@ -282,12 +282,12 @@ private[kafka] object LogValidator extends Logging {
                                                          isFromClient: Boolean,
                                                          interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {

-    if (targetType == CompressionType.ZSTD && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+    if (targetConfig.getType == CompressionType.ZSTD && interBrokerProtocolVersion < KAFKA_2_1_IV0)
       throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
         "are not allowed to use ZStandard compression")

     // No in place assignment situation 1
-    var inPlaceAssignment = sourceType == targetType
+    var inPlaceAssignment = sourceType == targetConfig.getType

     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     val expectedInnerOffset = new LongRef(0)
@@ -354,7 +354,7 @@ private[kafka] object LogValidator extends Logging {
         val first = records.batches.asScala.head
         (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
       }
-      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, targetType, now,
+      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, targetConfig.getType, now,
         validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient,
         uncompressedSizeInBytes)
     } else {
(diff for the third changed file not loaded)
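
Taken together, the refactored entry point can be exercised as below. This is an illustrative sketch only: the records and offsetCounter parameters are elided in the hunks above and are assumed from the calls they forward into, and every argument value here (the GZIP source, the LZ4 target, the 5000 ms timestamp bound) is invented for the example, not taken from the commit.

    import kafka.api.ApiVersion
    import kafka.common.LongRef
    import org.apache.kafka.common.record.{CompressionType, RecordBatch, TimestampType}
    import org.apache.kafka.common.utils.Time

    // Validate a gzip-compressed producer batch, recompressing to LZ4 on the broker.
    val result = LogValidator.validateMessagesAndAssignOffsets(
      records,                                    // MemoryRecords from the produce request
      new LongRef(0L),                            // offsetCounter: first offset to assign
      Time.SYSTEM,                                // time
      Time.SYSTEM.milliseconds(),                 // now
      CompressionType.GZIP,                       // sourceType: codec used by the producer
      CompressionConfig.of(CompressionType.LZ4),  // targetConfig: broker-side target codec
      false,                                      // compactedTopic
      RecordBatch.CURRENT_MAGIC_VALUE,            // magic
      TimestampType.CREATE_TIME,                  // timestampType
      5000L,                                      // timestampDiffMaxMs
      0,                                          // partitionLeaderEpoch
      true,                                       // isFromClient
      ApiVersion.latestVersion)                   // interBrokerProtocolVersion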
