Skip to content

Commit

Permalink
KAFKA-7632: Support Compression Levels (KIP-390) (#15516)
Browse files Browse the repository at this point in the history
Reviewers: Jun Rao <[email protected]>,  Luke Chen <[email protected]>
Co-authored-by: Lee Dongjin <[email protected]>
  • Loading branch information
mimaison and dongjinleekr authored May 21, 2024
1 parent 9fe3932 commit affe8da
Show file tree
Hide file tree
Showing 134 changed files with 2,374 additions and 1,366 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control-group-coordinator.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
Expand Down
10 changes: 7 additions & 3 deletions checkstyle/import-control-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,22 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
<subpackage name="fault">
<allow pkg="org.apache.kafka.server.fault" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="util">
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
</subpackage>
</subpackage>

<subpackage name="metalog">
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
Expand Down
10 changes: 8 additions & 2 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@
<allow pkg="com.github.luben.zstd" />
<allow pkg="net.jpountz.lz4" />
<allow pkg="net.jpountz.xxhash" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.xerial.snappy" />
<allow pkg="org.apache.kafka.common.compress" />
<allow class="org.apache.kafka.common.record.CompressionType" exact-match="true" />
<allow class="org.apache.kafka.common.record.RecordBatch" exact-match="true" />
</subpackage>

<subpackage name="message">
Expand Down Expand Up @@ -138,6 +140,7 @@

<subpackage name="protocol">
<allow pkg="org.apache.kafka.common.errors" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
Expand Down Expand Up @@ -166,6 +169,7 @@

<subpackage name="requests">
<allow pkg="org.apache.kafka.common.acl" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.message" />
Expand Down Expand Up @@ -426,6 +430,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.message" />
Expand All @@ -444,8 +449,9 @@
</subpackage>

<subpackage name="snapshot">
<allow pkg="org.apache.kafka.common.record" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test"/>
Expand Down
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

<suppress checks="BooleanExpressionComplexity"
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
files="(Utils|Topic|Lz4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
Expand Down Expand Up @@ -250,7 +251,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final RecordAccumulator accumulator;
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Compression compression;
private final Sensor errors;
private final Time time;
private final Serializer<K> keySerializer;
Expand Down Expand Up @@ -413,7 +414,7 @@ private void warnIfPartitionerDeprecated() {
Arrays.asList(this.keySerializer, this.valueSerializer));
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.compression = configureCompression(config);

this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
Expand All @@ -432,7 +433,7 @@ private void warnIfPartitionerDeprecated() {
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,
batchSize,
this.compressionType,
compression,
lingerMs(config),
retryBackoffMs,
retryBackoffMaxMs,
Expand Down Expand Up @@ -501,7 +502,7 @@ private void warnIfPartitionerDeprecated() {
this.interceptors = interceptors;
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.compression = configureCompression(config);
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
this.apiVersions = new ApiVersions();
Expand Down Expand Up @@ -548,6 +549,29 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
apiVersions);
}

/**
 * Builds the {@link Compression} instance described by the producer configuration.
 *
 * The codec is selected by {@link ProducerConfig#COMPRESSION_TYPE_CONFIG}. For gzip, lz4 and zstd
 * the matching per-codec level setting is read from the config and applied to the builder; any
 * other type is built through {@code Compression.of(type)} without a level being set here.
 *
 * @param config the producer configuration to read the compression settings from
 * @return the built compression instance
 */
private static Compression configureCompression(ProducerConfig config) {
    final String typeName = config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG);
    final CompressionType codec = CompressionType.forName(typeName);
    final Compression selected;
    switch (codec) {
        case GZIP:
            selected = Compression.gzip()
                    .level(config.getInt(ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG))
                    .build();
            break;
        case LZ4:
            selected = Compression.lz4()
                    .level(config.getInt(ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG))
                    .build();
            break;
        case ZSTD:
            selected = Compression.zstd()
                    .level(config.getInt(ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG))
                    .build();
            break;
        default:
            // Remaining types have no level setting in this method.
            selected = Compression.of(codec).build();
            break;
    }
    return selected;
}

/**
 * Reads {@link ProducerConfig#LINGER_MS_CONFIG} as a long and narrows it to an int,
 * capping the value at {@link Integer#MAX_VALUE}.
 *
 * @param config the producer configuration to read from
 * @return the configured linger time, never larger than {@code Integer.MAX_VALUE}
 */
