Address 2nd batch of reviews
mimaison committed May 15, 2024
1 parent 1282e3b commit bc27574
Showing 11 changed files with 26 additions and 21 deletions.
@@ -27,6 +27,7 @@
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,13 +38,13 @@ public class GzipCompressionTest {
     @Test
     public void testCompressionDecompression() throws IOException {
         GzipCompression.Builder builder = Compression.gzip();
-        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+        byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
             for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
                 GzipCompression compression = builder.level(level).build();
                 ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
-                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
                     out.write(data);
                     out.flush();
                 }
@@ -77,7 +78,6 @@ public void testLevelValidator() {
             validator.ensureValid("", level);
         }
         validator.ensureValid("", GzipCompression.DEFAULT_LEVEL);
-        assertThrows(ConfigException.class, () -> validator.ensureValid("", 0));
         assertThrows(ConfigException.class, () -> validator.ensureValid("", GzipCompression.MIN_LEVEL - 1));
         assertThrows(ConfigException.class, () -> validator.ensureValid("", GzipCompression.MAX_LEVEL + 1));
     }
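
For context: all five codec tests in this commit (gzip here, and the LZ4, none, Snappy, and Zstd variants below) get the same two fixes, namely a payload large enough to give every compression level non-trivial input, and passing the iterated magic value to wrapForOutput instead of a hard-coded RecordBatch.CURRENT_MAGIC_VALUE. (The dropped level-0 assertion was redundant: with gzip's minimum level of 1, it duplicated the MIN_LEVEL - 1 check.) A minimal sketch of the round trip these tests exercise, assuming the matching wrapForInput/BufferSupplier half of the same Compression API:

    // Round-trip sketch, not the literal test body; assumes
    // Compression.wrapForInput(ByteBuffer, byte, BufferSupplier) exists
    // alongside wrapForOutput, and BufferSupplier.create() as in Kafka trunk.
    byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
    byte magic = RecordBatch.MAGIC_VALUE_V2;
    GzipCompression compression = Compression.gzip().level(GzipCompression.DEFAULT_LEVEL).build();
    ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
    try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
        out.write(data);
        out.flush();
    }
    ByteBuffer compressed = bufferStream.buffer();
    compressed.flip();
    byte[] decompressed = new byte[data.length];
    try (InputStream in = compression.wrapForInput(compressed, magic, BufferSupplier.create())) {
        int off = 0;
        int n;
        while (off < decompressed.length && (n = in.read(decompressed, off, decompressed.length - off)) != -1) {
            off += n;
        }
    }
    assertArrayEquals(data, decompressed);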
@@ -38,6 +38,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.Stream;
@@ -85,13 +86,13 @@ public void testLz4FramingMagicV1() {
     @Test
     public void testCompressionDecompression() throws IOException {
         Lz4Compression.Builder builder = Compression.lz4();
-        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+        byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
             for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL, Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL)) {
                 Lz4Compression compression = builder.level(level).build();
                 ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
-                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
                     out.write(data);
                     out.flush();
                 }
@@ -26,6 +26,7 @@
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,11 +36,11 @@ public class NoCompressionTest {
     @Test
     public void testCompressionDecompression() throws IOException {
         NoCompression compression = Compression.NONE;
-        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+        byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
             ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
-            try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+            try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
                 out.write(data);
                 out.flush();
             }
@@ -26,6 +26,7 @@
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,11 +36,11 @@ public class SnappyCompressionTest {
     @Test
     public void testCompressionDecompression() throws IOException {
         SnappyCompression compression = Compression.snappy().build();
-        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+        byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
             ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
-            try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+            try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
                 out.write(data);
                 out.flush();
             }
@@ -26,6 +26,7 @@
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -36,13 +37,13 @@ public class ZstdCompressionTest {
     @Test
     public void testCompressionDecompression() throws IOException {
         ZstdCompression.Builder builder = Compression.zstd();
-        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+        byte[] data = String.join("", Collections.nCopies(256, "data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
             for (int level : Arrays.asList(ZstdCompression.MIN_LEVEL, ZstdCompression.DEFAULT_LEVEL, ZstdCompression.MAX_LEVEL)) {
                 ZstdCompression compression = builder.level(level).build();
                 ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
-                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
                     out.write(data);
                     out.flush();
                 }
@@ -74,7 +74,7 @@ public Args(int bufferOffset, Compression compression, byte magic) {
         public String toString() {
             return "magic=" + magic +
                     ", bufferOffset=" + bufferOffset +
-                    ", compressionType=" + compression;
+                    ", compression=" + compression;
         }
     }
 
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
@@ -790,7 +791,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
     appendInfo.setFirstOffset(offset.value)
     val validateAndOffsetAssignResult = try {
-      val targetCompression = config.compressionType.targetCompression(config.compression, appendInfo.sourceCompression())
+      val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())
       val validator = new LogValidator(validRecords,
         topicPartition,
         time,
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -96,6 +96,7 @@ class LogConfigTest {
       case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
       case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
       case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
+      case TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-1")
       case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
 
       case _ => assertPropertyInvalid(name, "not_a_number", "-1")
8 changes: 3 additions & 5 deletions core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -96,12 +96,12 @@ class LogValidatorTest {
 
   private def checkOnlyOneBatch(magic: Byte, sourceCompression: Compression, targetCompression: Compression): Unit = {
     assertThrows(classOf[InvalidRecordException],
-      () => validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompression), magic, sourceCompression.`type`(), targetCompression)
+      () => validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.`type`(), targetCompression)
     )
   }
 
   private def checkAllowMultiBatch(magic: Byte, sourceCompression: Compression, targetCompression: Compression): Unit = {
-    validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompression), magic, sourceCompression.`type`(), targetCompression)
+    validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.`type`(), targetCompression)
   }
 
   private def checkMismatchMagic(batchMagic: Byte, recordMagic: Byte, compression: Compression): Unit = {
@@ -1610,9 +1610,7 @@ class LogValidatorTest {
     builder.build()
   }
 
-  private def createTwoBatchedRecords(magicValue: Byte,
-                                      timestamp: Long,
-                                      codec: Compression): MemoryRecords = {
+  private def createTwoBatchedRecords(magicValue: Byte, codec: Compression): MemoryRecords = {
     val buf = ByteBuffer.allocate(2048)
     var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
     builder.append(10L, "1".getBytes(), "a".getBytes())
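
The helper's unused timestamp parameter is gone; what it builds is two complete batches back to back in one buffer, which checkOnlyOneBatch expects validation to reject and checkAllowMultiBatch to accept. A hypothetical Java rendering of the helper, following the builder calls visible in the hunk above:

    // Two independent record batches written into the same buffer.
    // Sketch only: the baseOffset 1L for the second batch and the second
    // append are assumptions, not copied from the Scala source.
    ByteBuffer buf = ByteBuffer.allocate(2048);
    MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L);
    builder.append(10L, "1".getBytes(), "a".getBytes());
    builder.close();                       // seals batch 1 into buf
    builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L);
    builder.append(11L, "2".getBytes(), "b".getBytes());
    builder.close();                       // seals batch 2 behind batch 1
    buf.flip();
    MemoryRecords records = MemoryRecords.readableRecords(buf.slice());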
5 changes: 3 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -714,7 +714,7 @@ class KafkaConfigTest {
   def testInvalidGzipCompressionLevel(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.setProperty(KafkaConfig.CompressionTypeProp, "gzip")
-    props.setProperty(KafkaConfig.CompressionGzipLevelProp, (GzipCompression.MAX_LEVEL+1).toString)
+    props.setProperty(KafkaConfig.CompressionGzipLevelProp, (GzipCompression.MAX_LEVEL + 1).toString)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
@@ -942,7 +942,8 @@ class KafkaConfigTest {
       case MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG => // ignore string
 
       case KafkaConfig.CompressionGzipLevelProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
-      case KafkaConfig.CompressionZstdLevelProp => assertPropertyInvalid(baseProperties, name, "not_a_number", ZstdCompression.MAX_LEVEL+1)
+      case KafkaConfig.CompressionLz4LevelProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
+      case KafkaConfig.CompressionZstdLevelProp => assertPropertyInvalid(baseProperties, name, "not_a_number", ZstdCompression.MAX_LEVEL + 1)
 
       case KafkaConfig.RackProp => // ignore string
       //SSL Configs
@@ -42,7 +42,7 @@ public enum BrokerCompressionType {
         this.name = name;
     }
 
-    public Compression targetCompression(Optional<Compression> logCompression, CompressionType producerCompressionType) {
+    public static Compression targetCompression(Optional<Compression> logCompression, CompressionType producerCompressionType) {
         return logCompression.orElseGet(() -> Compression.of(producerCompressionType).build());
     }
 
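
targetCompression reads only its arguments, never enum state, which is what allows the static form here and the simpler call site in UnifiedLog.scala above. A usage sketch with hypothetical values:

    // The topic-level compression override wins if configured; otherwise the
    // producer's codec is kept as-is. Values below are hypothetical.
    Optional<Compression> logCompression = Optional.empty();   // no topic-level override
    CompressionType producerCodec = CompressionType.LZ4;       // codec of the incoming batch
    Compression target = BrokerCompressionType.targetCompression(logCompression, producerCodec);
    // target is Compression.of(CompressionType.LZ4).build(), so LZ4 is preserved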
