Add CompressionConfig + expand CompressionType#wrapForOutput
1. Add CompressionConfig: without this class, the constructor of MemoryRecordsBuilder would violate the parameter count limit. (It already has 13 parameters.)
2. MemoryRecordsBuilder now uses CompressionConfig instead of CompressionType.
3. Add level, blockSize parameters to CompressionType#wrapForOutput.
4. Add compressionLevel, compressionBufferSize, compressionConfig to MemoryRecords.Builder.
dongjinleekr committed Jun 24, 2019
1 parent b2c90b8 commit 3c91af6
Showing 6 changed files with 181 additions and 42 deletions.
CompressionConfig.java (new file)
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.utils.ByteBufferOutputStream;

import java.io.DataOutputStream;

/**
 * This class holds all compression configuration: the compression type, the compression level,
 * and the size of the compression buffer.
 */
public class CompressionConfig {
    private final CompressionType type;
    private final Integer level;
    private final Integer bufferSize;

    private CompressionConfig(CompressionType type, Integer level, Integer bufferSize) {
        this.type = type;
        this.level = level;
        this.bufferSize = bufferSize;
    }

    public CompressionType getType() {
        return type;
    }

    public Integer getLevel() {
        return level;
    }

    public Integer getBufferSize() {
        return bufferSize;
    }

    /**
     * Returns a {@link DataOutputStream} that compresses the given bytes into the <code>output</code>
     * {@link ByteBufferOutputStream} with the specified <code>magic</code>.
     */
    public DataOutputStream outputStream(ByteBufferOutputStream output, byte magic) {
        return new DataOutputStream(type.wrapForOutput(output, magic, this.level, this.bufferSize));
    }

    /**
     * Creates a non-compressing configuration.
     */
    public static CompressionConfig none() {
        return of(CompressionType.NONE);
    }

    /**
     * Creates a configuration with the specified {@link CompressionType} and the default compression level and buffer size.
     */
    public static CompressionConfig of(CompressionType type) {
        return of(type, null);
    }

    /**
     * Creates a configuration with the specified {@link CompressionType} and compression level, and the default compression buffer size.
     */
    public static CompressionConfig of(CompressionType type, Integer level) {
        return of(type, level, null);
    }

    /**
     * Creates a configuration with the specified {@link CompressionType}, compression level, and compression buffer size.
     */
    public static CompressionConfig of(CompressionType type, Integer level, Integer bufferSize) {
        return new CompressionConfig(type, level, bufferSize);
    }
}
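
For illustration, a minimal usage sketch of the factory methods above (the buffer capacity and magic value are arbitrary choices, not taken from this commit):

    ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(1024));
    // GZIP at compression level 6; the buffer size is left null, so the default applies.
    CompressionConfig config = CompressionConfig.of(CompressionType.GZIP, 6);
    DataOutputStream stream = config.outputStream(output, RecordBatch.CURRENT_MAGIC_VALUE);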
CompressionType.java
@@ -36,7 +36,7 @@
 public enum CompressionType {
     NONE(0, "none", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
             return buffer;
         }

@@ -48,12 +48,12 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {

     GZIP(1, "gzip", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
             try {
                 // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
                 // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
                 // number of bytes to write (potentially a single byte)
-                return new BufferedOutputStream(GZipOutputStream.of(buffer, null, null), 16 * 1024);
+                return new BufferedOutputStream(GZipOutputStream.of(buffer, level, blockSize), 16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -75,9 +75,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {

     SNAPPY(2, "snappy", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
             try {
-                return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
+                // Snappy does not support a compression level, so the given level parameter is ignored.
+                if (blockSize == null) {
+                    return (OutputStream) SnappyConstructors.OUTPUT_WITHOUT_BLOCK_SIZE.invoke(buffer);
+                } else {
+                    return (OutputStream) SnappyConstructors.OUTPUT_WITH_BLOCK_SIZE.invoke(buffer, blockSize);
+                }
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -95,9 +100,9 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {

     LZ4(3, "lz4", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
             try {
-                return KafkaLZ4BlockOutputStream.of(buffer, messageVersion, null, null);
+                return KafkaLZ4BlockOutputStream.of(buffer, messageVersion, level, blockSize);
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -107,7 +112,7 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
                 return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -116,9 +121,14 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {

     ZSTD(4, "zstd", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
             try {
-                return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
+                // Zstd does not support a block size configuration, so the given blockSize parameter is ignored.
+                if (level == null) {
+                    return (OutputStream) ZstdConstructors.OUTPUT_WITHOUT_LEVEL.invoke(buffer);
+                } else {
+                    return (OutputStream) ZstdConstructors.OUTPUT_WITH_LEVEL.invoke(buffer, level);
+                }
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -145,14 +155,17 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
     }

     /**
-     * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
-     *
+     * Wrap bufferStream with an OutputStream that will compress data with this CompressionType, using the given compression level and buffer size.
+     * <p>
      * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly.
      * Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()}
      * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
      * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
+     *
+     * @param level     The compression level to use. If null, it falls back to the default level.
+     * @param blockSize The buffer size to use during compression. If null, it falls back to the default block size.
      */
-    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
+    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion, Integer level, Integer blockSize);
@@ -209,15 +222,19 @@ else if (ZSTD.name.equals(name))
     private static class SnappyConstructors {
         static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
                 MethodType.methodType(void.class, InputStream.class));
-        static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
+        static final MethodHandle OUTPUT_WITHOUT_BLOCK_SIZE = findConstructor("org.xerial.snappy.SnappyOutputStream",
                 MethodType.methodType(void.class, OutputStream.class));
+        static final MethodHandle OUTPUT_WITH_BLOCK_SIZE = findConstructor("org.xerial.snappy.SnappyOutputStream",
+                MethodType.methodType(void.class, OutputStream.class, int.class));
     }

     private static class ZstdConstructors {
         static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream",
-            MethodType.methodType(void.class, InputStream.class));
-        static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream",
-            MethodType.methodType(void.class, OutputStream.class));
+                MethodType.methodType(void.class, InputStream.class));
+        static final MethodHandle OUTPUT_WITHOUT_LEVEL = findConstructor("com.github.luben.zstd.ZstdOutputStream",
+                MethodType.methodType(void.class, OutputStream.class));
+        static final MethodHandle OUTPUT_WITH_LEVEL = findConstructor("com.github.luben.zstd.ZstdOutputStream",
+                MethodType.methodType(void.class, OutputStream.class, int.class));
     }

     private static MethodHandle findConstructor(String className, MethodType methodType) {
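
For illustration, a direct call to the expanded wrapForOutput (the values are arbitrary; passing null falls back to the codec default, and as noted above ZSTD ignores the block-size argument):

    ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(256));
    // ZSTD at level 3; blockSize is passed as null since ZSTD ignores it anyway.
    OutputStream compressed = CompressionType.ZSTD.wrapForOutput(out, RecordBatch.CURRENT_MAGIC_VALUE, 3, null);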
MemoryRecords.java
@@ -249,9 +249,10 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
                 originalBatch.baseOffset() : retainedRecords.get(0).offset();

         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
-                originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
-                originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
-                originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
+                CompressionConfig.of(originalBatch.compressionType()), timestampType, baseOffset, logAppendTime,
+                originalBatch.producerId(), originalBatch.producerEpoch(), originalBatch.baseSequence(),
+                originalBatch.isTransactional(), originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(),
+                bufferOutputStream.limit());

         for (Record record : retainedRecords)
             builder.append(record);
@@ -432,6 +433,8 @@ public static class Builder {
         // attributes with default values
         private byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
         private CompressionType compressionType = CompressionType.NONE;
+        private Integer compressionLevel = null;
+        private Integer compressionBufferSize = null;
         private TimestampType timestampType = TimestampType.CREATE_TIME;
         private long baseOffset = 0L;
         private long logAppendTime = RecordBatch.NO_TIMESTAMP;
@@ -459,6 +462,23 @@ public Builder compressionType(CompressionType compressionType) {
             return this;
         }

+        public Builder compressionLevel(Integer compressionLevel) {
+            this.compressionLevel = compressionLevel;
+            return this;
+        }
+
+        public Builder compressionBufferSize(Integer compressionBufferSize) {
+            this.compressionBufferSize = compressionBufferSize;
+            return this;
+        }
+
+        public Builder compressionConfig(CompressionConfig compressionConfig) {
+            this.compressionType = compressionConfig.getType();
+            this.compressionLevel = compressionConfig.getLevel();
+            this.compressionBufferSize = compressionConfig.getBufferSize();
+            return this;
+        }
+
         public Builder timestampType(TimestampType timestampType) {
             this.timestampType = timestampType;

@@ -514,8 +534,9 @@ public MemoryRecordsBuilder build() {
             }

             return new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer),
-                    magic, compressionType, timestampType, baseOffset, logAppendTime,
-                    producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit);
+                    magic, CompressionConfig.of(compressionType, compressionLevel, compressionBufferSize),
+                    timestampType, baseOffset, logAppendTime, producerId, producerEpoch, baseSequence,
+                    isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit);
         }
     }
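
A sketch of how the expanded Builder might be configured; the no-argument constructor and the surrounding setup are assumptions for illustration and are not shown in this diff:

    MemoryRecordsBuilder builder = new MemoryRecords.Builder()  // assumed entry point
            .compressionType(CompressionType.GZIP)
            .compressionLevel(6)              // null selects the codec's default level
            .compressionBufferSize(8 * 1024)  // null selects the default buffer size
            .build();

    // Equivalent, setting all three at once:
    // .compressionConfig(CompressionConfig.of(CompressionType.GZIP, 6, 8 * 1024))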