private static int lingerMs(ProducerConfig config) {
    final long configuredLingerMs = config.getLong(ProducerConfig.LINGER_MS_CONFIG);
    if (configuredLingerMs > Integer.MAX_VALUE) {
        return Integer.MAX_VALUE;
    }
    return (int) configuredLingerMs;
}
Expand Down Expand Up @@ -1033,7 +1057,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
compression.type(), serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.compress.Lz4Compression;
import org.apache.kafka.common.compress.ZstdCompression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -225,6 +228,18 @@ public class ProducerConfig extends AbstractConfig {
+ " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>. "
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";

/**
 * <code>compression.gzip.level</code>
 * <p>
 * Key of the gzip compression level setting. Per its description string, the level is used when
 * {@link #COMPRESSION_TYPE_CONFIG} is set to <code>gzip</code>.
 */
public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
// Description text for COMPRESSION_GZIP_LEVEL_CONFIG.
private static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>gzip</code>.";

/**
 * <code>compression.lz4.level</code>
 * <p>
 * Key of the lz4 compression level setting. Per its description string, the level is used when
 * {@link #COMPRESSION_TYPE_CONFIG} is set to <code>lz4</code>.
 */
public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
// Description text for COMPRESSION_LZ4_LEVEL_CONFIG.
private static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>lz4</code>.";

/**
 * <code>compression.zstd.level</code>
 * <p>
 * Key of the zstd compression level setting. Per its description string, the level is used when
 * {@link #COMPRESSION_TYPE_CONFIG} is set to <code>zstd</code>.
 */
public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";
// Description text for COMPRESSION_ZSTD_LEVEL_CONFIG.
private static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>zstd</code>.";

/** <code>metrics.sample.window.ms</code> */
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;

Expand Down Expand Up @@ -364,6 +379,9 @@ public class ProducerConfig extends AbstractConfig {
Importance.LOW,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(COMPRESSION_GZIP_LEVEL_CONFIG, Type.INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
.define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
this.retry = false;
this.isSplitBatch = isSplitBatch;
float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
recordsBuilder.compressionType());
recordsBuilder.compression().type());
this.currentLeaderEpoch = OptionalInt.empty();
this.attemptsWhenLeaderLastChanged = 0;
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
Expand Down Expand Up @@ -146,7 +146,7 @@ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value,
} else {
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
recordsBuilder.compression().type(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
Expand All @@ -172,7 +172,7 @@ private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer val
// No need to get the CRC.
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
recordsBuilder.compression().type(), key, value, headers));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
key == null ? -1 : key.remaining(),
Expand Down Expand Up @@ -377,19 +377,19 @@ public Deque<ProducerBatch> split(int splitBatchSize) {

private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
int initialSize = Math.max(AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), record.key(), record.value(), record.headers()), batchSize);
recordsBuilder.compression().type(), record.key(), record.value(), record.headers()), batchSize);
ByteBuffer buffer = ByteBuffer.allocate(initialSize);

// Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional)
// for the newly created batch. This will be set when the batch is dequeued for sending (which is consistent
// with how normal batches are handled).
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compression(),
TimestampType.CREATE_TIME, 0L);
return new ProducerBatch(topicPartition, builder, this.createdMs, true);
}

public boolean isCompressed() {
return recordsBuilder.compressionType() != CompressionType.NONE;
return recordsBuilder.compression().type() != CompressionType.NONE;
}

/**
Expand Down Expand Up @@ -491,7 +491,7 @@ public void close() {
recordsBuilder.close();
if (!recordsBuilder.isControlBatch()) {
CompressionRatioEstimator.updateEstimation(topicPartition.topic(),
recordsBuilder.compressionType(),
recordsBuilder.compression().type(),
(float) recordsBuilder.compressionRatio());
}
reopened = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.Cluster;
Expand All @@ -48,7 +49,6 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
Expand All @@ -74,7 +74,7 @@ public class RecordAccumulator {
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final Compression compression;
private final int lingerMs;
private final ExponentialBackoff retryBackoff;
private final int deliveryTimeoutMs;
Expand Down Expand Up @@ -116,7 +116,7 @@ public class RecordAccumulator {
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
Compression compression,
int lingerMs,
long retryBackoffMs,
long retryBackoffMaxMs,
Expand Down Expand Up @@ -176,7 +176,7 @@ public RecordAccumulator(LogContext logContext,
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
Compression compression,
int lingerMs,
long retryBackoffMs,
long retryBackoffMaxMs,
Expand Down Expand Up @@ -344,7 +344,7 @@ public RecordAppendResult append(String topic,

if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
Expand Down Expand Up @@ -533,7 +533,7 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(),
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
Expand Down
Loading

0 comments on commit affe8da

Please sign in to comment.