Skip to content

Commit

Permalink
Make RecordAccumulator to use CompressionConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjinleekr committed Jun 24, 2019
1 parent b6c4074 commit d116b47
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
Expand Down Expand Up @@ -395,7 +396,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
this.apiVersions = new ApiVersions();
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
CompressionConfig.of(this.compressionType),
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
Expand All @@ -71,7 +71,7 @@ public final class RecordAccumulator {
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final CompressionConfig compressionConfig;
private final int lingerMs;
private final long retryBackoffMs;
private final int deliveryTimeoutMs;
Expand All @@ -91,7 +91,7 @@ public final class RecordAccumulator {
*
* @param logContext The log context used for logging
* @param batchSize The size to use when allocating {@link MemoryRecords} instances
* @param compression The compression codec for the records
* @param compressionConfig The compression type/level/buffer size for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
* sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
* latency for potentially better throughput due to more batching (and hence fewer, larger requests).
Expand All @@ -105,7 +105,7 @@ public final class RecordAccumulator {
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
CompressionConfig compressionConfig,
int lingerMs,
long retryBackoffMs,
int deliveryTimeoutMs,
Expand All @@ -121,7 +121,7 @@ public RecordAccumulator(LogContext logContext,
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
this.compression = compression;
this.compressionConfig = compressionConfig;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.deliveryTimeoutMs = deliveryTimeoutMs;
Expand Down Expand Up @@ -205,7 +205,7 @@ public RecordAppendResult append(TopicPartition tp,

// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compressionConfig.getType(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
Expand Down Expand Up @@ -242,7 +242,10 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer).magic(maxUsableMagic).compressionType(compression).build();
return MemoryRecords.builder(buffer)
.magic(maxUsableMagic)
.compressionConfig(compressionConfig)
.build();
}

/**
Expand Down Expand Up @@ -340,7 +343,7 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compressionConfig.getType(),
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
Expand Down Expand Up @@ -338,7 +339,7 @@ public void testRetryBackoff() throws Exception {
String metricGrpName = "producer-metrics";

final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
CompressionConfig.none(), lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));

long now = time.milliseconds();
Expand Down Expand Up @@ -707,7 +708,7 @@ public void testIdempotenceWithOldMagic() throws InterruptedException {
apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE,
(short) 0, (short) 2))));
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
CompressionConfig.none(), lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
}
Expand Down Expand Up @@ -1005,7 +1006,7 @@ private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int
return new RecordAccumulator(
logContext,
batchSize,
type,
CompressionConfig.of(type),
lingerMs,
retryBackoffMs,
deliveryTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
Expand Down Expand Up @@ -1913,7 +1914,7 @@ private void testSplitBatchAndSend(TransactionManager txnManager,
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
try (Metrics m = new Metrics()) {
accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
accumulator = new RecordAccumulator(logContext, batchSize, CompressionConfig.of(CompressionType.GZIP),
0, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Expand Down Expand Up @@ -2414,7 +2415,7 @@ private void setupWithTransactionState(TransactionManager transactionManager, bo
this.metrics = new Metrics(metricConfig, time);
BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;

this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionConfig.none(), 0, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
Expand Down Expand Up @@ -134,7 +134,7 @@ public void setup() {
Metrics metrics = new Metrics(metricConfig, time);
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);

this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionConfig.none(), 0, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
Expand Down

0 comments on commit d116b47

Please sign in to comment.