KAFKA-7632: Support Compression Level #10826
Conversation
@ijuma @guozhangwang Could you have a look when you are free? I refined the original work and added a benchmark with a real-world dataset. As you can see in the updated KIP, this option can improve the producer's performance significantly. 🙏
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (resolved)
clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java (resolved)
Force-pushed from 6a63961 to baa5914
This PR implements KIP-390 but is still open and has several conflicts. Unless I hear from you by EOD Thursday, July 8 (Pacific time), I'll remove it from the KIP list for 3.0.
Force-pushed from baa5914 to 1899e8e
@kkonstantine Here it is - I rebased it onto the latest trunk. Could anyone review this PR? 🙏
@tombentley do you think you'll be able to finish the review soon? If we had an approval, I'd be willing to make an exception and consider this feature for inclusion in AK 3.0.
Force-pushed from 1899e8e to ff1a3ec
Force-pushed from ff1a3ec to 46bf0ac
Thanks for the PR! I've only looked at the changes in clients so far. It looks good overall; I've left a few questions. Can you also rebase on trunk?
/** <code>compression.level</code> */
public static final String COMPRESSION_LEVEL_CONFIG = "compression.level";
private static final String COMPRESSION_LEVEL_DOC = "The compression level for all data generated by the producer. The default level and valid value is up to "
    + "compression.type. (<code>none</code>, <code>snappy</code>: not available. <code>gzip</code>: 1~9. <code>lz4</code>: 1~17. "
Should we include the defaults too?
I think the reason why @dongjinleekr didn't include those is that they may change at the compression library level.
@@ -523,6 +532,45 @@ boolean idempotenceEnabled() {
        return userConfiguredTransactions || idempotenceEnabled;
    }

    public CompressionConfig getCompressionConfig(CompressionType compressionType) {
        if (getString(ProducerConfig.COMPRESSION_LEVEL_CONFIG).isEmpty()) {
Since only 3 cases use the compression level, what about moving the if/else inside each case block?
Suggested change (replacing the isEmpty() check above):

public CompressionConfig getCompressionConfig(CompressionType compressionType) {
    String compressionLevel = getString(ProducerConfig.COMPRESSION_LEVEL_CONFIG);
    switch (compressionType) {
        case NONE:
            return CompressionConfig.NONE;
        case GZIP:
            if (compressionLevel.isEmpty())
                return CompressionConfig.gzip().build();
            else
                return CompressionConfig.gzip().setLevel(Integer.parseInt(compressionLevel)).build();
        case SNAPPY:
            return CompressionConfig.snappy().build();
        case LZ4:
            if (compressionLevel.isEmpty())
                return CompressionConfig.lz4().build();
            else
                return CompressionConfig.lz4().setLevel(Integer.parseInt(compressionLevel)).build();
        case ZSTD:
            if (compressionLevel.isEmpty())
                return CompressionConfig.zstd().build();
            else
                return CompressionConfig.zstd().setLevel(Integer.parseInt(compressionLevel)).build();
        default:
            throw new IllegalArgumentException("Unknown compression type: " + compressionType.name);
    }
}
Why don't we push some of these things to the enum itself? CompressionType.config(String level) or something like that.
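A rough sketch of that idea (hypothetical: the method name and enum shape are assumptions, not part of this PR; it reuses the CompressionConfig builders from the suggestion above):

public enum CompressionType {
    NONE, GZIP, SNAPPY, LZ4, ZSTD;

    // Hypothetical helper: each compression type parses its own level string
    // and builds the matching per-codec config.
    public CompressionConfig config(String level) {
        switch (this) {
            case GZIP:
                return level.isEmpty() ? CompressionConfig.gzip().build()
                    : CompressionConfig.gzip().setLevel(Integer.parseInt(level)).build();
            case LZ4:
                return level.isEmpty() ? CompressionConfig.lz4().build()
                    : CompressionConfig.lz4().setLevel(Integer.parseInt(level)).build();
            case ZSTD:
                return level.isEmpty() ? CompressionConfig.zstd().build()
                    : CompressionConfig.zstd().setLevel(Integer.parseInt(level)).build();
            case SNAPPY:
                return CompressionConfig.snappy().build(); // snappy has no levels
            default:
                return CompressionConfig.NONE;
        }
    }
}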
The reason why I did not choose a simple approach like CompressionType.config(String level) (as @ijuma pointed out) is simple: to leave room for supporting per-codec configuration in the future, like KIP-780.

The reason why I chose compression.level without a default value (as @mimaison pointed out) is also simple: at the point when KIP-390 was passed, the proposal included a global configuration option (i.e., compression.level). But as I worked on KIP-780 and ran an in-house preview, I found that per-codec configuration (i.e., compression.{gzip|lz4|zstd}.level) is much preferable, for the following reasons:

1. Easy to switch the compression type. For example, {compression.type=zstd, compression.level=10} works but {compression.type=gzip, compression.level=10} does not, since gzip only supports levels 1 to 9. In other words, a global compression.level is error-prone when the user tries to switch the codec. (@junrao indicated a similar problem: #1 #2.) If we limit the scope to per-codec options like compression.gzip.level, we can switch the whole set of options by changing compression.type only; see the sketch below. Moreover, this approach is consistent with additional compression options like compression.gzip.buffer.

2. Can provide a default level. As @ijuma pointed out, we cannot provide a default compression level with compression.level; but with compression.{gzip|lz4|zstd}.level, we can.

For these reasons, I ran the overall benchmarks here with per-codec options (see the 'Final Notes' section). If the reviewers prefer this scheme, I will happily change it.
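To illustrate, a hypothetical producer configuration under the per-codec scheme (property names follow the KIP-780 proposal above; the values are made up):

# Per-codec levels can coexist; only the one matching compression.type applies.
compression.type=zstd
compression.gzip.level=9      # ignored while compression.type=zstd
compression.zstd.level=10     # applied
# Switching codecs is a one-line change with no risk of an out-of-range level:
# compression.type=gzip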
Thanks @dongjinleekr for the explanations. My suggested change was logically identical; it's just a bit shorter.
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (resolved)
clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java (resolved)
clients/src/main/java/org/apache/kafka/common/compress/ZstdConfig.java (resolved)
clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java (resolved)
@mimaison fyi, I think we need more votes in the KIP thread.
As far as I can tell this is implementing KIP-390 and it received 3 binding votes: https://lists.apache.org/thread/wnd6ky3kjzv9gkzc39qy5gg2pp20ovs1
@mimaison yeah, I just edited my message. :)
clients/src/main/java/org/apache/kafka/common/compress/GzipConfig.java (resolved)
clients/src/main/java/org/apache/kafka/common/compress/GzipOutputStream.java (resolved)
@dongjinleekr : Thanks for the PR and sorry for the late review. Made a pass of all non-testing files. A few comments below.
@@ -312,6 +320,7 @@ public class ProducerConfig extends AbstractConfig {
                        Importance.LOW,
                        ACKS_DOC)
                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                .define(COMPRESSION_LEVEL_CONFIG, Type.STRING, "", Importance.MEDIUM, COMPRESSION_LEVEL_DOC)
The KIP says the default value is null.
Could you kindly review the discussion here?
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (resolved)
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.record;
Hmm, it seems this class should be in common.compress package instead of here?
At first I tried so, but it has the following problems:
- Consistency with org.apache.kafka.common.record.CompressionType: CompressionConfig uses org.apache.kafka.common.record.(MemoryRecordsBuilder|RecordBatch), but only the imports from org.apache.kafka.common.record to org.apache.kafka.common.compress are allowed.
clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java (resolved)
clients/src/main/java/org/apache/kafka/common/compress/GzipOutputStream.java (resolved)
@@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
  val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
  val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
  val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase(Locale.ROOT)
  val compressionLevel = getString(LogConfig.CompressionLevelProp)
With topic level overriding, things can be a bit tricky. Suppose that a user sets up a broker level compression type and compression level, but only overrides the compression type for a topic. With this logic, the topic will pick up the broker level compression level, which could be intended for a different compression type.
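To illustrate the concern, a hypothetical sequence (config names per this PR; values made up):

# broker-level defaults
compression.type=gzip
compression.level=9    # chosen with gzip's 1~9 range in mind
# topic-level override changes only the type:
#   kafka-configs.sh --bootstrap-server ... --alter --entity-type topics \
#     --entity-name my-topic --add-config compression.type=zstd
# The topic now inherits compression.level=9, which was intended for gzip,
# and may not mean the same thing for the new codec.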
ditto: here
/**
 * Returns per-codec [[CompressionConfig]] for given `compressionType`.
 */
private def getCompressionConfig(compressionType: CompressionType): CompressionConfig = {
Another thing is that if we set a topic or broker level compression type to be the same as the producer, but with a different compression level, whether we should trigger broker side recompression. Currently, we don't do recompression. Should we do it? If so, is it possible to know the producer compression level?
No, we can't trigger broker-side recompression in this scenario since there is no field for the compression level in RecordBatch. (Oh, this should also be addressed in the KIP.)
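For context, a sketch of why the level is invisible to the broker (bit layout of the batch attributes per the v2 message format; shown as a comment, since the point is what the format lacks):

// DefaultRecordBatch attributes (int16), lower bits:
//   bits 0-2: compression codec (0=none, 1=gzip, 2=snappy, 3=lz4, 4=zstd)
//   bit  3:   timestamp type
//   bit  4:   isTransactional
//   bit  5:   isControlBatch
// No bits encode the compression level, so a broker cannot tell which level
// a producer used and cannot detect a level mismatch to recompress on.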
raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java (resolved)
…eConfig, GzipConfig, SnappyConfig, LZ4Config, ZstdConfig.
- Now, each compression codec can have independent config entries.
- Make MemoryRecordsBuilder, KafkaRaftClient, SnapshotWriter use CompressionConfig instead of CompressionType.
- Make LogAppendInfo use (CompressionType, CompressionConfig) for compression source and target.
- Add ProducerConfig#getCompressionConfig: parses compression configs and returns per-codec CompressionConfig from given configurations.
- Add LogConfig#getCompressionConfig: parses compression configs and returns per-codec CompressionConfig from given configurations.
- Separate CompressedRecordBatchValidationBenchmark into [Gzip, Snappy, LZ4, Zstd]CompressedRecordBatchValidationBenchmark.

1. Remove unused constructor in KafkaLZ4BlockOutputStream (it is now used for the compression level instead).
2. Add KafkaLZ4BlockOutputStream.[MIN_COMPRESSION_LEVEL, MAX_COMPRESSION_LEVEL, DEFAULT_COMPRESSION_LEVEL] (from LZ4Factory#highCompressor and LZ4Constants.[DEFAULT_COMPRESSION_LEVEL, MAX_COMPRESSION_LEVEL]).

- Change scope of ProducerConfig#getCompressionConfig(CompressionType) to package-private.
- Revert import order; fix parameter name in RecordAccumulator.
- Add Javadoc params to CompressionConfig#wrapForInput.
- Fix Javadoc param in RecordsSnapshotWriter.
- Add '()' to CompressionCodec.Builder#build call in Scala.
Force-pushed from 46bf0ac to 2c96abf
Thanks @dongjinleekr for the updates. I've made another pass and left a few minor comments.
 * <code>compression.level</code>
 */
public static final String COMPRESSION_LEVEL_CONFIG = "compression.level";
public static final String COMPRESSION_LEVEL_DOC = "The compression level for all data generated by the producer. The default level and valid value is up to "
What about:
The compression level to use when compressing records broker side. This is only used if compression.type is set to gzip, lz4 or zstd. The default level and valid value is up to...
@@ -289,8 +290,7 @@ private void testAppendLargeOldMessageFormat(CompressionType compressionType) th
    public void testLinger() throws Exception {
        int lingerMs = 10;
        RecordAccumulator accum = createTestRecordAccumulator(
            1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
Don't we need this line for the test to pass?
private int level = DEFAULT_COMPRESSION_LEVEL;

public Builder level(int level) {
    if (MAX_COMPRESSION_LEVEL < level) {
Should we also check the min level?
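A minimal sketch of the suggested check (assuming a MIN_COMPRESSION_LEVEL constant exists alongside the MAX_COMPRESSION_LEVEL already used here; the range in the message also anticipates the error-message comment further down):

public Builder level(int level) {
    if (level < MIN_COMPRESSION_LEVEL || MAX_COMPRESSION_LEVEL < level) {
        throw new IllegalArgumentException("zstd doesn't support the given compression level: " + level
            + " (valid range: " + MIN_COMPRESSION_LEVEL + " to " + MAX_COMPRESSION_LEVEL + ")");
    }
    this.level = level;
    return this;
}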
public Builder level(int level) {
    if (level < Lz4OutputStream.MIN_COMPRESSION_LEVEL || Lz4OutputStream.MAX_COMPRESSION_LEVEL < level) {
        throw new IllegalArgumentException("lz4 doesn't support given compression level: " + level);
Can we include the min and max values in the error message?
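For example, a possible rewording (a sketch using the constants already referenced in the condition above):

throw new IllegalArgumentException("lz4 doesn't support the given compression level: " + level
    + " (valid range: " + Lz4OutputStream.MIN_COMPRESSION_LEVEL
    + " to " + Lz4OutputStream.MAX_COMPRESSION_LEVEL + ")");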
for (boolean close : Arrays.asList(false, true))
    arguments.add(Arguments.of(new Args(broken, ignore, blockChecksum, close, payload)));
// Available levels: 1, 2, ... 17.
for (int compressionLevel = 1; compressionLevel < 18; ++compressionLevel)
Should we use Lz4OutputStream.MIN_COMPRESSION_LEVEL and MAX_COMPRESSION_LEVEL rather than hardcoding the levels?
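A sketch of the constant-based loop (assuming the constants are public ints on Lz4OutputStream, as the condition in the earlier snippet suggests; the loop body stands in for whatever per-level Arguments entry the test actually builds):

for (int compressionLevel = Lz4OutputStream.MIN_COMPRESSION_LEVEL;
        compressionLevel <= Lz4OutputStream.MAX_COMPRESSION_LEVEL; ++compressionLevel)
    // one Arguments entry per level, as in the hardcoded version above
    arguments.add(Arguments.of(new Args(broken, ignore, blockChecksum, close, payload)));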
val rand = new Random
rand.nextBytes(buffer.array)
val numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD)
val createTime = System.currentTimeMillis
val properties = new Properties()
properties ++= extraProps
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000")
Ah right, this is only created in order to call getCompressionConfig. For a second I was wondering where localhost:9000 came from.
Hello @dongjinleekr! What is the current state of this pull request?
Hi, I was wondering if this PR would be prioritized in the foreseeable future, given there has been a lot of work done already.
One discussion of the PR is around choosing the default value for
Yes, it would be good to get this over the line. @dongjinleekr Do you think you'll have time to update it? Or would you need some help?
I updated the KIP based on Jun's suggestion and sent an update to the VOTE thread.
I opened #15516 to implement this KIP.
Closing this PR since #15516 has been merged.
@junrao @mimaison @clolov @ozturkberkay I greatly appreciate you finalizing this PR! After changing jobs, I could not get involved with the Kafka community since I had to focus on our in-house fork, aka Navercorp (@naver) Kafka. Now, I have restarted KIP-780 and am testing it with our in-house tiered storage plugin. As soon as I complete the work, I will re-open the PR. Thank you again, and stay tuned! 🙇♂️
Since I reworked KIP-390 from scratch, I am opening a new PR here.